source: fedd/federation/protogeni_access.py @ 94a4267

axis_examplecompt_changesinfo-ops
Last change on this file since 94a4267 was e83f2f2, checked in by Ted Faber <faber@…>, 14 years ago

Move proofs around. Lots of changes, including fault handling.

  • Property mode set to 100644
File size: 48.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from fedid import fedid, generate_fedid
20from authorizer import authorizer, abac_authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28from access import access_base
29from legacy_access import legacy_access
30from protogeni_proxy import protogeni_proxy
31from geniapi_proxy import geniapi_proxy
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.domain = config.get("access", "domain")
60        self.userconfdir = config.get("access","userconfdir")
61        self.userconfcmd = config.get("access","userconfcmd")
62        self.userconfurl = config.get("access","userconfurl")
63        self.federation_software = config.get("access", "federation_software")
64        self.portal_software = config.get("access", "portal_software")
65        self.ssh_port = config.get("access","ssh_port") or "22"
66        self.sshd = config.get("access","sshd")
67        self.sshd_config = config.get("access", "sshd_config")
68        self.access_type = config.get("access", "type")
69        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
70        self.staging_host = config.get("access", "staging_host") \
71                or "ops.emulab.net"
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75   
76        self.dragon_endpoint = config.get("access", "dragon")
77        self.dragon_vlans = config.get("access", "dragon_vlans")
78        self.deter_internal = config.get("access", "deter_internal")
79
80        self.tunnel_config = config.getboolean("access", "tunnel_config")
81        self.portal_command = config.get("access", "portal_command")
82        self.portal_image = config.get("access", "portal_image")
83        self.portal_type = config.get("access", "portal_type") or "pc"
84        self.portal_startcommand = config.get("access", "portal_startcommand")
85        self.node_startcommand = config.get("access", "node_startcommand")
86
87        self.federation_software = self.software_list(self.federation_software)
88        self.portal_software = self.software_list(self.portal_software)
89        self.local_seer_software = self.software_list(self.local_seer_software)
90
91        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
92        self.renewal_interval = int(self.renewal_interval) * 60
93
94        self.ch_url = config.get("access", "ch_url")
95        self.sa_url = config.get("access", "sa_url")
96        self.cm_url = config.get("access", "cm_url")
97
98        self.restricted = [ ]
99
100        # read_state in the base_class
101        self.state_lock.acquire()
102        for a  in ('allocation', 'projects', 'keys', 'types'):
103            if a not in self.state:
104                self.state[a] = { }
105        self.allocation = self.state['allocation']
106        self.projects = self.state['projects']
107        self.keys = self.state['keys']
108        self.types = self.state['types']
109        self.state_lock.release()
110
111
112        self.log = logging.getLogger("fedd.access")
113        set_log_level(config, "access", self.log)
114
115        # authorization information
116        self.auth_type = config.get('access', 'auth_type') \
117                or 'legacy'
118        self.auth_dir = config.get('access', 'auth_dir')
119        accessdb = config.get("access", "accessdb")
120        # initialize the authorization system
121        if self.auth_type == 'legacy':
122            self.access = { }
123            if accessdb:
124                self.legacy_read_access(accessdb, self.make_access_info)
125            # Add the ownership attributes to the authorizer.  Note that the
126            # indices of the allocation dict are strings, but the attributes are
127            # fedids, so there is a conversion.
128            self.state_lock.acquire()
129            for k in self.state.get('allocation', {}).keys():
130                for o in self.state['allocation'][k].get('owners', []):
131                    self.auth.set_attribute(o, fedid(hexstr=k))
132                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
133
134            self.state_lock.release()
135            self.lookup_access = self.legacy_lookup_access_base
136        elif self.auth_type == 'abac':
137            self.auth = abac_authorizer(load=self.auth_dir)
138            self.access = [ ]
139            if accessdb:
140                self.read_access(accessdb, self.make_access_info)
141        else:
142            raise service_error(service_error.internal, 
143                    "Unknown auth_type: %s" % self.auth_type)
144        api = config.get("access", "api") or "protogeni"
145        if api == "protogeni":
146            self.api_proxy = protogeni_proxy
147        elif api == "geniapi":
148            self.api_proxy = geniapi_proxy
149        else:
150            self.log.debug("Unknown interface, using protogeni")
151            self.api_proxy = protogeni_proxy
152
153        self.call_SetValue = service_caller('SetValue')
154        self.call_GetValue = service_caller('GetValue')
155        self.exports = {
156                'local_seer_control': self.export_local_seer,
157                'seer_master': self.export_seer_master,
158                'hide_hosts': self.export_hide_hosts,
159                }
160
161        if not self.local_seer_image or not self.local_seer_software or \
162                not self.local_seer_start:
163            if 'local_seer_control' in self.exports:
164                del self.exports['local_seer_control']
165
166        if not self.local_seer_image or not self.local_seer_software or \
167                not self.seer_master_start:
168            if 'seer_master' in self.exports:
169                del self.exports['seer_master']
170
171        self.RenewSlices()
172
173        self.soap_services = {\
174            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
175            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
176            'StartSegment': soap_handler("StartSegment", self.StartSegment),
177            'TerminateSegment': soap_handler("TerminateSegment", 
178                self.TerminateSegment),
179            }
180        self.xmlrpc_services =  {\
181            'RequestAccess': xmlrpc_handler('RequestAccess',
182                self.RequestAccess),
183            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
184                self.ReleaseAccess),
185            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
186            'TerminateSegment': xmlrpc_handler('TerminateSegment',
187                self.TerminateSegment),
188            }
189
190    @staticmethod
191    def make_access_info(s):
192        """
193        Split a string of the form (id, id, id, id) ito its constituent tuples
194        and return them as a tuple.  Use to import access info from the
195        access_db.
196        """
197
198        ss = s.strip()
199        if ss.startswith('(') and ss.endswith(')'):
200            l = [ s.strip() for s  in ss[1:-1].split(",")]
201            if len(l) == 4:
202                return tuple(l)
203            else:
204                raise self.parse_error(
205                        "Exactly 4 elements in access info required")
206        else:
207            raise self.parse_error("Expecting parenthezied values")
208
209
210    def get_handler(self, path, fid):
211        self.log.info("Get handler %s %s" % (path, fid))
212        if self.auth.check_attribute(fid, path) and self.userconfdir:
213            return ("%s/%s" % (self.userconfdir, path), "application/binary")
214        else:
215            return (None, None)
216
217    def build_access_response(self, alloc_id, services, proof):
218        """
219        Create the SOAP response.
220
221        Build the dictionary description of the response and use
222        fedd_utils.pack_soap to create the soap message.  ap is the allocate
223        project message returned from a remote project allocation (even if that
224        allocation was done locally).
225        """
226        # Because alloc_id is already a fedd_services_types.IDType_Holder,
227        # there's no need to repack it
228        msg = { 
229                'allocID': alloc_id,
230                'fedAttr': [
231                    { 'attribute': 'domain', 'value': self.domain } , 
232                ],
233                'proof': proof.to_dict()
234            }
235        if self.dragon_endpoint:
236            msg['fedAttr'].append({'attribute': 'dragon',
237                'value': self.dragon_endpoint})
238        if self.deter_internal:
239            msg['fedAttr'].append({'attribute': 'deter_internal',
240                'value': self.deter_internal})
241        #XXX: ??
242        if self.dragon_vlans:
243            msg['fedAttr'].append({'attribute': 'vlans',
244                'value': self.dragon_vlans})
245
246        if services:
247            msg['service'] = services
248        return msg
249
250    def RequestAccess(self, req, fid):
251        """
252        Handle the access request.
253        """
254
255        # The dance to get into the request body
256        if req.has_key('RequestAccessRequestBody'):
257            req = req['RequestAccessRequestBody']
258        else:
259            raise service_error(service_error.req, "No request!?")
260
261        if req.has_key('destinationTestbed'):
262            dt = unpack_id(req['destinationTestbed'])
263
264        # Request for this fedd
265        found, match, owners, proof = self.lookup_access(req, fid)
266        services, svc_state = self.export_services(req.get('service',[]),
267                None, None)
268        # keep track of what's been added
269        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
270        aid = unicode(allocID)
271
272        self.state_lock.acquire()
273        self.allocation[aid] = { }
274        # The protoGENI certificate
275        self.allocation[aid]['credentials'] = found
276        # The list of owner FIDs
277        self.allocation[aid]['owners'] = owners
278        self.allocation[aid]['auth'] = set()
279        self.append_allocation_authorization(aid, 
280                ((fid, allocID), (allocID, allocID)), state_attr='allocation')
281        self.write_state()
282        self.state_lock.release()
283
284        try:
285            f = open("%s/%s.pem" % (self.certdir, aid), "w")
286            print >>f, alloc_cert
287            f.close()
288        except EnvironmentError, e:
289            raise service_error(service_error.internal, 
290                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
291        return self.build_access_response({ 'fedid': allocID }, None, proof)
292
293
294    def ReleaseAccess(self, req, fid):
295        # The dance to get into the request body
296        if req.has_key('ReleaseAccessRequestBody'):
297            req = req['ReleaseAccessRequestBody']
298        else:
299            raise service_error(service_error.req, "No request!?")
300
301        # Local request
302        try:
303            if req['allocID'].has_key('localname'):
304                auth_attr = aid = req['allocID']['localname']
305            elif req['allocID'].has_key('fedid'):
306                aid = unicode(req['allocID']['fedid'])
307                auth_attr = req['allocID']['fedid']
308            else:
309                raise service_error(service_error.req,
310                        "Only localnames and fedids are understood")
311        except KeyError:
312            raise service_error(service_error.req, "Badly formed request")
313
314        self.log.debug("[access] deallocation requested for %s", aid)
315        access_ok , proof = self.auth.check_attribute(fid, auth_attr,
316                with_proof=True)
317        if not access_ok:
318            self.log.debug("[access] deallocation denied for %s", aid)
319            raise service_error(service_error.access, "Access Denied")
320
321        self.state_lock.acquire()
322        if self.allocation.has_key(aid):
323            self.log.debug("Found allocation for %s" %aid)
324            self.clear_allocation_authorization(aid, state_attr='allocation')
325            del self.allocation[aid]
326            self.write_state()
327            self.state_lock.release()
328            # And remove the access cert
329            cf = "%s/%s.pem" % (self.certdir, aid)
330            self.log.debug("Removing %s" % cf)
331            os.remove(cf)
332            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
333        else:
334            self.state_lock.release()
335            raise service_error(service_error.req, "No such allocation")
336
337    def manifest_to_dict(self, manifest, ignore_debug=False):
338        """
339        Turn the manifest into a dict were each virtual nodename (i.e. the
340        topdl name) has an entry with the allocated machine in hostname and the
341        interfaces in 'interfaces'.  I love having XML parser code lying
342        around.
343        """
344        if self.create_debug and not ignore_debug: 
345            self.log.debug("Returning null manifest dict")
346            return { }
347
348        # The class allows us to keep a little state - the dict under
349        # consteruction and the current entry in that dict for the interface
350        # element code.
351        class manifest_parser:
352            def __init__(self):
353                self.d = { }
354                self.current_key=None
355
356            # If the element is a node, create a dict entry for it.  If it's an
357            # interface inside a node, add an entry in the interfaces list with
358            # the virtual name and component id.
359            def start_element(self, name, attrs):
360                if name == 'node':
361                    self.current_key = attrs.get('virtual_id',"")
362                    if self.current_key:
363                        self.d[self.current_key] = {
364                                'hostname': attrs.get('hostname', None),
365                                'interfaces': { },
366                                'mac': { }
367                                }
368                elif name == 'interface' and self.current_key:
369                    self.d[self.current_key]['interfaces']\
370                            [attrs.get('virtual_id','')] = \
371                            attrs.get('component_id', None)
372                elif name == 'interface_ref':
373                    # Collect mac address information from an interface_ref.
374                    # These appear after the node info has been parsed.
375                    nid = attrs.get('virtual_node_id', None)
376                    ifid = attrs.get('virtual_interface_id', None)
377                    mac = attrs.get('MAC', None)
378                    self.d[nid]['mac'][ifid] = mac
379            #  When a node is finished, clear current_key
380            def end_element(self, name):
381                if name == 'node': self.current_key = None
382
383        node = { }
384
385        mp = manifest_parser()
386        p = xml.parsers.expat.ParserCreate()
387        # These are bound to the class we just created
388        p.StartElementHandler = mp.start_element
389        p.EndElementHandler = mp.end_element
390
391        p.Parse(manifest)
392        # Make the node dict that the callers expect
393        for k in mp.d:
394            node[k] = mp.d.get('hostname', '')
395        return mp.d
396
397    def fake_manifest(self, topo):
398        """
399        Fake the output of manifest_to_dict with a bunch of generic node an
400        interface names, for debugging.
401        """
402        node = { }
403        for i, e in enumerate([ e for e in topo.elements \
404                if isinstance(e, topdl.Computer)]):
405            node[e.name] = { 
406                    'hostname': "node%03d" % i,
407                    'interfaces': { }
408                    }
409            for j, inf in enumerate(e.interface):
410                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
411
412        return node
413
414
415    def generate_portal_configs(self, topo, pubkey_base, 
416            secretkey_base, tmpdir, leid, connInfo, services, nodes):
417
418        def conninfo_to_dict(key, info):
419            """
420            Make a cpoy of the connection information about key, and flatten it
421            into a single dict by parsing out any feddAttrs.
422            """
423
424            rv = None
425            for i in info:
426                if key == i.get('portal', "") or \
427                        key in [e.get('element', "") \
428                        for e in i.get('member', [])]:
429                    rv = i.copy()
430                    break
431
432            else:
433                return rv
434
435            if 'fedAttr' in rv:
436                for a in rv['fedAttr']:
437                    attr = a.get('attribute', "")
438                    val = a.get('value', "")
439                    if attr and attr not in rv:
440                        rv[attr] = val
441                del rv['fedAttr']
442            return rv
443
444        # XXX: un hardcode this
445        def client_null(f, s):
446            print >>f, "Service: %s" % s['name']
447       
448        def client_seer_master(f, s):
449            print >>f, 'PortalAlias: seer-master'
450
451        def client_smb(f, s):
452            print >>f, "Service: %s" % s['name']
453            smbshare = None
454            smbuser = None
455            smbproj = None
456            for a in s.get('fedAttr', []):
457                if a.get('attribute', '') == 'SMBSHARE':
458                    smbshare = a.get('value', None)
459                elif a.get('attribute', '') == 'SMBUSER':
460                    smbuser = a.get('value', None)
461                elif a.get('attribute', '') == 'SMBPROJ':
462                    smbproj = a.get('value', None)
463
464            if all((smbshare, smbuser, smbproj)):
465                print >>f, "SMBshare: %s" % smbshare
466                print >>f, "ProjectUser: %s" % smbuser
467                print >>f, "ProjectName: %s" % smbproj
468
469        def client_hide_hosts(f, s):
470            for a in s.get('fedAttr', [ ]):
471                if a.get('attribute', "") == 'hosts':
472                    print >>f, 'Hide: %s' % a.get('value', "")
473
474        client_service_out = {
475                'SMB': client_smb,
476                'tmcd': client_null,
477                'seer': client_null,
478                'userconfig': client_null,
479                'project_export': client_null,
480                'seer_master': client_seer_master,
481                'hide_hosts': client_hide_hosts,
482            }
483
484        def client_seer_master_export(f, s):
485            print >>f, "AddedNode: seer-master"
486
487        def client_seer_local_export(f, s):
488            print >>f, "AddedNode: control"
489
490        client_export_service_out = {
491                'seer_master': client_seer_master_export,
492                'local_seer_control': client_seer_local_export,
493            }
494
495        def server_port(f, s):
496            p = urlparse(s.get('server', 'http://localhost'))
497            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
498
499        def server_null(f,s): pass
500
501        def server_seer(f, s):
502            print >>f, 'seer: true'
503
504        server_service_out = {
505                'SMB': server_port,
506                'tmcd': server_port,
507                'userconfig': server_null,
508                'project_export': server_null,
509                'seer': server_seer,
510                'seer_master': server_port,
511                'hide_hosts': server_null,
512            }
513        # XXX: end un hardcode this
514
515
516        seer_out = False
517        client_out = False
518        control_gw = None
519        for e in [ e for e in topo.elements \
520                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
521            myname = e.name
522            type = e.get_attribute('portal_type')
523
524            info = conninfo_to_dict(myname, connInfo)
525
526            if not info:
527                raise service_error(service_error.req,
528                        "No connectivity info for %s" % myname)
529
530            # Translate to physical name (ProtoGENI doesn't have DNS)
531            physname = nodes.get(myname, { }).get('hostname', None)
532            peer = info.get('peer', "")
533            ldomain = self.domain
534            ssh_port = info.get('ssh_port', 22)
535
536            # Collect this for the client.conf file
537            if 'masterexperiment' in info:
538                mproj, meid = info['masterexperiment'].split("/", 1)
539
540            active = info.get('active', 'False')
541
542            if type in ('control', 'both'):
543                testbed = e.get_attribute('testbed')
544                control_gw = myname
545
546            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
547            tunnelconfig = self.tunnel_config
548            try:
549                f = open(cfn, "w")
550                if active == 'True':
551                    print >>f, "active: True"
552                    print >>f, "ssh_port: %s" % ssh_port
553                    if type in ('control', 'both'):
554                        for s in [s for s in services \
555                                if s.get('name', "") in self.imports]:
556                            server_service_out[s['name']](f, s)
557
558                if tunnelconfig:
559                    print >>f, "tunnelip: %s" % tunnelconfig
560                print >>f, "peer: %s" % peer.lower()
561                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
562                        pubkey_base
563                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
564                        secretkey_base
565                f.close()
566            except EnvironmentError, e:
567                raise service_error(service_error.internal,
568                        "Can't write protal config %s: %s" % (cfn, e))
569
570        # Done with portals, write the client config file.
571        try:
572            f = open("%s/client.conf" % tmpdir, "w")
573            if control_gw:
574                print >>f, "ControlGateway: %s" % physname.lower()
575            for s in services:
576                if s.get('name',"") in self.imports and \
577                        s.get('visibility','') == 'import':
578                    client_service_out[s['name']](f, s)
579                if s.get('name', '') in self.exports and \
580                        s.get('visibility', '') == 'export' and \
581                        s['name'] in client_export_service_out:
582                    client_export_service_out[s['name']](f, s)
583            # Seer uses this.
584            if mproj and meid:
585                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
586            f.close()
587        except EnvironmentError, e:
588            raise service_error(service_error.internal,
589                    "Cannot write client.conf: %s" %s)
590
591
592
593    def export_store_info(self, cf, nodes, ssh_port, connInfo):
594        """
595        For the export requests in the connection info, install the peer names
596        at the experiment controller via SetValue calls.
597        """
598
599        for c in connInfo:
600            for p in [ p for p in c.get('parameter', []) \
601                    if p.get('type', '') == 'output']:
602
603                if p.get('name', '') == 'peer':
604                    k = p.get('key', None)
605                    surl = p.get('store', None)
606                    if surl and k and k.index('/') != -1:
607                        if self.create_debug:
608                            req = { 'name': k, 'value': 'debug' }
609                            self.call_SetValue(surl, req, cf)
610                        else:
611                            n = nodes.get(k[k.index('/')+1:], { })
612                            value = n.get('hostname', None)
613                            if value:
614                                req = { 'name': k, 'value': value }
615                                self.call_SetValue(surl, req, cf)
616                            else:
617                                self.log.error("No hostname for %s" % \
618                                        k[k.index('/'):])
619                    else:
620                        self.log.error("Bad export request: %s" % p)
621                elif p.get('name', '') == 'ssh_port':
622                    k = p.get('key', None)
623                    surl = p.get('store', None)
624                    if surl and k:
625                        req = { 'name': k, 'value': ssh_port }
626                        self.call_SetValue(surl, req, cf)
627                    else:
628                        self.log.error("Bad export request: %s" % p)
629                else:
630
631                    self.log.error("Unknown export parameter: %s" % \
632                            p.get('name'))
633                    continue
634
635    def write_node_config_script(self, elem, node, user, pubkey,
636            secretkey, stagingdir, tmpdir):
637        """
638        Write out the configuration script that is to run on the node
639        represented by elem in the topology.  This is called
640        once per node to configure.
641        """
642        # These little functions/functors just make things more readable.  Each
643        # one encapsulates a small task of copying software files or installing
644        # them.
645        class stage_file_type:
646            """
647            Write code copying file sfrom the staging host to the host on which
648            this will run.
649            """
650            def __init__(self, user, host, stagingdir):
651                self.user = user
652                self.host = host
653                self.stagingdir = stagingdir
654                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
655                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
656
657            def __call__(self, script, file, dest="."):
658                # If the file is a full pathname, do not use stagingdir
659                if file.find('/') == -1:
660                    file = "%s/%s" % (self.stagingdir, file)
661                print >>script, "%s %s@%s:%s %s" % \
662                        (self.scp, self.user, self.host, file, dest)
663
664        def install_tar(script, loc, base):
665            """
666            Print code to script to install a tarfile in loc.
667            """
668            tar = "/bin/tar"
669            mkdir="/bin/mkdir"
670
671            print >>script, "%s -p %s" % (mkdir, loc)
672            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
673
674        def install_rpm(script, base):
675            """
676            Print code to script to install an rpm
677            """
678            rpm = "/bin/rpm"
679            print >>script, "%s --install %s" % (rpm, base)
680
681        ifconfig = "/sbin/ifconfig"
682        findif = '/usr/local/etc/emulab/findif'
683        stage_file = stage_file_type(user, self.staging_host, stagingdir)
684        pname = node.get('hostname', None)
685        fed_dir = "/usr/local/federation"
686        fed_etc_dir = "%s/etc" % fed_dir
687        fed_bin_dir = "%s/bin" % fed_dir
688        fed_lib_dir = "%s/lib" % fed_dir
689
690        if pname:
691            sfile = "%s/%s.startup" % (tmpdir, pname)
692            script = open(sfile, "w")
693            # Reset the interfaces to the ones in the topo file
694            for i in [ i for i in elem.interface \
695                    if not i.get_attribute('portal')]:
696                if 'interfaces' in node:
697                    pinf = node['interfaces'].get(i.name, None)
698                else:
699                    pinf = None
700
701                if 'mac' in node:
702                    pmac = node['mac'].get(i.name, None)
703                else:
704                    pmac = None
705                addr = i.get_attribute('ip4_address') 
706                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
707                # The interface names in manifests are not to be trusted, so we
708                # find the interface to configure using the local node's script
709                # to match mac address to interface name.
710                if pinf and addr and pmac:
711                    print >>script, '# %s' % pinf
712                    print >>script, \
713                            "%s `%s %s` %s netmask %s"  % \
714                            (ifconfig, findif, pmac, addr, netmask)
715                else:
716                    self.log.error("Missing interface or address for %s" \
717                            % i.name)
718               
719            for l, f in self.federation_software:
720                base = os.path.basename(f)
721                stage_file(script, base)
722                if l: install_tar(script, l, base)
723                else: install_rpm(script, base)
724
725            for s in elem.software:
726                s_base = s.location.rpartition('/')[2]
727                stage_file(script, s_base)
728                if s.install: install_tar(script, s.install, s_base)
729                else: install_rpm(script, s_base)
730
731            for f in ('hosts', pubkey, secretkey, 'client.conf', 
732                    'userconf'):
733                stage_file(script, f, fed_etc_dir)
734            if self.sshd:
735                stage_file(script, self.sshd, fed_bin_dir)
736            if self.sshd_config:
737                stage_file(script, self.sshd_config, fed_etc_dir)
738
739            # Look in tmpdir to get the names.  They've all been copied
740            # into the (remote) staging dir
741            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
742                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
743
744            # Done with staging, remove the identity used to stage
745            print >>script, "#/bin/rm .ssh/id_rsa"
746
747            # Start commands
748            if elem.get_attribute('portal') and self.portal_startcommand:
749                # Install portal software
750                for l, f in self.portal_software:
751                    base = os.path.basename(f)
752                    stage_file(script, base)
753                    if l: install_tar(script, l, base)
754                    else: install_rpm(script, base)
755
756                # Portals never have a user-specified start command
757                print >>script, self.portal_startcommand
758            elif self.node_startcommand:
759                # XXX: debug
760                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
761                # XXX: debug
762                if elem.get_attribute('startup'):
763                    print >>script, "%s \\$USER '%s'" % \
764                            (self.node_startcommand,
765                                    elem.get_attribute('startup'))
766                else:
767                    print >>script, self.node_startcommand
768            script.close()
769            return sfile, pname
770        else:
771            return None, None
772
773
774    def configure_nodes(self, segment_commands, topo, nodes, user, 
775            pubkey, secretkey, stagingdir, tmpdir):
776        """
777        For each node in the topology, generate a script file that copies
778        software onto it and installs it in the proper places and then runs the
779        startup command (including the federation commands.
780        """
781
782
783
784        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
785            vname = e.name
786            sfile, pname = self.write_node_config_script(e,
787                    nodes.get(vname, { }),
788                    user, pubkey, secretkey, stagingdir, tmpdir)
789            if sfile:
790                if not segment_commands.scp_file(sfile, user, pname):
791                    self.log.error("Could not copy script to %s" % pname)
792            else:
793                self.log.error("Unmapped node: %s" % vname)
794
795    def start_node(self, user, host, node, segment_commands):
796        """
797        Copy an identity to a node for the configuration script to be able to
798        import data and then run the startup script remotely.
799        """
800        # Place an identity on the node so that the copying can succeed
801        segment_commands.scp_file( segment_commands.ssh_privkey_file,
802                user, node, ".ssh/id_rsa")
803        segment_commands.ssh_cmd(user, node, 
804                "sudo /bin/sh ./%s.startup &" % node)
805
806    def start_nodes(self, user, host, nodes, segment_commands):
807        """
808        Start a thread to initialize each node and wait for them to complete.
809        Each thread runs start_node.
810        """
811        threads = [ ]
812        for n in nodes:
813            t = Thread(target=self.start_node, args=(user, host, n, 
814                segment_commands))
815            t.start()
816            threads.append(t)
817
818        done = [not t.isAlive() for t in threads]
819        while not all(done):
820            self.log.info("Waiting for threads %s" % done)
821            time.sleep(10)
822            done = [not t.isAlive() for t in threads]
823
824    def set_up_staging_filespace(self, segment_commands, user, host,
825            stagingdir):
826        """
827        Set up teh staging area on the staging machine.  To reduce the number
828        of ssh commands, we compose a script and execute it remotely.
829        """
830
831        self.log.info("[start_segment]: creating script file")
832        try:
833            sf, scriptname = tempfile.mkstemp()
834            scriptfile = os.fdopen(sf, 'w')
835        except EnvironmentError:
836            return False
837
838        scriptbase = os.path.basename(scriptname)
839
840        # Script the filesystem changes
841        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
842        print >>scriptfile, 'mkdir -p %s' % stagingdir
843        print >>scriptfile, "rm -f %s" % scriptbase
844        scriptfile.close()
845
846        # Move the script to the remote machine
847        # XXX: could collide tempfile names on the remote host
848        if segment_commands.scp_file(scriptname, user, host, scriptbase):
849            os.remove(scriptname)
850        else:
851            return False
852
853        # Execute the script (and the script's last line deletes it)
854        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
855            return False
856
857    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
858        """
859        Protogeni interactions take a context and a protogeni certificate.
860        This establishes both for later calls and returns them.
861        """
862        if os.access(certfile, os.R_OK):
863            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
864        else:
865            self.log.error("[start_segment]: Cannot read certfile: %s" % \
866                    certfile)
867            return None, None
868
869        try:
870            gcred = segment_commands.slice_authority_call('GetCredential', 
871                    {}, ctxt)
872        except segment_commands.ProtoGENIError, e:
873            raise service_error(service_error.federant,
874                    "ProtoGENI: %s" % e)
875
876        return ctxt, gcred
877
878    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
879        """
880        Find a usable slice name by trying random ones until there's no
881        collision.
882        """
883
884        def random_slicename(user):
885            """
886            Return a random slicename by appending 5 letters to the username.
887            """
888            slicename = user
889            for i in range(0,5):
890                slicename += random.choice(string.ascii_letters)
891            return slicename
892
893        while True:
894            slicename = random_slicename(user)
895            try:
896                param = {
897                        'credential': gcred, 
898                        'hrn': slicename,
899                        'type': 'Slice'
900                        }
901
902                if not self.create_debug:
903                    segment_commands.slice_authority_call('Resolve', param, 
904                            ctxt)
905                else:
906                    raise segment_commands.ProtoGENIError(0,0,'Debug')
907            except segment_commands.ProtoGENIError, e:
908                print e
909                break
910
911        return slicename
912
913    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
914        """
915        Create the slice and allocate resources.  If any of this stuff fails,
916        the allocations will time out on PG in short order, so we just raise
917        the service_error.  Return the slice and sliver credentials as well as
918        the manifest.
919        """
920        try:
921            param = {
922                    'credential': gcred, 
923                    'hrn': slicename,
924                    'type': 'Slice'
925                    }
926            slice_cred = segment_commands.slice_authority_call('Register',
927                    param, ctxt)
928            # Resolve the slice to get the URN that PG has assigned it.
929            param = {
930                    'credential': gcred,
931                    'type': 'Slice',
932                    'hrn': slicename
933                    }
934            data = segment_commands.slice_authority_call('Resolve', param,
935                    ctxt)
936            if 'urn' in data:
937                slice_urn = data['urn']
938            else:
939                raise service_error(service_error.federant, 
940                        "No URN returned for slice %s" % slicename)
941
942            if 'creator_urn' in data:
943                creator_urn = data['creator_urn']
944            else:
945                raise service_error(service_error.federant, 
946                        "No creator URN returned for slice %s" % slicename)
947            # Populate the ssh keys (let PG format them)
948            param = {
949                    'credential': gcred, 
950                    }
951            keys =  segment_commands.slice_authority_call('GetKeys', param, 
952                    ctxt)
953            # Create a Sliver
954            param = {
955                    'credentials': [ slice_cred ],
956                    'rspec': rspec, 
957                    'users': [ {
958                        'urn': creator_urn,
959                        'keys': keys, 
960                        },
961                    ],
962                    'slice_urn': slice_urn,
963                    }
964            rv = segment_commands.component_manager_call(
965                    'CreateSliver', param, ctxt)
966
967            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
968            # API hands back a sliver credential.  This just finds the format
969            # of choice.
970            if isinstance(rv, tuple):
971                manifest = rv[1]
972            else:
973                manifest = rv
974        except segment_commands.ProtoGENIError, e:
975            raise service_error(service_error.federant,
976                    "ProtoGENI: %s %s" % (e.code, e))
977
978        return (slice_urn, slice_cred, manifest, rspec)
979
980    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
981            timeout=None):
982        """
983        Wait for the given slice to finish its startup.  Return the final
984        status.
985        """
986        completed_states = ('failed', 'ready')
987        status = 'changing'
988        if timeout is not None:
989            end = time.time() + timeout
990        try:
991            while status not in completed_states:
992                param = { 
993                        'credentials': [ slice_cred ],
994                        'slice_urn': slice_urn,
995                        }
996                r = segment_commands.component_manager_call(
997                        'SliverStatus', param, ctxt)
998                # GENIAPI uses geni_status as the key, so check for both
999                status = r.get('status', r.get('geni_status','changing'))
1000                if status not in completed_states:
1001                    if timeout is not None and time.time() > end:
1002                        return 'timeout'
1003                    time.sleep(30)
1004        except segment_commands.ProtoGENIError, e:
1005            raise service_error(service_error.federant,
1006                    "ProtoGENI: %s %s" % (e.code, e))
1007
1008        return status
1009
1010    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
1011        """
1012        Delete the slice resources.  An error from the service is ignores,
1013        because the soft state will go away anyway.
1014        """
1015        try:
1016            param = { 
1017                    'credentials': [ slice_cred, ],
1018                    'slice_urn': slice_urn,
1019                    }
1020            segment_commands.component_manager_call('DeleteSlice',
1021                    param, ctxt)
1022        except segment_commands.ProtoGENIError, e:
1023            self.log.warn("ProtoGENI: %s" % e)
1024
1025
1026
1027    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1028            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1029            export_certfile, topo, connInfo, services, timeout=0):
1030        """
1031        Start a sub-experiment on a federant.
1032
1033        Get the current state, modify or create as appropriate, ship data
1034        and configs and start the experiment.  There are small ordering
1035        differences based on the initial state of the sub-experiment.
1036        """
1037
1038        # Local software dir
1039        lsoftdir = "%s/software" % tmpdir
1040        host = self.staging_host
1041
1042        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1043                certfile, certpw)
1044
1045        if not ctxt: return False, {}
1046
1047        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1048        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1049        self.log.info("Creating %s" % slicename)
1050        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1051                segment_commands, slicename, rspec, gcred, ctxt)
1052
1053        # With manifest in hand, we can export the portal node names.
1054        if self.create_debug: nodes = self.fake_manifest(topo)
1055        else: nodes = self.manifest_to_dict(manifest)
1056
1057        self.export_store_info(export_certfile, nodes, self.ssh_port,
1058                connInfo)
1059        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1060                ename, connInfo, services, nodes)
1061
1062        # Copy software to the staging machine (done after generation to copy
1063        # those, too)
1064        for d in (tmpdir, lsoftdir):
1065            if os.path.isdir(d):
1066                for f in os.listdir(d):
1067                    if not os.path.isdir("%s/%s" % (d, f)):
1068                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1069                                user, host, "%s/%s" % (stagingdir, f)):
1070                            self.log.error("Scp failed")
1071                            return False, {}
1072
1073
1074        # Now we wait for the nodes to start on PG
1075        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1076                ctxt, timeout=300)
1077        if status == 'failed':
1078            self.log.error('Sliver failed to start on ProtoGENI')
1079            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1080            return False, {}
1081        elif status == 'timeout':
1082            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1083            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1084            return False, {}
1085        else:
1086            # All good: save ProtoGENI info in shared state
1087            self.state_lock.acquire()
1088            self.allocation[aid]['slice_urn'] = slice_urn
1089            self.allocation[aid]['slice_name'] = slicename
1090            self.allocation[aid]['slice_credential'] = slice_cred
1091            self.allocation[aid]['manifest'] = manifest
1092            self.allocation[aid]['rspec'] = rspec
1093            self.allocation[aid]['certfile'] = certfile
1094            self.allocation[aid]['certpw'] = certpw
1095            self.write_state()
1096            self.state_lock.release()
1097
1098        # Now we have configuration to do for ProtoGENI
1099        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1100                secretkey, stagingdir, tmpdir)
1101
1102        self.start_nodes(user, self.staging_host, 
1103                [ n.get('hostname', None) for n in nodes.values()],
1104                segment_commands)
1105
1106        # Everything has gone OK.
1107        return True, dict([(k, n.get('hostname', None)) \
1108                for k, n in nodes.items()])
1109
1110    def generate_rspec(self, topo, softdir, connInfo):
1111
1112        # Force a useful image.  Without picking this the nodes can get
1113        # different images and there is great pain.
1114        def image_filter(e):
1115            if isinstance(e, topdl.Computer):
1116                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1117                        'image+emulab-ops//FEDORA10-STD" />'
1118            else:
1119                return ""
1120        # Main line of generate
1121        t = topo.clone()
1122
1123        starts = { }
1124        # Weed out the things we aren't going to instantiate: Segments, portal
1125        # substrates, and portal interfaces.  (The copy in the for loop allows
1126        # us to delete from e.elements in side the for loop).  While we're
1127        # touching all the elements, we also adjust paths from the original
1128        # testbed to local testbed paths and put the federation commands and
1129        # startcommands into a dict so we can start them manually later.
1130        # ProtoGENI requires setup before the federation commands run, so we
1131        # run them by hand after we've seeded configurations.
1132        for e in [e for e in t.elements]:
1133            if isinstance(e, topdl.Segment):
1134                t.elements.remove(e)
1135            # Fix software paths
1136            for s in getattr(e, 'software', []):
1137                s.location = re.sub("^.*/", softdir, s.location)
1138            if isinstance(e, topdl.Computer):
1139                if e.get_attribute('portal') and self.portal_startcommand:
1140                    # Portals never have a user-specified start command
1141                    starts[e.name] = self.portal_startcommand
1142                elif self.node_startcommand:
1143                    if e.get_attribute('startup'):
1144                        starts[e.name] = "%s \\$USER '%s'" % \
1145                                (self.node_startcommand, 
1146                                        e.get_attribute('startup'))
1147                        e.remove_attribute('startup')
1148                    else:
1149                        starts[e.name] = self.node_startcommand
1150
1151                # Remove portal interfaces
1152                e.interface = [i for i in e.interface \
1153                        if not i.get_attribute('portal')]
1154
1155        t.substrates = [ s.clone() for s in t.substrates ]
1156        t.incorporate_elements()
1157
1158        # Customize the rspec output to use the image we like
1159        filters = [ image_filter ]
1160
1161        # Convert to rspec and return it
1162        exp_rspec = topdl.topology_to_rspec(t, filters)
1163
1164        return exp_rspec
1165
1166    def retrieve_software(self, topo, certfile, softdir):
1167        """
1168        Collect the software that nodes in the topology need loaded and stage
1169        it locally.  This implies retrieving it from the experiment_controller
1170        and placing it into softdir.  Certfile is used to prove that this node
1171        has access to that data (it's the allocation/segment fedid).  Finally
1172        local portal and federation software is also copied to the same staging
1173        directory for simplicity - all software needed for experiment creation
1174        is in softdir.
1175        """
1176        sw = set()
1177        for e in topo.elements:
1178            for s in getattr(e, 'software', []):
1179                sw.add(s.location)
1180        os.mkdir(softdir)
1181        for s in sw:
1182            self.log.debug("Retrieving %s" % s)
1183            try:
1184                get_url(s, certfile, softdir)
1185            except:
1186                t, v, st = sys.exc_info()
1187                raise service_error(service_error.internal,
1188                        "Error retrieving %s: %s" % (s, v))
1189
1190        # Copy local portal node software to the tempdir
1191        for s in (self.portal_software, self.federation_software):
1192            for l, f in s:
1193                base = os.path.basename(f)
1194                copy_file(f, "%s/%s" % (softdir, base))
1195
1196
1197    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1198        """
1199        Gather common configuration files, retrieve or create an experiment
1200        name and project name, and return the ssh_key filenames.  Create an
1201        allocation log bound to the state log variable as well.
1202        """
1203        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1204        ename = None
1205        pubkey_base = None
1206        secretkey_base = None
1207        alloc_log = None
1208
1209        for a in attrs:
1210            if a['attribute'] in configs:
1211                try:
1212                    self.log.debug("Retrieving %s" % a['value'])
1213                    get_url(a['value'], certfile, tmpdir)
1214                except:
1215                    t, v, st = sys.exc_info()
1216                    raise service_error(service_error.internal,
1217                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1218            if a['attribute'] == 'ssh_pubkey':
1219                pubkey_base = a['value'].rpartition('/')[2]
1220            if a['attribute'] == 'ssh_secretkey':
1221                secretkey_base = a['value'].rpartition('/')[2]
1222            if a['attribute'] == 'experiment_name':
1223                ename = a['value']
1224
1225        if not ename:
1226            ename = ""
1227            for i in range(0,5):
1228                ename += random.choice(string.ascii_letters)
1229            self.log.warn("No experiment name: picked one randomly: %s" \
1230                    % ename)
1231
1232        self.state_lock.acquire()
1233        if self.allocation.has_key(aid):
1234            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1235            self.allocation[aid]['experiment'] = ename
1236            self.allocation[aid]['log'] = [ ]
1237            # Create a logger that logs to the experiment's state object as
1238            # well as to the main log file.
1239            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1240            h = logging.StreamHandler(
1241                    list_log.list_log(self.allocation[aid]['log']))
1242            # XXX: there should be a global one of these rather than
1243            # repeating the code.
1244            h.setFormatter(logging.Formatter(
1245                "%(asctime)s %(name)s %(message)s",
1246                        '%d %b %y %H:%M:%S'))
1247            alloc_log.addHandler(h)
1248            self.write_state()
1249        else:
1250            self.log.error("No allocation for %s!?" % aid)
1251        self.state_lock.release()
1252
1253        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1254                cpw, alloc_log)
1255
1256    def finalize_experiment(self, topo, nodes, aid, alloc_id, proof):
1257        # Copy the assigned names into the return topology
1258        rvtopo = topo.clone()
1259        embedding = [ ]
1260        for k, n in nodes.items():
1261            embedding.append({ 
1262                'toponame': k,
1263                'physname': [n ],
1264                })
1265        # Grab the log (this is some anal locking, but better safe than
1266        # sorry)
1267        self.state_lock.acquire()
1268        logv = "".join(self.allocation[aid]['log'])
1269        # It's possible that the StartSegment call gets retried (!).
1270        # if the 'started' key is in the allocation, we'll return it rather
1271        # than redo the setup.
1272        self.allocation[aid]['started'] = { 
1273                'allocID': alloc_id,
1274                'allocationLog': logv,
1275                'segmentdescription': { 
1276                    'topdldescription': rvtopo.to_dict() },
1277                'embedding': embedding,
1278                'proof': proof.to_dict(),
1279                }
1280        retval = copy.deepcopy(self.allocation[aid]['started'])
1281        self.write_state()
1282        self.state_lock.release()
1283
1284        return retval
1285
1286    def StartSegment(self, req, fid):
1287        err = None  # Any service_error generated after tmpdir is created
1288        rv = None   # Return value from segment creation
1289
1290        try:
1291            req = req['StartSegmentRequestBody']
1292            topref = req['segmentdescription']['topdldescription']
1293        except KeyError:
1294            raise service_error(service_error.req, "Badly formed request")
1295
1296        connInfo = req.get('connection', [])
1297        services = req.get('service', [])
1298        auth_attr = req['allocID']['fedid']
1299        aid = "%s" % auth_attr
1300        attrs = req.get('fedAttr', [])
1301        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1302                with_proof=True)
1303        if not access_ok:
1304            raise service_error(service_error.access, "Access denied")
1305        else:
1306            # See if this is a replay of an earlier succeeded StartSegment -
1307            # sometimes SSL kills 'em.  If so, replay the response rather than
1308            # redoing the allocation.
1309            self.state_lock.acquire()
1310            retval = self.allocation[aid].get('started', None)
1311            self.state_lock.release()
1312            if retval:
1313                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1314                        "replaying response")
1315                return retval
1316
1317        if topref:
1318            topo = topdl.Topology(**topref)
1319        else:
1320            raise service_error(service_error.req, 
1321                    "Request missing segmentdescription'")
1322
1323        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1324        try:
1325            tmpdir = tempfile.mkdtemp(prefix="access-")
1326            softdir = "%s/software" % tmpdir
1327        except EnvironmentError:
1328            raise service_error(service_error.internal, "Cannot create tmp dir")
1329
1330        # Try block alllows us to clean up temporary files.
1331        try:
1332            self.retrieve_software(topo, certfile, softdir)
1333            self.configure_userconf(services, tmpdir)
1334            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1335                cpw, alloc_log = self.initialize_experiment_info(attrs,
1336                        aid, certfile, tmpdir)
1337            self.import_store_info(certfile, connInfo)
1338            rspec = self.generate_rspec(topo, "%s/%s/" \
1339                    % (self.staging_dir, ename), connInfo)
1340
1341            segment_commands = self.api_proxy(keyfile=ssh_key,
1342                    debug=self.create_debug, log=alloc_log,
1343                    ch_url = self.ch_url, sa_url=self.sa_url,
1344                    cm_url=self.cm_url)
1345            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1346                    pubkey_base, 
1347                    secretkey_base, ename,
1348                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1349                    certfile, topo, connInfo, services)
1350        except EnvironmentError, e:
1351            err = service_error(service_error.internal, "%s" % e)
1352        except service_error, e:
1353            err = e
1354        except:
1355            t, v, st = sys.exc_info()
1356            err = service_error(service_error.internal, "%s: %s" % \
1357                    (v, traceback.extract_tb(st)))
1358
1359        # Walk up tmpdir, deleting as we go
1360        if self.cleanup: self.remove_dirs(tmpdir)
1361        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1362
1363        if rv:
1364            return self.finalize_experiment(topo, nodes, aid, req['allocID'],
1365                    proof)
1366        elif err:
1367            raise service_error(service_error.federant,
1368                    "Swapin failed: %s" % err)
1369        else:
1370            raise service_error(service_error.federant, "Swapin failed")
1371
1372    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1373            slice_urn, certfile, certpw):
1374        """
1375        Stop a sub experiment by calling swapexp on the federant
1376        """
1377        host = self.staging_host
1378        rv = False
1379        try:
1380            # Clean out tar files: we've gone over quota in the past
1381            if stagingdir:
1382                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1383            if slice_cred:
1384                self.log.error('Removing Sliver on ProtoGENI')
1385                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1386                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1387            return True
1388        except self.ssh_cmd_timeout:
1389            rv = False
1390        return rv
1391
1392    def TerminateSegment(self, req, fid):
1393        try:
1394            req = req['TerminateSegmentRequestBody']
1395        except KeyError:
1396            raise service_error(service_error.req, "Badly formed request")
1397
1398        auth_attr = req['allocID']['fedid']
1399        aid = "%s" % auth_attr
1400        attrs = req.get('fedAttr', [])
1401        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1402                with_proof=True)
1403        if not access_ok:
1404            raise service_error(service_error.access, "Access denied")
1405
1406        self.state_lock.acquire()
1407        if self.allocation.has_key(aid):
1408            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1409            slice_cred = self.allocation[aid].get('slice_credential', None)
1410            slice_urn = self.allocation[aid].get('slice_urn', None)
1411            ename = self.allocation[aid].get('experiment', None)
1412        else:
1413            cf, user, ssh_key, cpw = (None, None, None, None)
1414            slice_cred = None
1415            slice_urn = None
1416            ename = None
1417        self.state_lock.release()
1418
1419        if ename:
1420            staging = "%s/%s" % ( self.staging_dir, ename)
1421        else:
1422            self.log.warn("Can't find experiment name for %s" % aid)
1423            staging = None
1424
1425        segment_commands = self.api_proxy(keyfile=ssh_key,
1426                debug=self.create_debug, ch_url = self.ch_url,
1427                sa_url=self.sa_url, cm_url=self.cm_url)
1428        self.stop_segment(segment_commands, user, staging, slice_cred,
1429                slice_urn, cf, cpw)
1430        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1431
1432    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1433            certfile, certpw):
1434        """
1435        Linear code through the segment renewal calls.
1436        """
1437        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1438        try:
1439            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1440                    time.gmtime(time.time() + interval))
1441            cred = segment_commands.slice_authority_call('GetCredential', 
1442                    {}, ctxt)
1443
1444            param = {
1445                    'credential': scred,
1446                    'expiration': expiration
1447                    }
1448            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1449            param = {
1450                    'credential': cred,
1451                    'urn': slice_urn,
1452                    'type': 'Slice',
1453                    }
1454            new_scred = segment_commands.slice_authority_call('GetCredential',
1455                    param, ctxt)
1456
1457        except segment_commands.ProtoGENIError, e:
1458            self.log.error("Failed to extend slice %s: %s" % (name, e))
1459            return None
1460        try:
1461            param = {
1462                    'credentials': [new_scred,],
1463                    'slice_urn': slice_urn,
1464                    }
1465            r = segment_commands.component_manager_call('RenewSlice', param, 
1466                    ctxt)
1467        except segment_commands.ProtoGENIError, e:
1468            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1469
1470        return new_scred
1471   
1472
1473    def RenewSlices(self):
1474        self.log.info("Scanning for slices to renew")
1475        self.state_lock.acquire()
1476        aids = self.allocation.keys()
1477        self.state_lock.release()
1478
1479        for aid in aids:
1480            self.state_lock.acquire()
1481            if self.allocation.has_key(aid):
1482                name = self.allocation[aid].get('slice_name', None)
1483                scred = self.allocation[aid].get('slice_credential', None)
1484                slice_urn = self.allocation[aid].get('slice_urn', None)
1485                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1486            else:
1487                name = None
1488                scred = None
1489            self.state_lock.release()
1490
1491            if not os.access(cf, os.R_OK):
1492                self.log.error(
1493                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1494                continue
1495
1496            # There's a ProtoGENI slice associated with the segment; renew it.
1497            if name and scred and slice_urn:
1498                segment_commands = self.api_proxy(log=self.log, 
1499                        debug=self.create_debug, keyfile=ssh_key,
1500                        cm_url = self.cm_url, sa_url = self.sa_url,
1501                        ch_url = self.ch_url)
1502                new_scred = self.renew_segment(segment_commands, name, scred, 
1503                        slice_urn, self.renewal_interval, cf, cpw)
1504                if new_scred:
1505                    self.log.info("Slice %s renewed until %s GMT" % \
1506                            (name, time.asctime(time.gmtime(\
1507                                time.time()+self.renewal_interval))))
1508                    self.state_lock.acquire()
1509                    if self.allocation.has_key(aid):
1510                        self.allocation[aid]['slice_credential'] = new_scred
1511                        self.write_state()
1512                    self.state_lock.release()
1513                else:
1514                    self.log.info("Failed to renew slice %s " % name)
1515
1516        # Let's do this all again soon.  (4 tries before the slices time out)   
1517        t = Timer(self.renewal_interval/4, self.RenewSlices)
1518        t.start()
Note: See TracBrowser for help on using the repository browser.