source: fedd/federation/protogeni_access.py @ a452f3c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02 version-3.00
Last change on this file since a452f3c was a452f3c, checked in by Ted Faber <faber@…>, 14 years ago

remove some fdebugging, bump version numbers

  • Property mode set to 100644
File size: 47.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17from M2Crypto.SSL import SSLError
18
19from util import *
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from access import access_base
31from proxy_segment import proxy_segment
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class protogeni_proxy(proxy_segment):
45    class ProtoGENIError(Exception): 
46        def __init__(self, op, code, output):
47            Exception.__init__(self, output)
48            self.op = op
49            self.code = code
50            self.output = output
51
52    def __init__(self, log=None, keyfile=None, debug=False, 
53            ch_url=None, sa_url=None, cm_url=None):
54        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
55
56        self.ProtoGENIError = protogeni_proxy.ProtoGENIError
57        self.ch_url = ch_url
58        self.sa_url = sa_url
59        self.cm_url = cm_url
60
61        self.call_SetValue = service_caller('SetValue')
62
63        self.debug_fail = ['Resolve']
64        self.debug_response = {
65                'RedeemTicket': ("XML blob1", "XML blob2"),
66                'SliceStatus': { 'status': 'ready' },
67            }
68
69
70    def pg_call(self, url, method, params, context):
71        max_retries = 5
72        retries = 0
73
74        s = service_caller(method, request_body_name="", strict=False)
75        self.log.debug("Calling %s %s" % (url, method))
76        if not self.debug:
77            while retries < max_retries:
78                r = s.call_xmlrpc_service(url, params, context=context)
79                code = r.get('code', -1)
80                if code == 0:
81                    # Success leaves the loop here
82                    return r.get('value', None)
83                elif code == 14 and retries +1 < max_retries:
84                    # Busy resource
85                    retries+= 1
86                    self.log.info("Resource busy, retrying in 30 secs")
87                    time.sleep(30)
88                else:
89                    # NB: out of retries falls through to here
90                    raise self.ProtoGENIError(op=method, 
91                            code=r.get('code', 'unknown'), 
92                            output=r.get('output', 'No output'))
93        else:
94            if method in self.debug_fail:
95                raise self.ProtoGENIError(op=method, code='unknown',
96                        output='No output')
97            elif self.debug_response.has_key(method):
98                return self.debug_response[method]
99            else:
100                return "%s XML blob" % method
101
102
103
104class access(access_base):
105    """
106    The implementation of access control based on mapping users to projects.
107
108    Users can be mapped to existing projects or have projects created
109    dynamically.  This implements both direct requests and proxies.
110    """
111
112    def __init__(self, config=None, auth=None):
113        """
114        Initializer.  Pulls parameters out of the ConfigParser's access section.
115        """
116
117        access_base.__init__(self, config, auth)
118
119        self.domain = config.get("access", "domain")
120        self.userconfdir = config.get("access","userconfdir")
121        self.userconfcmd = config.get("access","userconfcmd")
122        self.userconfurl = config.get("access","userconfurl")
123        self.federation_software = config.get("access", "federation_software")
124        self.portal_software = config.get("access", "portal_software")
125        self.ssh_port = config.get("access","ssh_port") or "22"
126        self.sshd = config.get("access","sshd")
127        self.sshd_config = config.get("access", "sshd_config")
128        self.access_type = config.get("access", "type")
129        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
130        self.staging_host = config.get("access", "staging_host") \
131                or "ops.emulab.net"
132        self.local_seer_software = config.get("access", "local_seer_software")
133        self.local_seer_image = config.get("access", "local_seer_image")
134        self.local_seer_start = config.get("access", "local_seer_start")
135   
136        self.dragon_endpoint = config.get("access", "dragon")
137        self.dragon_vlans = config.get("access", "dragon_vlans")
138        self.deter_internal = config.get("access", "deter_internal")
139
140        self.tunnel_config = config.getboolean("access", "tunnel_config")
141        self.portal_command = config.get("access", "portal_command")
142        self.portal_image = config.get("access", "portal_image")
143        self.portal_type = config.get("access", "portal_type") or "pc"
144        self.portal_startcommand = config.get("access", "portal_startcommand")
145        self.node_startcommand = config.get("access", "node_startcommand")
146
147        self.federation_software = self.software_list(self.federation_software)
148        self.portal_software = self.software_list(self.portal_software)
149        self.local_seer_software = self.software_list(self.local_seer_software)
150
151        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
152        self.renewal_interval = int(self.renewal_interval) * 60
153
154        self.ch_url = config.get("access", "ch_url")
155        self.sa_url = config.get("access", "sa_url")
156        self.cm_url = config.get("access", "cm_url")
157
158        self.restricted = [ ]
159
160        # read_state in the base_class
161        self.state_lock.acquire()
162        for a  in ('allocation', 'projects', 'keys', 'types'):
163            if a not in self.state:
164                self.state[a] = { }
165        self.allocation = self.state['allocation']
166        self.projects = self.state['projects']
167        self.keys = self.state['keys']
168        self.types = self.state['types']
169        # Add the ownership attributes to the authorizer.  Note that the
170        # indices of the allocation dict are strings, but the attributes are
171        # fedids, so there is a conversion.
172        for k in self.state.get('allocation', {}).keys():
173            for o in self.state['allocation'][k].get('owners', []):
174                self.auth.set_attribute(o, fedid(hexstr=k))
175            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
176
177        self.state_lock.release()
178
179
180        self.log = logging.getLogger("fedd.access")
181        set_log_level(config, "access", self.log)
182
183        self.access = { }
184        if config.has_option("access", "accessdb"):
185            self.read_access(config.get("access", "accessdb"), 
186                    access_obj=self.make_access_info)
187
188        self.lookup_access = self.lookup_access_base
189
190        self.call_SetValue = service_caller('SetValue')
191        self.call_GetValue = service_caller('GetValue')
192        self.exports = {
193                'local_seer_control': self.export_local_seer,
194                'seer_master': self.export_seer_master,
195                'hide_hosts': self.export_hide_hosts,
196                }
197
198        if not self.local_seer_image or not self.local_seer_software or \
199                not self.local_seer_start:
200            if 'local_seer_control' in self.exports:
201                del self.exports['local_seer_control']
202
203        if not self.local_seer_image or not self.local_seer_software or \
204                not self.seer_master_start:
205            if 'seer_master' in self.exports:
206                del self.exports['seer_master']
207
208        self.RenewSlices()
209
210        self.soap_services = {\
211            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
212            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
213            'StartSegment': soap_handler("StartSegment", self.StartSegment),
214            'TerminateSegment': soap_handler("TerminateSegment", 
215                self.TerminateSegment),
216            }
217        self.xmlrpc_services =  {\
218            'RequestAccess': xmlrpc_handler('RequestAccess',
219                self.RequestAccess),
220            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
221                self.ReleaseAccess),
222            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
223            'TerminateSegment': xmlrpc_handler('TerminateSegment',
224                self.TerminateSegment),
225            }
226
227    @staticmethod
228    def make_access_info(s):
229        """
230        Split a string of the form (id, id, id, id) ito its constituent tuples
231        and return them as a tuple.  Use to import access info from the
232        access_db.
233        """
234
235        ss = s.strip()
236        if ss.startswith('(') and ss.endswith(')'):
237            l = [ s.strip() for s  in ss[1:-1].split(",")]
238            if len(l) == 4:
239                return tuple(l)
240            else:
241                raise self.parse_error(
242                        "Exactly 4 elements in access info required")
243        else:
244            raise self.parse_error("Expecting parenthezied values")
245
246
247    def get_handler(self, path, fid):
248        self.log.info("Get handler %s %s" % (path, fid))
249        if self.auth.check_attribute(fid, path) and self.userconfdir:
250            return ("%s/%s" % (self.userconfdir, path), "application/binary")
251        else:
252            return (None, None)
253
254    def build_access_response(self, alloc_id, services):
255        """
256        Create the SOAP response.
257
258        Build the dictionary description of the response and use
259        fedd_utils.pack_soap to create the soap message.  ap is the allocate
260        project message returned from a remote project allocation (even if that
261        allocation was done locally).
262        """
263        # Because alloc_id is already a fedd_services_types.IDType_Holder,
264        # there's no need to repack it
265        msg = { 
266                'allocID': alloc_id,
267                'fedAttr': [
268                    { 'attribute': 'domain', 'value': self.domain } , 
269                ]
270            }
271        if self.dragon_endpoint:
272            msg['fedAttr'].append({'attribute': 'dragon',
273                'value': self.dragon_endpoint})
274        if self.deter_internal:
275            msg['fedAttr'].append({'attribute': 'deter_internal',
276                'value': self.deter_internal})
277        #XXX: ??
278        if self.dragon_vlans:
279            msg['fedAttr'].append({'attribute': 'vlans',
280                'value': self.dragon_vlans})
281
282        if services:
283            msg['service'] = services
284        return msg
285
286    def RequestAccess(self, req, fid):
287        """
288        Handle the access request.
289        """
290
291        # The dance to get into the request body
292        if req.has_key('RequestAccessRequestBody'):
293            req = req['RequestAccessRequestBody']
294        else:
295            raise service_error(service_error.req, "No request!?")
296
297        if req.has_key('destinationTestbed'):
298            dt = unpack_id(req['destinationTestbed'])
299
300        # Request for this fedd
301        found, match = self.lookup_access(req, fid)
302        services, svc_state = self.export_services(req.get('service',[]),
303                None, None)
304        # keep track of what's been added
305        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
306        aid = unicode(allocID)
307
308        self.state_lock.acquire()
309        self.allocation[aid] = { }
310        # The protoGENI certificate
311        self.allocation[aid]['credentials'] = found
312        # The list of owner FIDs
313        self.allocation[aid]['owners'] = [ fid ]
314        self.write_state()
315        self.state_lock.release()
316        self.auth.set_attribute(fid, allocID)
317        self.auth.set_attribute(allocID, allocID)
318
319        try:
320            f = open("%s/%s.pem" % (self.certdir, aid), "w")
321            print >>f, alloc_cert
322            f.close()
323        except EnvironmentError, e:
324            raise service_error(service_error.internal, 
325                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
326        return self.build_access_response({ 'fedid': allocID }, None)
327
328
329    def ReleaseAccess(self, req, fid):
330        # The dance to get into the request body
331        if req.has_key('ReleaseAccessRequestBody'):
332            req = req['ReleaseAccessRequestBody']
333        else:
334            raise service_error(service_error.req, "No request!?")
335
336        # Local request
337        try:
338            if req['allocID'].has_key('localname'):
339                auth_attr = aid = req['allocID']['localname']
340            elif req['allocID'].has_key('fedid'):
341                aid = unicode(req['allocID']['fedid'])
342                auth_attr = req['allocID']['fedid']
343            else:
344                raise service_error(service_error.req,
345                        "Only localnames and fedids are understood")
346        except KeyError:
347            raise service_error(service_error.req, "Badly formed request")
348
349        self.log.debug("[access] deallocation requested for %s", aid)
350        if not self.auth.check_attribute(fid, auth_attr):
351            self.log.debug("[access] deallocation denied for %s", aid)
352            raise service_error(service_error.access, "Access Denied")
353
354        self.state_lock.acquire()
355        if self.allocation.has_key(aid):
356            self.log.debug("Found allocation for %s" %aid)
357            del self.allocation[aid]
358            self.write_state()
359            self.state_lock.release()
360            # And remove the access cert
361            cf = "%s/%s.pem" % (self.certdir, aid)
362            self.log.debug("Removing %s" % cf)
363            os.remove(cf)
364            return { 'allocID': req['allocID'] } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    def manifest_to_dict(self, manifest, ignore_debug=False):
370        """
371        Turn the manifest into a dict were each virtual nodename (i.e. the
372        topdl name) has an entry with the allocated machine in hostname and the
373        interfaces in 'interfaces'.  I love having XML parser code lying
374        around.
375        """
376        if self.create_debug and not ignore_debug: 
377            self.log.debug("Returning null manifest dict")
378            return { }
379
380        # The class allows us to keep a little state - the dict under
381        # consteruction and the current entry in that dict for the interface
382        # element code.
383        class manifest_parser:
384            def __init__(self):
385                self.d = { }
386                self.current_key=None
387
388            # If the element is a node, create a dict entry for it.  If it's an
389            # interface inside a node, add an entry in the interfaces list with
390            # the virtual name and component id.
391            def start_element(self, name, attrs):
392                if name == 'node':
393                    self.current_key = attrs.get('virtual_id',"")
394                    if self.current_key:
395                        self.d[self.current_key] = {
396                                'hostname': attrs.get('hostname', None),
397                                'interfaces': { }
398                                }
399                elif name == 'interface' and self.current_key:
400                    self.d[self.current_key]['interfaces']\
401                            [attrs.get('virtual_id','')] = \
402                            attrs.get('component_id', None)
403            #  When a node is finished, clear current_key
404            def end_element(self, name):
405                if name == 'node': self.current_key = None
406
407        node = { }
408
409        mp = manifest_parser()
410        p = xml.parsers.expat.ParserCreate()
411        # These are bound to the class we just created
412        p.StartElementHandler = mp.start_element
413        p.EndElementHandler = mp.end_element
414
415        p.Parse(manifest)
416        # Make the node dict that the callers expect
417        for k in mp.d:
418            node[k] = mp.d.get('hostname', '')
419        return mp.d
420
421    def fake_manifest(self, topo):
422        """
423        Fake the output of manifest_to_dict with a bunch of generic node an
424        interface names, for debugging.
425        """
426        node = { }
427        for i, e in enumerate([ e for e in topo.elements \
428                if isinstance(e, topdl.Computer)]):
429            node[e.name] = { 
430                    'hostname': "node%03d" % i,
431                    'interfaces': { }
432                    }
433            for j, inf in enumerate(e.interface):
434                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
435
436        return node
437
438
439    def generate_portal_configs(self, topo, pubkey_base, 
440            secretkey_base, tmpdir, leid, connInfo, services, nodes):
441
442        def conninfo_to_dict(key, info):
443            """
444            Make a cpoy of the connection information about key, and flatten it
445            into a single dict by parsing out any feddAttrs.
446            """
447
448            rv = None
449            for i in info:
450                if key == i.get('portal', "") or \
451                        key in [e.get('element', "") \
452                        for e in i.get('member', [])]:
453                    rv = i.copy()
454                    break
455
456            else:
457                return rv
458
459            if 'fedAttr' in rv:
460                for a in rv['fedAttr']:
461                    attr = a.get('attribute', "")
462                    val = a.get('value', "")
463                    if attr and attr not in rv:
464                        rv[attr] = val
465                del rv['fedAttr']
466            return rv
467
468        # XXX: un hardcode this
469        def client_null(f, s):
470            print >>f, "Service: %s" % s['name']
471       
472        def client_seer_master(f, s):
473            print >>f, 'PortalAlias: seer-master'
474
475        def client_smb(f, s):
476            print >>f, "Service: %s" % s['name']
477            smbshare = None
478            smbuser = None
479            smbproj = None
480            for a in s.get('fedAttr', []):
481                if a.get('attribute', '') == 'SMBSHARE':
482                    smbshare = a.get('value', None)
483                elif a.get('attribute', '') == 'SMBUSER':
484                    smbuser = a.get('value', None)
485                elif a.get('attribute', '') == 'SMBPROJ':
486                    smbproj = a.get('value', None)
487
488            if all((smbshare, smbuser, smbproj)):
489                print >>f, "SMBshare: %s" % smbshare
490                print >>f, "ProjectUser: %s" % smbuser
491                print >>f, "ProjectName: %s" % smbproj
492
493        def client_hide_hosts(f, s):
494            for a in s.get('fedAttr', [ ]):
495                if a.get('attribute', "") == 'hosts':
496                    print >>f, 'Hide: %s' % a.get('value', "")
497
498        client_service_out = {
499                'SMB': client_smb,
500                'tmcd': client_null,
501                'seer': client_null,
502                'userconfig': client_null,
503                'project_export': client_null,
504                'seer_master': client_seer_master,
505                'hide_hosts': client_hide_hosts,
506            }
507
508        def client_seer_master_export(f, s):
509            print >>f, "AddedNode: seer-master"
510
511        def client_seer_local_export(f, s):
512            print >>f, "AddedNode: control"
513
514        client_export_service_out = {
515                'seer_master': client_seer_master_export,
516                'local_seer_control': client_seer_local_export,
517            }
518
519        def server_port(f, s):
520            p = urlparse(s.get('server', 'http://localhost'))
521            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
522
523        def server_null(f,s): pass
524
525        def server_seer(f, s):
526            print >>f, 'seer: true'
527
528        server_service_out = {
529                'SMB': server_port,
530                'tmcd': server_port,
531                'userconfig': server_null,
532                'project_export': server_null,
533                'seer': server_seer,
534                'seer_master': server_port,
535                'hide_hosts': server_null,
536            }
537        # XXX: end un hardcode this
538
539
540        seer_out = False
541        client_out = False
542        for e in [ e for e in topo.elements \
543                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
544            myname = e.name
545            type = e.get_attribute('portal_type')
546
547            info = conninfo_to_dict(myname, connInfo)
548
549            if not info:
550                raise service_error(service_error.req,
551                        "No connectivity info for %s" % myname)
552
553            # Translate to physical name (ProtoGENI doesn't have DNS)
554            physname = nodes.get(myname, { }).get('hostname', None)
555            peer = info.get('peer', "")
556            ldomain = self.domain
557            ssh_port = info.get('ssh_port', 22)
558
559            # Collect this for the client.conf file
560            if 'masterexperiment' in info:
561                mproj, meid = info['masterexperiment'].split("/", 1)
562
563            active = info.get('active', 'False')
564
565            if type in ('control', 'both'):
566                testbed = e.get_attribute('testbed')
567                control_gw = myname
568
569            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
570            tunnelconfig = self.tunnel_config
571            try:
572                f = open(cfn, "w")
573                if active == 'True':
574                    print >>f, "active: True"
575                    print >>f, "ssh_port: %s" % ssh_port
576                    if type in ('control', 'both'):
577                        for s in [s for s in services \
578                                if s.get('name', "") in self.imports]:
579                            server_service_out[s['name']](f, s)
580
581                if tunnelconfig:
582                    print >>f, "tunnelip: %s" % tunnelconfig
583                print >>f, "peer: %s" % peer.lower()
584                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
585                        pubkey_base
586                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
587                        secretkey_base
588                f.close()
589            except EnvironmentError, e:
590                raise service_error(service_error.internal,
591                        "Can't write protal config %s: %s" % (cfn, e))
592
593        # Done with portals, write the client config file.
594        try:
595            f = open("%s/client.conf" % tmpdir, "w")
596            if control_gw:
597                print >>f, "ControlGateway: %s" % physname.lower()
598            for s in services:
599                if s.get('name',"") in self.imports and \
600                        s.get('visibility','') == 'import':
601                    client_service_out[s['name']](f, s)
602                if s.get('name', '') in self.exports and \
603                        s.get('visibility', '') == 'export' and \
604                        s['name'] in client_export_service_out:
605                    client_export_service_out[s['name']](f, s)
606            # Seer uses this.
607            if mproj and meid:
608                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
609            f.close()
610        except EnvironmentError, e:
611            raise service_error(service_error.internal,
612                    "Cannot write client.conf: %s" %s)
613
614
615
616    def export_store_info(self, cf, nodes, ssh_port, connInfo):
617        """
618        For the export requests in the connection info, install the peer names
619        at the experiment controller via SetValue calls.
620        """
621
622        for c in connInfo:
623            for p in [ p for p in c.get('parameter', []) \
624                    if p.get('type', '') == 'output']:
625
626                if p.get('name', '') == 'peer':
627                    k = p.get('key', None)
628                    surl = p.get('store', None)
629                    if surl and k and k.index('/') != -1:
630                        if self.create_debug:
631                            req = { 'name': k, 'value': 'debug' }
632                            self.call_SetValue(surl, req, cf)
633                        else:
634                            n = nodes.get(k[k.index('/')+1:], { })
635                            value = n.get('hostname', None)
636                            if value:
637                                req = { 'name': k, 'value': value }
638                                self.call_SetValue(surl, req, cf)
639                            else:
640                                self.log.error("No hostname for %s" % \
641                                        k[k.index('/'):])
642                    else:
643                        self.log.error("Bad export request: %s" % p)
644                elif p.get('name', '') == 'ssh_port':
645                    k = p.get('key', None)
646                    surl = p.get('store', None)
647                    if surl and k:
648                        req = { 'name': k, 'value': ssh_port }
649                        self.call_SetValue(surl, req, cf)
650                    else:
651                        self.log.error("Bad export request: %s" % p)
652                else:
653
654                    self.log.error("Unknown export parameter: %s" % \
655                            p.get('name'))
656                    continue
657
658    def write_node_config_script(self, elem, node, user, pubkey,
659            secretkey, stagingdir, tmpdir):
660        """
661        Write out the configuration script that is to run on the node
662        represented by elem in the topology.  This is called
663        once per node to configure.
664        """
665        # These little functions/functors just make things more readable.  Each
666        # one encapsulates a small task of copying software files or installing
667        # them.
668        class stage_file_type:
669            """
670            Write code copying file sfrom the staging host to the host on which
671            this will run.
672            """
673            def __init__(self, user, host, stagingdir):
674                self.user = user
675                self.host = host
676                self.stagingdir = stagingdir
677                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
678                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
679
680            def __call__(self, script, file, dest="."):
681                # If the file is a full pathname, do not use stagingdir
682                if file.find('/') == -1:
683                    file = "%s/%s" % (self.stagingdir, file)
684                print >>script, "%s %s@%s:%s %s" % \
685                        (self.scp, self.user, self.host, file, dest)
686
687        def install_tar(script, loc, base):
688            """
689            Print code to script to install a tarfile in loc.
690            """
691            tar = "/bin/tar"
692            mkdir="/bin/mkdir"
693
694            print >>script, "%s -p %s" % (mkdir, loc)
695            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
696
697        def install_rpm(script, base):
698            """
699            Print code to script to install an rpm
700            """
701            rpm = "/bin/rpm"
702            print >>script, "%s --install %s" % (rpm, base)
703
704        ifconfig = "/sbin/ifconfig"
705        stage_file = stage_file_type(user, self.staging_host, stagingdir)
706        pname = node.get('hostname', None)
707        fed_dir = "/usr/local/federation"
708        fed_etc_dir = "%s/etc" % fed_dir
709        fed_bin_dir = "%s/bin" % fed_dir
710        fed_lib_dir = "%s/lib" % fed_dir
711
712        if pname:
713            sfile = "%s/%s.startup" % (tmpdir, pname)
714            script = open(sfile, "w")
715            # Reset the interfaces to the ones in the topo file
716            for i in [ i for i in elem.interface \
717                    if not i.get_attribute('portal')]:
718                pinf = node['interfaces'].get(i.name, None)
719                addr = i.get_attribute('ip4_address') 
720                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
721                if pinf and addr:
722                    print >>script, \
723                            "%s %s %s netmask %s"  % \
724                            (ifconfig, pinf, addr, netmask)
725                else:
726                    self.log.error("Missing interface or address for %s" \
727                            % i.name)
728               
729            for l, f in self.federation_software:
730                base = os.path.basename(f)
731                stage_file(script, base)
732                if l: install_tar(script, l, base)
733                else: install_rpm(script, base)
734
735            for s in elem.software:
736                s_base = s.location.rpartition('/')[2]
737                stage_file(script, s_base)
738                if s.install: install_tar(script, s.install, s_base)
739                else: install_rpm(script, s_base)
740
741            for f in ('hosts', pubkey, secretkey, 'client.conf', 
742                    'userconf'):
743                stage_file(script, f, fed_etc_dir)
744            if self.sshd:
745                stage_file(script, self.sshd, fed_bin_dir)
746            if self.sshd_config:
747                stage_file(script, self.sshd_config, fed_etc_dir)
748
749            # Look in tmpdir to get the names.  They've all been copied
750            # into the (remote) staging dir
751            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
752                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
753
754            # Hackery dackery dock: the ProtoGENI python is really ancient.
755            # A modern version (though packaged for Mandrake (remember
756            # Mandrake?  good times, good times)) should be in the
757            # federation_software list, but we need to move rename is for
758            # SEER.
759            print >>script, "rm /usr/bin/python"
760            print >>script, "ln /usr/bin/python2.4 /usr/bin/python"
761            # Back to less hacky stuff
762
763            # Start commands
764            if elem.get_attribute('portal') and self.portal_startcommand:
765                # Install portal software
766                for l, f in self.portal_software:
767                    base = os.path.basename(f)
768                    stage_file(script, base)
769                    if l: install_tar(script, l, base)
770                    else: install_rpm(script, base)
771
772                # Portals never have a user-specified start command
773                print >>script, self.portal_startcommand
774            elif self.node_startcommand:
775                # XXX: debug
776                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
777                # XXX: debug
778                if elem.get_attribute('startup'):
779                    print >>script, "%s \\$USER '%s'" % \
780                            (self.node_startcommand,
781                                    elem.get_attribute('startup'))
782                else:
783                    print >>script, self.node_startcommand
784            script.close()
785            return sfile, pname
786        else:
787            return None, None
788
789
790    def configure_nodes(self, segment_commands, topo, nodes, user, 
791            pubkey, secretkey, stagingdir, tmpdir):
792        """
793        For each node in the topology, generate a script file that copies
794        software onto it and installs it in the proper places and then runs the
795        startup command (including the federation commands.
796        """
797
798
799
800        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
801            vname = e.name
802            sfile, pname = self.write_node_config_script(e,
803                    nodes.get(vname, { }),
804                    user, pubkey, secretkey, stagingdir, tmpdir)
805            if sfile:
806                if not segment_commands.scp_file(sfile, user, pname):
807                    self.log.error("Could not copy script to %s" % pname)
808            else:
809                self.log.error("Unmapped node: %s" % vname)
810
811    def start_node(self, user, host, node, segment_commands):
812        """
813        Copy an identity to a node for the configuration script to be able to
814        import data and then run the startup script remotely.
815        """
816        # Place an identity on the node so that the copying can succeed
817        segment_commands.ssh_cmd(user, host, "scp .ssh/id_rsa %s:.ssh" % node)
818        segment_commands.ssh_cmd(user, node, 
819                "sudo /bin/sh ./%s.startup &" % node)
820
821    def start_nodes(self, user, host, nodes, segment_commands):
822        """
823        Start a thread to initialize each node and wait for them to complete.
824        Each thread runs start_node.
825        """
826        threads = [ ]
827        for n in nodes:
828            t = Thread(target=self.start_node, args=(user, host, n, 
829                segment_commands))
830            t.start()
831            threads.append(t)
832
833        done = [not t.isAlive() for t in threads]
834        while not all(done):
835            self.log.info("Waiting for threads %s" % done)
836            time.sleep(10)
837            done = [not t.isAlive() for t in threads]
838
839    def set_up_staging_filespace(self, segment_commands, user, host,
840            stagingdir):
841        """
842        Set up teh staging area on the staging machine.  To reduce the number
843        of ssh commands, we compose a script and execute it remotely.
844        """
845
846        self.log.info("[start_segment]: creating script file")
847        try:
848            sf, scriptname = tempfile.mkstemp()
849            scriptfile = os.fdopen(sf, 'w')
850        except EnvironmentError:
851            return False
852
853        scriptbase = os.path.basename(scriptname)
854
855        # Script the filesystem changes
856        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
857        print >>scriptfile, 'mkdir -p %s' % stagingdir
858        print >>scriptfile, "rm -f %s" % scriptbase
859        scriptfile.close()
860
861        # Move the script to the remote machine
862        # XXX: could collide tempfile names on the remote host
863        if segment_commands.scp_file(scriptname, user, host, scriptbase):
864            os.remove(scriptname)
865        else:
866            return False
867
868        # Execute the script (and the script's last line deletes it)
869        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
870            return False
871
872    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
873        """
874        Protogeni interactions take a context and a protogeni certificate.
875        This establishes both for later calls and returns them.
876        """
877        if os.access(certfile, os.R_OK):
878            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
879        else:
880            self.log.error("[start_segment]: Cannot read certfile: %s" % \
881                    certfile)
882            return None, None
883
884        try:
885            gcred = segment_commands.pg_call(self.sa_url, 
886                    'GetCredential', {}, ctxt)
887        except self.ProtoGENIError, e:
888            raise service_error(service_error.federant,
889                    "ProtoGENI: %s" % e)
890
891        return ctxt, gcred
892
893    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
894        """
895        Find a usable slice name by trying random ones until there's no
896        collision.
897        """
898
899        def random_slicename(user):
900            """
901            Return a random slicename by appending 5 letters to the username.
902            """
903            slicename = user
904            for i in range(0,5):
905                slicename += random.choice(string.ascii_letters)
906            return slicename
907
908        while True:
909            slicename = random_slicename(user)
910            try:
911                param = {
912                        'credential': gcred, 
913                        'hrn': slicename,
914                        'type': 'Slice'
915                        }
916                segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt)
917            except segment_commands.ProtoGENIError, e:
918                print e
919                break
920
921        return slicename
922
923    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
924        """
925        Create the slice and allocate resources.  If any of this stuff fails,
926        the allocations will time out on PG in short order, so we just raise
927        the service_error.  Return the slice and sliver credentials as well as
928        the manifest.
929        """
930        try:
931            param = {
932                    'credential': gcred, 
933                    'hrn': slicename,
934                    'type': 'Slice'
935                    }
936            slice_cred = segment_commands.pg_call(self.sa_url, 'Register',
937                    param, ctxt)
938            #f = open("./slice_cred", "w")
939            #print >>f, slice_cred
940            #f.close()
941            # Populate the ssh keys (let PG format them)
942            param = {
943                    'credential': gcred, 
944                    }
945            keys =  segment_commands.pg_call(self.sa_url, 'GetKeys', param, 
946                    ctxt)
947            # Grab and redeem a ticket
948            param = {
949                    'credential': slice_cred, 
950                    'rspec': rspec,
951                    }
952            ticket = segment_commands.pg_call(self.cm_url, 'GetTicket', param,
953                    ctxt)
954            #f = open("./ticket", "w")
955            #print >>f, ticket
956            #f.close()
957            param = { 
958                    'credential': slice_cred, 
959                    'keys': keys,
960                    'ticket': ticket,
961                    }
962            sliver_cred, manifest = segment_commands.pg_call(self.cm_url, 
963                    'RedeemTicket', param, ctxt)
964            #f = open("./sliver_cred", "w")
965            #print >>f, sliver_cred
966            #f.close()
967            #f = open("./manifest", "w")
968            #print >>f, manifest
969            #f.close()
970            # start 'em up
971            param = { 
972                    'credential': sliver_cred,
973                    }
974            segment_commands.pg_call(self.cm_url, 'StartSliver', param, ctxt)
975        except segment_commands.ProtoGENIError, e:
976            raise service_error(service_error.federant,
977                    "ProtoGENI: %s %s" % (e.code, e))
978
979        return (slice_cred, sliver_cred, manifest)
980
981    def wait_for_slice(self, segment_commands, slice_cred, ctxt):
982        """
983        Wait for the given slice to finish its startup.  Return the final
984        status.
985        """
986        status = 'notready'
987        try:
988            while status == 'notready':
989                param = { 
990                        'credential': slice_cred
991                        }
992                r = segment_commands.pg_call(self.cm_url,
993                        'SliceStatus', param, ctxt)
994                status = r.get('status', 'notready')
995                if status == 'notready':
996                    time.sleep(30)
997        except segment_commands.ProtoGENIError, e:
998            raise service_error(service_error.federant,
999                    "ProtoGENI: %s %s" % (e.code, e))
1000
1001        return status
1002
1003    def delete_slice(self, segment_commands, slice_cred, ctxt):
1004        """
1005        Delete the slice resources.  An error from the service is ignores,
1006        because the soft state will go away anyway.
1007        """
1008        try:
1009            param = { 'credential': slice_cred }
1010            segment_commands.pg_call(self.cm_url, 'DeleteSlice',
1011                    param, ctxt)
1012        except segment_commands.ProtoGENIError, e:
1013            self.log.warn("ProtoGENI: %s" % e)
1014
1015
1016
1017    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1018            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1019            export_certfile, topo, connInfo, services, timeout=0):
1020        """
1021        Start a sub-experiment on a federant.
1022
1023        Get the current state, modify or create as appropriate, ship data
1024        and configs and start the experiment.  There are small ordering
1025        differences based on the initial state of the sub-experiment.
1026        """
1027
1028        # Local software dir
1029        lsoftdir = "%s/software" % tmpdir
1030        host = self.staging_host
1031
1032        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1033                certfile, certpw)
1034
1035        if not ctxt: return False
1036
1037        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1038        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1039        self.log.info("Creating %s" % slicename)
1040        slice_cred, sliver_cred, manifest = self.allocate_slice(
1041                segment_commands, slicename, rspec, gcred, ctxt)
1042
1043        # With manifest in hand, we can export the portal node names.
1044        if self.create_debug: nodes = self.fake_manifest(topo)
1045        else: nodes = self.manifest_to_dict(manifest)
1046
1047        self.export_store_info(export_certfile, nodes, self.ssh_port,
1048                connInfo)
1049        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1050                ename, connInfo, services, nodes)
1051
1052        # Copy software to the staging machine (done after generation to copy
1053        # those, too)
1054        for d in (tmpdir, lsoftdir):
1055            if os.path.isdir(d):
1056                for f in os.listdir(d):
1057                    if not os.path.isdir("%s/%s" % (d, f)):
1058                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1059                                user, host, "%s/%s" % (stagingdir, f)):
1060                            self.log.error("Scp failed")
1061                            return False
1062
1063
1064        # Now we wait for the nodes to start on PG
1065        status = self.wait_for_slice(segment_commands, slice_cred, ctxt)
1066        if status == 'failed':
1067            self.log.error('Sliver failed to start on ProtoGENI')
1068            self.delete_slice(segment_commands, slice_cred, ctxt)
1069            return False
1070        else:
1071            # All good: save ProtoGENI info in shared state
1072            self.state_lock.acquire()
1073            self.allocation[aid]['slice_name'] = slicename
1074            self.allocation[aid]['slice_credential'] = slice_cred
1075            self.allocation[aid]['sliver_credential'] = sliver_cred
1076            self.allocation[aid]['manifest'] = manifest
1077            self.allocation[aid]['certfile'] = certfile
1078            self.allocation[aid]['certpw'] = certpw
1079            self.write_state()
1080            self.state_lock.release()
1081
1082        # Now we have configuration to do for ProtoGENI
1083        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1084                secretkey, stagingdir, tmpdir)
1085
1086        self.start_nodes(user, self.staging_host, 
1087                [ n.get('hostname', None) for n in nodes.values()],
1088                segment_commands)
1089
1090        # Everything has gone OK.
1091        return True, dict([(k, n.get('hostname', None)) \
1092                for k, n in nodes.items()])
1093
1094    def generate_rspec(self, topo, softdir, connInfo):
1095        t = topo.clone()
1096
1097        starts = { }
1098        # Weed out the things we aren't going to instantiate: Segments, portal
1099        # substrates, and portal interfaces.  (The copy in the for loop allows
1100        # us to delete from e.elements in side the for loop).  While we're
1101        # touching all the elements, we also adjust paths from the original
1102        # testbed to local testbed paths and put the federation commands and
1103        # startcommands into a dict so we can start them manually later.
1104        # ProtoGENI requires setup before the federation commands run, so we
1105        # run them by hand after we've seeded configurations.
1106        for e in [e for e in t.elements]:
1107            if isinstance(e, topdl.Segment):
1108                t.elements.remove(e)
1109            # Fix software paths
1110            for s in getattr(e, 'software', []):
1111                s.location = re.sub("^.*/", softdir, s.location)
1112            if isinstance(e, topdl.Computer):
1113                if e.get_attribute('portal') and self.portal_startcommand:
1114                    # Portals never have a user-specified start command
1115                    starts[e.name] = self.portal_startcommand
1116                elif self.node_startcommand:
1117                    if e.get_attribute('startup'):
1118                        starts[e.name] = "%s \\$USER '%s'" % \
1119                                (self.node_startcommand, 
1120                                        e.get_attribute('startup'))
1121                        e.remove_attribute('startup')
1122                    else:
1123                        starts[e.name] = self.node_startcommand
1124
1125                # Remove portal interfaces
1126                e.interface = [i for i in e.interface \
1127                        if not i.get_attribute('portal')]
1128
1129        t.substrates = [ s.clone() for s in t.substrates ]
1130        t.incorporate_elements()
1131
1132        # Customize the ns2 output for local portal commands and images
1133        filters = []
1134
1135        # NB: these are extra commands issued for the node, not the startcmds
1136        if self.portal_command:
1137            filters.append(topdl.generate_portal_command_filter(
1138                self.portal_command))
1139
1140        # Convert to rspec and return it
1141        exp_rspec = topdl.topology_to_rspec(t, filters)
1142
1143        return exp_rspec
1144
1145    def retrieve_software(self, topo, certfile, softdir):
1146        """
1147        Collect the software that nodes in the topology need loaded and stage
1148        it locally.  This implies retrieving it from the experiment_controller
1149        and placing it into softdir.  Certfile is used to prove that this node
1150        has access to that data (it's the allocation/segment fedid).  Finally
1151        local portal and federation software is also copied to the same staging
1152        directory for simplicity - all software needed for experiment creation
1153        is in softdir.
1154        """
1155        sw = set()
1156        for e in topo.elements:
1157            for s in getattr(e, 'software', []):
1158                sw.add(s.location)
1159        os.mkdir(softdir)
1160        for s in sw:
1161            self.log.debug("Retrieving %s" % s)
1162            try:
1163                get_url(s, certfile, softdir)
1164            except:
1165                t, v, st = sys.exc_info()
1166                raise service_error(service_error.internal,
1167                        "Error retrieving %s: %s" % (s, v))
1168
1169        # Copy local portal node software to the tempdir
1170        for s in (self.portal_software, self.federation_software):
1171            for l, f in s:
1172                base = os.path.basename(f)
1173                copy_file(f, "%s/%s" % (softdir, base))
1174
1175        # Ick.  Put this python rpm in a place that it will get moved into
1176        # the staging area.  It's a hack to install a modern (in a Roman
1177        # sense of modern) python on ProtoGENI
1178        python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
1179        if os.access("./%s" % python_rpm, os.R_OK):
1180            copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
1181
1182
1183    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1184        """
1185        Gather common configuration files, retrieve or create an experiment
1186        name and project name, and return the ssh_key filenames.  Create an
1187        allocation log bound to the state log variable as well.
1188        """
1189        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1190        ename = None
1191        pubkey_base = None
1192        secretkey_base = None
1193        alloc_log = None
1194
1195        for a in attrs:
1196            if a['attribute'] in configs:
1197                try:
1198                    self.log.debug("Retrieving %s" % a['value'])
1199                    get_url(a['value'], certfile, tmpdir)
1200                except:
1201                    t, v, st = sys.exc_info()
1202                    raise service_error(service_error.internal,
1203                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1204            if a['attribute'] == 'ssh_pubkey':
1205                pubkey_base = a['value'].rpartition('/')[2]
1206            if a['attribute'] == 'ssh_secretkey':
1207                secretkey_base = a['value'].rpartition('/')[2]
1208            if a['attribute'] == 'experiment_name':
1209                ename = a['value']
1210
1211        if not ename:
1212            ename = ""
1213            for i in range(0,5):
1214                ename += random.choice(string.ascii_letters)
1215            self.log.warn("No experiment name: picked one randomly: %s" \
1216                    % ename)
1217
1218        self.state_lock.acquire()
1219        if self.allocation.has_key(aid):
1220            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1221            self.allocation[aid]['experiment'] = ename
1222            self.allocation[aid]['log'] = [ ]
1223            # Create a logger that logs to the experiment's state object as
1224            # well as to the main log file.
1225            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1226            h = logging.StreamHandler(
1227                    list_log.list_log(self.allocation[aid]['log']))
1228            # XXX: there should be a global one of these rather than
1229            # repeating the code.
1230            h.setFormatter(logging.Formatter(
1231                "%(asctime)s %(name)s %(message)s",
1232                        '%d %b %y %H:%M:%S'))
1233            alloc_log.addHandler(h)
1234            self.write_state()
1235        else:
1236            self.log.error("No allocation for %s!?" % aid)
1237        self.state_lock.release()
1238
1239        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1240                cpw, alloc_log)
1241
1242    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1243        # Copy the assigned names into the return topology
1244        rvtopo = topo.clone()
1245        embedding = [ ]
1246        for k, n in nodes.items():
1247            embedding.append({ 
1248                'toponame': k,
1249                'physname': [n ],
1250                })
1251        # Grab the log (this is some anal locking, but better safe than
1252        # sorry)
1253        self.state_lock.acquire()
1254        logv = "".join(self.allocation[aid]['log'])
1255        # It's possible that the StartSegment call gets retried (!).
1256        # if the 'started' key is in the allocation, we'll return it rather
1257        # than redo the setup.
1258        self.allocation[aid]['started'] = { 
1259                'allocID': alloc_id,
1260                'allocationLog': logv,
1261                'segmentdescription': { 
1262                    'topdldescription': rvtopo.to_dict() },
1263                'embedding': embedding,
1264                }
1265        retval = copy.deepcopy(self.allocation[aid]['started'])
1266        self.write_state()
1267        self.state_lock.release()
1268
1269        return retval
1270
1271    def StartSegment(self, req, fid):
1272        err = None  # Any service_error generated after tmpdir is created
1273        rv = None   # Return value from segment creation
1274
1275        try:
1276            req = req['StartSegmentRequestBody']
1277            topref = req['segmentdescription']['topdldescription']
1278        except KeyError:
1279            raise service_error(service_error.req, "Badly formed request")
1280
1281        connInfo = req.get('connection', [])
1282        services = req.get('service', [])
1283        auth_attr = req['allocID']['fedid']
1284        aid = "%s" % auth_attr
1285        attrs = req.get('fedAttr', [])
1286        if not self.auth.check_attribute(fid, auth_attr):
1287            raise service_error(service_error.access, "Access denied")
1288        else:
1289            # See if this is a replay of an earlier succeeded StartSegment -
1290            # sometimes SSL kills 'em.  If so, replay the response rather than
1291            # redoing the allocation.
1292            self.state_lock.acquire()
1293            retval = self.allocation[aid].get('started', None)
1294            self.state_lock.release()
1295            if retval:
1296                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1297                        "replaying response")
1298                return retval
1299
1300        if topref:
1301            topo = topdl.Topology(**topref)
1302        else:
1303            raise service_error(service_error.req, 
1304                    "Request missing segmentdescription'")
1305
1306        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1307        try:
1308            tmpdir = tempfile.mkdtemp(prefix="access-")
1309            softdir = "%s/software" % tmpdir
1310        except EnvironmentError:
1311            raise service_error(service_error.internal, "Cannot create tmp dir")
1312
1313        # Try block alllows us to clean up temporary files.
1314        try:
1315            self.retrieve_software(topo, certfile, softdir)
1316            self.configure_userconf(services, tmpdir)
1317            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1318                cpw, alloc_log = self.initialize_experiment_info(attrs,
1319                        aid, certfile, tmpdir)
1320            self.import_store_info(certfile, connInfo)
1321            rspec = self.generate_rspec(topo, "%s/%s/" \
1322                    % (self.staging_dir, ename), connInfo)
1323
1324            segment_commands = protogeni_proxy(keyfile=ssh_key,
1325                    debug=self.create_debug, log=alloc_log,
1326                    ch_url = self.ch_url, sa_url=self.sa_url,
1327                    cm_url=self.cm_url)
1328            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1329                    pubkey_base, 
1330                    secretkey_base, ename,
1331                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1332                    certfile, topo, connInfo, services)
1333        except EnvironmentError, e:
1334            err = service_error(service_error.internal, "%s" % e)
1335        except service_error, e:
1336            err = e
1337        except:
1338            t, v, st = sys.exc_info()
1339            err = service_error(service_error.internal, "%s: %s" % \
1340                    (v, traceback.extract_tb(st)))
1341
1342        # Walk up tmpdir, deleting as we go
1343        if self.cleanup: self.remove_dirs(tmpdir)
1344        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1345
1346        if rv:
1347            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1348        elif err:
1349            raise service_error(service_error.federant,
1350                    "Swapin failed: %s" % err)
1351        else:
1352            raise service_error(service_error.federant, "Swapin failed")
1353
1354    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1355            certfile, certpw):
1356        """
1357        Stop a sub experiment by calling swapexp on the federant
1358        """
1359        host = self.staging_host
1360        rv = False
1361        try:
1362            # Clean out tar files: we've gone over quota in the past
1363            if stagingdir:
1364                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1365            if slice_cred:
1366                self.log.error('Removing Sliver on ProtoGENI')
1367                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1368                self.delete_slice(segment_commands, slice_cred, ctxt)
1369            return True
1370        except self.ssh_cmd_timeout:
1371            rv = False
1372        return rv
1373
1374    def TerminateSegment(self, req, fid):
1375        try:
1376            req = req['TerminateSegmentRequestBody']
1377        except KeyError:
1378            raise service_error(service_error.req, "Badly formed request")
1379
1380        auth_attr = req['allocID']['fedid']
1381        aid = "%s" % auth_attr
1382        attrs = req.get('fedAttr', [])
1383        if not self.auth.check_attribute(fid, auth_attr):
1384            raise service_error(service_error.access, "Access denied")
1385
1386        self.state_lock.acquire()
1387        if self.allocation.has_key(aid):
1388            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1389            slice_cred = self.allocation[aid].get('slice_credential', None)
1390            ename = self.allocation[aid].get('experiment', None)
1391        else:
1392            cf, user, ssh_key, cpw = (None, None, None, None)
1393            slice_cred = None
1394            ename = None
1395        self.state_lock.release()
1396
1397        if ename:
1398            staging = "%s/%s" % ( self.staging_dir, ename)
1399        else:
1400            self.log.warn("Can't find experiment name for %s" % aid)
1401            staging = None
1402
1403        segment_commands = protogeni_proxy(keyfile=ssh_key,
1404                debug=self.create_debug, ch_url = self.ch_url,
1405                sa_url=self.sa_url, cm_url=self.cm_url)
1406        self.stop_segment(segment_commands, user, staging, slice_cred, cf, cpw)
1407        return { 'allocID': req['allocID'] }
1408
1409    def renew_segment(self, segment_commands, name, scred, interval, 
1410            certfile, certpw):
1411        """
1412        Linear code through the segment renewal calls.
1413        """
1414        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1415        try:
1416            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1417                    time.gmtime(time.time() + interval))
1418            cred = segment_commands.pg_call(self.sa_url, 'GetCredential', 
1419                    {}, ctxt)
1420
1421            param = {
1422                    'credential': scred,
1423                    'expiration': expiration
1424                    }
1425            r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt)
1426            param = {
1427                    'credential': cred,
1428                    'hrn': name,
1429                    'type': 'Slice',
1430                    }
1431            slice = segment_commands.pg_call(self.sa_url, 'Resolve', 
1432                    param, ctxt)
1433            uuid = slice.get('uuid', None)
1434            if uuid == None:
1435                sys.exit('No uuid for %s' % slicename)
1436
1437            print 'Calling GetCredential (uuid)'
1438            param = {
1439                    'credential': cred,
1440                    'uuid': uuid,
1441                    'type': 'Slice',
1442                    }
1443            new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential',
1444                    param, ctxt)
1445            #f = open('./new_slice_cred', 'w')
1446            #print >>f, new_scred
1447            #f.close()
1448
1449        except segment_commands.ProtoGENIError, e:
1450            self.log.error("Failed to extend slice %s: %s" % (name, e))
1451            return None
1452        try:
1453            print 'Calling RenewSlice (CM)'
1454            param = {
1455                    'credential': new_scred,
1456                    }
1457            r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt)
1458        except segment_commands.ProtoGENIError, e:
1459            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1460
1461        return new_scred
1462   
1463
1464    def RenewSlices(self):
1465        self.log.info("Scanning for slices to renew")
1466        self.state_lock.acquire()
1467        aids = self.allocation.keys()
1468        self.state_lock.release()
1469
1470        for aid in aids:
1471            self.state_lock.acquire()
1472            if self.allocation.has_key(aid):
1473                name = self.allocation[aid].get('slice_name', None)
1474                scred = self.allocation[aid].get('slice_credential', None)
1475                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1476            else:
1477                name = None
1478                scred = None
1479            self.state_lock.release()
1480
1481            if not os.access(cf, os.R_OK):
1482                self.log.error(
1483                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1484                continue
1485
1486            # There's a ProtoGENI slice associated with the segment; renew it.
1487            if name and scred:
1488                segment_commands = protogeni_proxy(log=self.log, 
1489                        debug=self.create_debug, keyfile=ssh_key,
1490                        cm_url = self.cm_url, sa_url = self.sa_url,
1491                        ch_url = self.ch_url)
1492                new_scred = self.renew_segment(segment_commands, name, scred, 
1493                        self.renewal_interval, cf, cpw)
1494                if new_scred:
1495                    self.log.info("Slice %s renewed until %s GMT" % \
1496                            (name, time.asctime(time.gmtime(\
1497                                time.time()+self.renewal_interval))))
1498                    self.state_lock.acquire()
1499                    if self.allocation.has_key(aid):
1500                        self.allocation[aid]['slice_credential'] = new_scred
1501                    self.state_lock.release()
1502                else:
1503                    self.log.info("Failed to renew slice %s " % name)
1504
1505        # Let's do this all again soon.  (4 tries before the slices time out)   
1506        t = Timer(self.renewal_interval/4, self.RenewSlices)
1507        t.start()
Note: See TracBrowser for help on using the repository browser.