source: fedd/federation/protogeni_access.py @ cd60510

axis_examplecompt_changesinfo-ops
Last change on this file since cd60510 was 35a5879, checked in by Mike Ryan <mikeryan@…>, 14 years ago

inherit from legacy_access so legacy access checking works

  • Property mode set to 100644
File size: 47.8 KB
RevLine 
[c119839]1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
[dd3e38b]6import time
[c119839]7import string
8import copy
9import pickle
10import logging
11import subprocess
[42cd8a7]12import random
[3551ae1]13import traceback
[1b6cc95]14import xml.parsers.expat
[c119839]15
[42cd8a7]16from threading import Thread, Timer, Lock
[c119839]17
18from util import *
19from fedid import fedid, generate_fedid
[3cec20c]20from authorizer import authorizer, abac_authorizer
[c119839]21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
[3551ae1]28from access import access_base
[35a5879]29from legacy_access import legacy_access
[208797c]30from protogeni_proxy import protogeni_proxy
31from geniapi_proxy import geniapi_proxy
[3551ae1]32
[c119839]33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
[35a5879]44class access(access_base, legacy_access):
[c119839]45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
[3551ae1]57        access_base.__init__(self, config, auth)
[c119839]58
59        self.domain = config.get("access", "domain")
60        self.userconfdir = config.get("access","userconfdir")
61        self.userconfcmd = config.get("access","userconfcmd")
62        self.userconfurl = config.get("access","userconfurl")
[9b3627e]63        self.federation_software = config.get("access", "federation_software")
64        self.portal_software = config.get("access", "portal_software")
[c119839]65        self.ssh_port = config.get("access","ssh_port") or "22"
66        self.sshd = config.get("access","sshd")
67        self.sshd_config = config.get("access", "sshd_config")
68        self.access_type = config.get("access", "type")
69        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
70        self.staging_host = config.get("access", "staging_host") \
71                or "ops.emulab.net"
[a65a65a]72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
[9b3627e]75   
[a65a65a]76        self.dragon_endpoint = config.get("access", "dragon")
77        self.dragon_vlans = config.get("access", "dragon_vlans")
78        self.deter_internal = config.get("access", "deter_internal")
79
80        self.tunnel_config = config.getboolean("access", "tunnel_config")
81        self.portal_command = config.get("access", "portal_command")
82        self.portal_image = config.get("access", "portal_image")
83        self.portal_type = config.get("access", "portal_type") or "pc"
84        self.portal_startcommand = config.get("access", "portal_startcommand")
85        self.node_startcommand = config.get("access", "node_startcommand")
86
[3551ae1]87        self.federation_software = self.software_list(self.federation_software)
88        self.portal_software = self.software_list(self.portal_software)
89        self.local_seer_software = self.software_list(self.local_seer_software)
[c119839]90
[310d419]91        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
92        self.renewal_interval = int(self.renewal_interval) * 60
[dd3e38b]93
[c119839]94        self.ch_url = config.get("access", "ch_url")
95        self.sa_url = config.get("access", "sa_url")
96        self.cm_url = config.get("access", "cm_url")
97
98        self.restricted = [ ]
99
[3551ae1]100        # read_state in the base_class
101        self.state_lock.acquire()
102        for a  in ('allocation', 'projects', 'keys', 'types'):
103            if a not in self.state:
104                self.state[a] = { }
105        self.allocation = self.state['allocation']
106        self.projects = self.state['projects']
107        self.keys = self.state['keys']
108        self.types = self.state['types']
109        self.state_lock.release()
[c119839]110
111
[3551ae1]112        self.log = logging.getLogger("fedd.access")
113        set_log_level(config, "access", self.log)
[c119839]114
[3cec20c]115        # authorization information
116        self.auth_type = config.get('access', 'auth_type') \
117                or 'legacy'
118        self.auth_dir = config.get('access', 'auth_dir')
119        accessdb = config.get("access", "accessdb")
120        # initialize the authorization system
121        if self.auth_type == 'legacy':
122            self.access = { }
123            if accessdb:
124                self.legacy_read_access(accessdb, self.make_access_info)
125            # Add the ownership attributes to the authorizer.  Note that the
126            # indices of the allocation dict are strings, but the attributes are
127            # fedids, so there is a conversion.
128            self.state_lock.acquire()
129            for k in self.state.get('allocation', {}).keys():
130                for o in self.state['allocation'][k].get('owners', []):
131                    self.auth.set_attribute(o, fedid(hexstr=k))
132                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
[3551ae1]133
[3cec20c]134            self.state_lock.release()
135            self.lookup_access = self.legacy_lookup_access_base
136        elif self.auth_type == 'abac':
137            self.auth = abac_authorizer(load=self.auth_dir)
138            self.access = [ ]
139            if accessdb:
140                self.read_access(accessdb, self.make_access_info)
141        else:
142            raise service_error(service_error.internal, 
143                    "Unknown auth_type: %s" % self.auth_type)
[208797c]144        api = config.get("access", "api") or "protogeni"
145        if api == "protogeni":
146            self.api_proxy = protogeni_proxy
147        elif api == "geniapi":
148            self.api_proxy = geniapi_proxy
149        else:
150            self.log.debug("Unknown interface, using protogeni")
151            self.api_proxy = protogeni_proxy
152
[c119839]153        self.call_SetValue = service_caller('SetValue')
154        self.call_GetValue = service_caller('GetValue')
[3551ae1]155        self.exports = {
156                'local_seer_control': self.export_local_seer,
157                'seer_master': self.export_seer_master,
158                'hide_hosts': self.export_hide_hosts,
159                }
160
161        if not self.local_seer_image or not self.local_seer_software or \
162                not self.local_seer_start:
163            if 'local_seer_control' in self.exports:
164                del self.exports['local_seer_control']
165
166        if not self.local_seer_image or not self.local_seer_software or \
167                not self.seer_master_start:
168            if 'seer_master' in self.exports:
169                del self.exports['seer_master']
[c119839]170
[dd3e38b]171        self.RenewSlices()
172
[c119839]173        self.soap_services = {\
174            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
175            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
176            'StartSegment': soap_handler("StartSegment", self.StartSegment),
177            'TerminateSegment': soap_handler("TerminateSegment", 
178                self.TerminateSegment),
179            }
180        self.xmlrpc_services =  {\
181            'RequestAccess': xmlrpc_handler('RequestAccess',
182                self.RequestAccess),
183            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
184                self.ReleaseAccess),
185            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
186            'TerminateSegment': xmlrpc_handler('TerminateSegment',
187                self.TerminateSegment),
188            }
189
[3551ae1]190    @staticmethod
191    def make_access_info(s):
192        """
193        Split a string of the form (id, id, id, id) ito its constituent tuples
194        and return them as a tuple.  Use to import access info from the
195        access_db.
196        """
[c119839]197
[3551ae1]198        ss = s.strip()
199        if ss.startswith('(') and ss.endswith(')'):
200            l = [ s.strip() for s  in ss[1:-1].split(",")]
201            if len(l) == 4:
202                return tuple(l)
203            else:
204                raise self.parse_error(
205                        "Exactly 4 elements in access info required")
206        else:
207            raise self.parse_error("Expecting parenthezied values")
[c119839]208
209
210    def get_handler(self, path, fid):
211        self.log.info("Get handler %s %s" % (path, fid))
212        if self.auth.check_attribute(fid, path) and self.userconfdir:
213            return ("%s/%s" % (self.userconfdir, path), "application/binary")
214        else:
215            return (None, None)
216
[3551ae1]217    def build_access_response(self, alloc_id, services):
[c119839]218        """
219        Create the SOAP response.
220
221        Build the dictionary description of the response and use
222        fedd_utils.pack_soap to create the soap message.  ap is the allocate
223        project message returned from a remote project allocation (even if that
224        allocation was done locally).
225        """
226        # Because alloc_id is already a fedd_services_types.IDType_Holder,
227        # there's no need to repack it
228        msg = { 
229                'allocID': alloc_id,
230                'fedAttr': [
231                    { 'attribute': 'domain', 'value': self.domain } , 
232                ]
233            }
[a65a65a]234        if self.dragon_endpoint:
235            msg['fedAttr'].append({'attribute': 'dragon',
236                'value': self.dragon_endpoint})
237        if self.deter_internal:
238            msg['fedAttr'].append({'attribute': 'deter_internal',
239                'value': self.deter_internal})
240        #XXX: ??
241        if self.dragon_vlans:
242            msg['fedAttr'].append({'attribute': 'vlans',
243                'value': self.dragon_vlans})
[c119839]244
245        if services:
246            msg['service'] = services
247        return msg
248
249    def RequestAccess(self, req, fid):
250        """
[3551ae1]251        Handle the access request.
[c119839]252        """
253
254        # The dance to get into the request body
255        if req.has_key('RequestAccessRequestBody'):
256            req = req['RequestAccessRequestBody']
257        else:
258            raise service_error(service_error.req, "No request!?")
259
260        if req.has_key('destinationTestbed'):
261            dt = unpack_id(req['destinationTestbed'])
262
[3551ae1]263        # Request for this fedd
[3cec20c]264        found, match, owners = self.lookup_access(req, fid)
[3551ae1]265        services, svc_state = self.export_services(req.get('service',[]),
266                None, None)
267        # keep track of what's been added
268        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
269        aid = unicode(allocID)
[c119839]270
[3551ae1]271        self.state_lock.acquire()
272        self.allocation[aid] = { }
273        # The protoGENI certificate
274        self.allocation[aid]['credentials'] = found
275        # The list of owner FIDs
[3cec20c]276        self.allocation[aid]['owners'] = owners
[3551ae1]277        self.write_state()
278        self.state_lock.release()
279        self.auth.set_attribute(fid, allocID)
280        self.auth.set_attribute(allocID, allocID)
[3cec20c]281        self.auth.save()
[3551ae1]282
283        try:
284            f = open("%s/%s.pem" % (self.certdir, aid), "w")
285            print >>f, alloc_cert
286            f.close()
287        except EnvironmentError, e:
288            raise service_error(service_error.internal, 
289                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
290        return self.build_access_response({ 'fedid': allocID }, None)
[c119839]291
292
293    def ReleaseAccess(self, req, fid):
294        # The dance to get into the request body
295        if req.has_key('ReleaseAccessRequestBody'):
296            req = req['ReleaseAccessRequestBody']
297        else:
298            raise service_error(service_error.req, "No request!?")
299
[3551ae1]300        # Local request
301        try:
302            if req['allocID'].has_key('localname'):
303                auth_attr = aid = req['allocID']['localname']
304            elif req['allocID'].has_key('fedid'):
305                aid = unicode(req['allocID']['fedid'])
306                auth_attr = req['allocID']['fedid']
[c119839]307            else:
308                raise service_error(service_error.req,
[3551ae1]309                        "Only localnames and fedids are understood")
310        except KeyError:
311            raise service_error(service_error.req, "Badly formed request")
[c119839]312
[3551ae1]313        self.log.debug("[access] deallocation requested for %s", aid)
314        if not self.auth.check_attribute(fid, auth_attr):
315            self.log.debug("[access] deallocation denied for %s", aid)
316            raise service_error(service_error.access, "Access Denied")
[c119839]317
[3551ae1]318        self.state_lock.acquire()
319        if self.allocation.has_key(aid):
320            self.log.debug("Found allocation for %s" %aid)
321            del self.allocation[aid]
322            self.write_state()
323            self.state_lock.release()
324            # And remove the access cert
325            cf = "%s/%s.pem" % (self.certdir, aid)
326            self.log.debug("Removing %s" % cf)
327            os.remove(cf)
328            return { 'allocID': req['allocID'] } 
329        else:
330            self.state_lock.release()
331            raise service_error(service_error.req, "No such allocation")
[c119839]332
[42cd8a7]333    def manifest_to_dict(self, manifest, ignore_debug=False):
[37ed9a5]334        """
335        Turn the manifest into a dict were each virtual nodename (i.e. the
336        topdl name) has an entry with the allocated machine in hostname and the
337        interfaces in 'interfaces'.  I love having XML parser code lying
338        around.
339        """
[42cd8a7]340        if self.create_debug and not ignore_debug: 
341            self.log.debug("Returning null manifest dict")
342            return { }
343
344        # The class allows us to keep a little state - the dict under
345        # consteruction and the current entry in that dict for the interface
346        # element code.
347        class manifest_parser:
348            def __init__(self):
349                self.d = { }
350                self.current_key=None
351
352            # If the element is a node, create a dict entry for it.  If it's an
353            # interface inside a node, add an entry in the interfaces list with
354            # the virtual name and component id.
355            def start_element(self, name, attrs):
356                if name == 'node':
357                    self.current_key = attrs.get('virtual_id',"")
358                    if self.current_key:
359                        self.d[self.current_key] = {
360                                'hostname': attrs.get('hostname', None),
[814b5e5]361                                'interfaces': { },
362                                'mac': { }
[42cd8a7]363                                }
364                elif name == 'interface' and self.current_key:
365                    self.d[self.current_key]['interfaces']\
366                            [attrs.get('virtual_id','')] = \
367                            attrs.get('component_id', None)
[814b5e5]368                elif name == 'interface_ref':
369                    # Collect mac address information from an interface_ref.
370                    # These appear after the node info has been parsed.
371                    nid = attrs.get('virtual_node_id', None)
372                    ifid = attrs.get('virtual_interface_id', None)
373                    mac = attrs.get('MAC', None)
374                    self.d[nid]['mac'][ifid] = mac
[42cd8a7]375            #  When a node is finished, clear current_key
376            def end_element(self, name):
377                if name == 'node': self.current_key = None
378
379        node = { }
380
381        mp = manifest_parser()
382        p = xml.parsers.expat.ParserCreate()
383        # These are bound to the class we just created
384        p.StartElementHandler = mp.start_element
385        p.EndElementHandler = mp.end_element
386
387        p.Parse(manifest)
388        # Make the node dict that the callers expect
389        for k in mp.d:
390            node[k] = mp.d.get('hostname', '')
391        return mp.d
392
393    def fake_manifest(self, topo):
[37ed9a5]394        """
395        Fake the output of manifest_to_dict with a bunch of generic node an
396        interface names, for debugging.
397        """
[42cd8a7]398        node = { }
399        for i, e in enumerate([ e for e in topo.elements \
400                if isinstance(e, topdl.Computer)]):
401            node[e.name] = { 
402                    'hostname': "node%03d" % i,
403                    'interfaces': { }
404                    }
405            for j, inf in enumerate(e.interface):
406                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
407
408        return node
409
410
411    def generate_portal_configs(self, topo, pubkey_base, 
412            secretkey_base, tmpdir, leid, connInfo, services, nodes):
413
414        def conninfo_to_dict(key, info):
415            """
416            Make a cpoy of the connection information about key, and flatten it
417            into a single dict by parsing out any feddAttrs.
418            """
419
420            rv = None
421            for i in info:
422                if key == i.get('portal', "") or \
423                        key in [e.get('element', "") \
424                        for e in i.get('member', [])]:
425                    rv = i.copy()
426                    break
427
428            else:
429                return rv
430
431            if 'fedAttr' in rv:
432                for a in rv['fedAttr']:
433                    attr = a.get('attribute', "")
434                    val = a.get('value', "")
435                    if attr and attr not in rv:
436                        rv[attr] = val
437                del rv['fedAttr']
438            return rv
439
440        # XXX: un hardcode this
441        def client_null(f, s):
442            print >>f, "Service: %s" % s['name']
443       
444        def client_seer_master(f, s):
445            print >>f, 'PortalAlias: seer-master'
446
447        def client_smb(f, s):
448            print >>f, "Service: %s" % s['name']
449            smbshare = None
450            smbuser = None
451            smbproj = None
452            for a in s.get('fedAttr', []):
453                if a.get('attribute', '') == 'SMBSHARE':
454                    smbshare = a.get('value', None)
455                elif a.get('attribute', '') == 'SMBUSER':
456                    smbuser = a.get('value', None)
457                elif a.get('attribute', '') == 'SMBPROJ':
458                    smbproj = a.get('value', None)
459
460            if all((smbshare, smbuser, smbproj)):
461                print >>f, "SMBshare: %s" % smbshare
462                print >>f, "ProjectUser: %s" % smbuser
463                print >>f, "ProjectName: %s" % smbproj
464
465        def client_hide_hosts(f, s):
466            for a in s.get('fedAttr', [ ]):
467                if a.get('attribute', "") == 'hosts':
468                    print >>f, 'Hide: %s' % a.get('value', "")
469
470        client_service_out = {
471                'SMB': client_smb,
472                'tmcd': client_null,
473                'seer': client_null,
474                'userconfig': client_null,
475                'project_export': client_null,
476                'seer_master': client_seer_master,
477                'hide_hosts': client_hide_hosts,
478            }
479
480        def client_seer_master_export(f, s):
481            print >>f, "AddedNode: seer-master"
482
483        def client_seer_local_export(f, s):
484            print >>f, "AddedNode: control"
485
486        client_export_service_out = {
487                'seer_master': client_seer_master_export,
488                'local_seer_control': client_seer_local_export,
489            }
490
491        def server_port(f, s):
492            p = urlparse(s.get('server', 'http://localhost'))
493            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
494
495        def server_null(f,s): pass
496
497        def server_seer(f, s):
498            print >>f, 'seer: true'
499
500        server_service_out = {
501                'SMB': server_port,
502                'tmcd': server_port,
503                'userconfig': server_null,
504                'project_export': server_null,
505                'seer': server_seer,
506                'seer_master': server_port,
507                'hide_hosts': server_null,
508            }
509        # XXX: end un hardcode this
510
511
512        seer_out = False
513        client_out = False
[6fd2b29]514        control_gw = None
[42cd8a7]515        for e in [ e for e in topo.elements \
516                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
517            myname = e.name
518            type = e.get_attribute('portal_type')
519
520            info = conninfo_to_dict(myname, connInfo)
521
522            if not info:
523                raise service_error(service_error.req,
524                        "No connectivity info for %s" % myname)
525
526            # Translate to physical name (ProtoGENI doesn't have DNS)
527            physname = nodes.get(myname, { }).get('hostname', None)
528            peer = info.get('peer', "")
529            ldomain = self.domain
530            ssh_port = info.get('ssh_port', 22)
531
532            # Collect this for the client.conf file
533            if 'masterexperiment' in info:
534                mproj, meid = info['masterexperiment'].split("/", 1)
535
536            active = info.get('active', 'False')
537
538            if type in ('control', 'both'):
539                testbed = e.get_attribute('testbed')
540                control_gw = myname
541
542            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
543            tunnelconfig = self.tunnel_config
544            try:
545                f = open(cfn, "w")
546                if active == 'True':
547                    print >>f, "active: True"
548                    print >>f, "ssh_port: %s" % ssh_port
549                    if type in ('control', 'both'):
550                        for s in [s for s in services \
551                                if s.get('name', "") in self.imports]:
552                            server_service_out[s['name']](f, s)
553
554                if tunnelconfig:
555                    print >>f, "tunnelip: %s" % tunnelconfig
556                print >>f, "peer: %s" % peer.lower()
557                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
558                        pubkey_base
559                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
560                        secretkey_base
561                f.close()
562            except EnvironmentError, e:
563                raise service_error(service_error.internal,
564                        "Can't write protal config %s: %s" % (cfn, e))
565
566        # Done with portals, write the client config file.
567        try:
568            f = open("%s/client.conf" % tmpdir, "w")
569            if control_gw:
570                print >>f, "ControlGateway: %s" % physname.lower()
571            for s in services:
572                if s.get('name',"") in self.imports and \
573                        s.get('visibility','') == 'import':
574                    client_service_out[s['name']](f, s)
575                if s.get('name', '') in self.exports and \
576                        s.get('visibility', '') == 'export' and \
577                        s['name'] in client_export_service_out:
578                    client_export_service_out[s['name']](f, s)
579            # Seer uses this.
580            if mproj and meid:
581                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
582            f.close()
583        except EnvironmentError, e:
584            raise service_error(service_error.internal,
585                    "Cannot write client.conf: %s" %s)
586
587
588
589    def export_store_info(self, cf, nodes, ssh_port, connInfo):
590        """
591        For the export requests in the connection info, install the peer names
592        at the experiment controller via SetValue calls.
593        """
594
595        for c in connInfo:
596            for p in [ p for p in c.get('parameter', []) \
597                    if p.get('type', '') == 'output']:
598
599                if p.get('name', '') == 'peer':
600                    k = p.get('key', None)
601                    surl = p.get('store', None)
602                    if surl and k and k.index('/') != -1:
603                        if self.create_debug:
604                            req = { 'name': k, 'value': 'debug' }
605                            self.call_SetValue(surl, req, cf)
606                        else:
607                            n = nodes.get(k[k.index('/')+1:], { })
608                            value = n.get('hostname', None)
609                            if value:
610                                req = { 'name': k, 'value': value }
611                                self.call_SetValue(surl, req, cf)
612                            else:
613                                self.log.error("No hostname for %s" % \
614                                        k[k.index('/'):])
615                    else:
616                        self.log.error("Bad export request: %s" % p)
617                elif p.get('name', '') == 'ssh_port':
618                    k = p.get('key', None)
619                    surl = p.get('store', None)
620                    if surl and k:
621                        req = { 'name': k, 'value': ssh_port }
622                        self.call_SetValue(surl, req, cf)
623                    else:
624                        self.log.error("Bad export request: %s" % p)
625                else:
626
627                    self.log.error("Unknown export parameter: %s" % \
628                            p.get('name'))
629                    continue
630
[37ed9a5]631    def write_node_config_script(self, elem, node, user, pubkey,
632            secretkey, stagingdir, tmpdir):
633        """
634        Write out the configuration script that is to run on the node
635        represented by elem in the topology.  This is called
636        once per node to configure.
637        """
638        # These little functions/functors just make things more readable.  Each
639        # one encapsulates a small task of copying software files or installing
640        # them.
[42cd8a7]641        class stage_file_type:
[37ed9a5]642            """
643            Write code copying file sfrom the staging host to the host on which
644            this will run.
645            """
[42cd8a7]646            def __init__(self, user, host, stagingdir):
647                self.user = user
648                self.host = host
649                self.stagingdir = stagingdir
650                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
651                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
652
653            def __call__(self, script, file, dest="."):
654                # If the file is a full pathname, do not use stagingdir
655                if file.find('/') == -1:
656                    file = "%s/%s" % (self.stagingdir, file)
657                print >>script, "%s %s@%s:%s %s" % \
658                        (self.scp, self.user, self.host, file, dest)
659
660        def install_tar(script, loc, base):
[37ed9a5]661            """
662            Print code to script to install a tarfile in loc.
663            """
[42cd8a7]664            tar = "/bin/tar"
665            mkdir="/bin/mkdir"
666
667            print >>script, "%s -p %s" % (mkdir, loc)
668            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
669
670        def install_rpm(script, base):
[37ed9a5]671            """
672            Print code to script to install an rpm
673            """
[42cd8a7]674            rpm = "/bin/rpm"
675            print >>script, "%s --install %s" % (rpm, base)
676
[37ed9a5]677        ifconfig = "/sbin/ifconfig"
[814b5e5]678        findif = '/usr/local/etc/emulab/findif'
[37ed9a5]679        stage_file = stage_file_type(user, self.staging_host, stagingdir)
680        pname = node.get('hostname', None)
[42cd8a7]681        fed_dir = "/usr/local/federation"
682        fed_etc_dir = "%s/etc" % fed_dir
683        fed_bin_dir = "%s/bin" % fed_dir
684        fed_lib_dir = "%s/lib" % fed_dir
685
[37ed9a5]686        if pname:
687            sfile = "%s/%s.startup" % (tmpdir, pname)
688            script = open(sfile, "w")
689            # Reset the interfaces to the ones in the topo file
690            for i in [ i for i in elem.interface \
691                    if not i.get_attribute('portal')]:
[3cec20c]692                if 'interfaces' in node:
693                    pinf = node['interfaces'].get(i.name, None)
694                else:
695                    pinf = None
696
697                if 'mac' in node:
698                    pmac = node['mac'].get(i.name, None)
699                else:
700                    pmac = None
[37ed9a5]701                addr = i.get_attribute('ip4_address') 
702                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
[814b5e5]703                # The interface names in manifests are not to be trusted, so we
704                # find the interface to configure using the local node's script
705                # to match mac address to interface name.
706                if pinf and addr and pmac:
707                    print >>script, '# %s' % pinf
[37ed9a5]708                    print >>script, \
[814b5e5]709                            "%s `%s %s` %s netmask %s"  % \
710                            (ifconfig, findif, pmac, addr, netmask)
[37ed9a5]711                else:
712                    self.log.error("Missing interface or address for %s" \
713                            % i.name)
714               
715            for l, f in self.federation_software:
716                base = os.path.basename(f)
717                stage_file(script, base)
718                if l: install_tar(script, l, base)
719                else: install_rpm(script, base)
720
721            for s in elem.software:
722                s_base = s.location.rpartition('/')[2]
723                stage_file(script, s_base)
724                if s.install: install_tar(script, s.install, s_base)
725                else: install_rpm(script, s_base)
726
727            for f in ('hosts', pubkey, secretkey, 'client.conf', 
728                    'userconf'):
729                stage_file(script, f, fed_etc_dir)
730            if self.sshd:
731                stage_file(script, self.sshd, fed_bin_dir)
732            if self.sshd_config:
733                stage_file(script, self.sshd_config, fed_etc_dir)
734
735            # Look in tmpdir to get the names.  They've all been copied
736            # into the (remote) staging dir
737            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
738                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
739
[c2f92c5]740            # Done with staging, remove the identity used to stage
741            print >>script, "#/bin/rm .ssh/id_rsa"
[37ed9a5]742
743            # Start commands
744            if elem.get_attribute('portal') and self.portal_startcommand:
745                # Install portal software
746                for l, f in self.portal_software:
[42cd8a7]747                    base = os.path.basename(f)
748                    stage_file(script, base)
749                    if l: install_tar(script, l, base)
750                    else: install_rpm(script, base)
751
[37ed9a5]752                # Portals never have a user-specified start command
753                print >>script, self.portal_startcommand
754            elif self.node_startcommand:
755                # XXX: debug
[1b6cc95]756                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
[37ed9a5]757                # XXX: debug
758                if elem.get_attribute('startup'):
759                    print >>script, "%s \\$USER '%s'" % \
760                            (self.node_startcommand,
761                                    elem.get_attribute('startup'))
762                else:
763                    print >>script, self.node_startcommand
764            script.close()
765            return sfile, pname
766        else:
767            return None, None
[42cd8a7]768
[37ed9a5]769
770    def configure_nodes(self, segment_commands, topo, nodes, user, 
771            pubkey, secretkey, stagingdir, tmpdir):
772        """
773        For each node in the topology, generate a script file that copies
774        software onto it and installs it in the proper places and then runs the
775        startup command (including the federation commands.
776        """
777
778
779
780        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
781            vname = e.name
782            sfile, pname = self.write_node_config_script(e,
783                    nodes.get(vname, { }),
784                    user, pubkey, secretkey, stagingdir, tmpdir)
785            if sfile:
786                if not segment_commands.scp_file(sfile, user, pname):
[42cd8a7]787                    self.log.error("Could not copy script to %s" % pname)
788            else:
789                self.log.error("Unmapped node: %s" % vname)
790
791    def start_node(self, user, host, node, segment_commands):
[37ed9a5]792        """
793        Copy an identity to a node for the configuration script to be able to
794        import data and then run the startup script remotely.
795        """
[42cd8a7]796        # Place an identity on the node so that the copying can succeed
[c2f92c5]797        segment_commands.scp_file( segment_commands.ssh_privkey_file,
798                user, node, ".ssh/id_rsa")
[42cd8a7]799        segment_commands.ssh_cmd(user, node, 
800                "sudo /bin/sh ./%s.startup &" % node)
801
802    def start_nodes(self, user, host, nodes, segment_commands):
[37ed9a5]803        """
804        Start a thread to initialize each node and wait for them to complete.
805        Each thread runs start_node.
806        """
[42cd8a7]807        threads = [ ]
808        for n in nodes:
809            t = Thread(target=self.start_node, args=(user, host, n, 
810                segment_commands))
811            t.start()
812            threads.append(t)
813
814        done = [not t.isAlive() for t in threads]
815        while not all(done):
816            self.log.info("Waiting for threads %s" % done)
817            time.sleep(10)
818            done = [not t.isAlive() for t in threads]
819
[37ed9a5]820    def set_up_staging_filespace(self, segment_commands, user, host,
821            stagingdir):
[42cd8a7]822        """
[37ed9a5]823        Set up teh staging area on the staging machine.  To reduce the number
824        of ssh commands, we compose a script and execute it remotely.
[42cd8a7]825        """
826
827        self.log.info("[start_segment]: creating script file")
828        try:
829            sf, scriptname = tempfile.mkstemp()
830            scriptfile = os.fdopen(sf, 'w')
831        except EnvironmentError:
832            return False
833
834        scriptbase = os.path.basename(scriptname)
835
836        # Script the filesystem changes
837        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
838        print >>scriptfile, 'mkdir -p %s' % stagingdir
839        print >>scriptfile, "rm -f %s" % scriptbase
840        scriptfile.close()
841
842        # Move the script to the remote machine
843        # XXX: could collide tempfile names on the remote host
844        if segment_commands.scp_file(scriptname, user, host, scriptbase):
845            os.remove(scriptname)
846        else:
847            return False
848
849        # Execute the script (and the script's last line deletes it)
850        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
851            return False
852
[37ed9a5]853    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
854        """
855        Protogeni interactions take a context and a protogeni certificate.
856        This establishes both for later calls and returns them.
857        """
858        if os.access(certfile, os.R_OK):
859            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
860        else:
861            self.log.error("[start_segment]: Cannot read certfile: %s" % \
862                    certfile)
863            return None, None
864
[42cd8a7]865        try:
[88dbe63]866            gcred = segment_commands.slice_authority_call('GetCredential', 
867                    {}, ctxt)
868        except segment_commands.ProtoGENIError, e:
[42cd8a7]869            raise service_error(service_error.federant,
870                    "ProtoGENI: %s" % e)
[37ed9a5]871
872        return ctxt, gcred
873
874    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
875        """
876        Find a usable slice name by trying random ones until there's no
877        collision.
878        """
879
880        def random_slicename(user):
881            """
882            Return a random slicename by appending 5 letters to the username.
883            """
884            slicename = user
885            for i in range(0,5):
886                slicename += random.choice(string.ascii_letters)
887            return slicename
888
[42cd8a7]889        while True:
890            slicename = random_slicename(user)
891            try:
892                param = {
893                        'credential': gcred, 
894                        'hrn': slicename,
895                        'type': 'Slice'
896                        }
[3cec20c]897
898                if not self.create_debug:
899                    segment_commands.slice_authority_call('Resolve', param, 
900                            ctxt)
901                else:
902                    raise segment_commands.ProtoGENIError(0,0,'Debug')
[42cd8a7]903            except segment_commands.ProtoGENIError, e:
904                print e
905                break
906
[37ed9a5]907        return slicename
908
909    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
910        """
911        Create the slice and allocate resources.  If any of this stuff fails,
912        the allocations will time out on PG in short order, so we just raise
913        the service_error.  Return the slice and sliver credentials as well as
914        the manifest.
915        """
[42cd8a7]916        try:
917            param = {
918                    'credential': gcred, 
919                    'hrn': slicename,
920                    'type': 'Slice'
921                    }
[88dbe63]922            slice_cred = segment_commands.slice_authority_call('Register',
[37ed9a5]923                    param, ctxt)
[d49c11c]924            # Resolve the slice to get the URN that PG has assigned it.
925            param = {
926                    'credential': gcred,
927                    'type': 'Slice',
928                    'hrn': slicename
929                    }
[88dbe63]930            data = segment_commands.slice_authority_call('Resolve', param,
[d49c11c]931                    ctxt)
932            if 'urn' in data:
933                slice_urn = data['urn']
934            else:
935                raise service_error(service_error.federant, 
[3cec20c]936                        "No URN returned for slice %s" % slicename)
[4875e93]937
938            if 'creator_urn' in data:
939                creator_urn = data['creator_urn']
940            else:
941                raise service_error(service_error.federant, 
[3cec20c]942                        "No creator URN returned for slice %s" % slicename)
[42cd8a7]943            # Populate the ssh keys (let PG format them)
944            param = {
945                    'credential': gcred, 
946                    }
[88dbe63]947            keys =  segment_commands.slice_authority_call('GetKeys', param, 
[37ed9a5]948                    ctxt)
[d49c11c]949            # Create a Sliver
[42cd8a7]950            param = {
[d49c11c]951                    'credentials': [ slice_cred ],
952                    'rspec': rspec, 
[4875e93]953                    'users': [ {
954                        'urn': creator_urn,
955                        'keys': keys, 
956                        },
957                    ],
[d49c11c]958                    'slice_urn': slice_urn,
[42cd8a7]959                    }
[208797c]960            rv = segment_commands.component_manager_call(
[d49c11c]961                    'CreateSliver', param, ctxt)
[208797c]962
963            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
964            # API hands back a sliver credential.  This just finds the format
965            # of choice.
966            if isinstance(rv, tuple):
967                manifest = rv[1]
968            else:
969                manifest = rv
[42cd8a7]970        except segment_commands.ProtoGENIError, e:
971            raise service_error(service_error.federant,
972                    "ProtoGENI: %s %s" % (e.code, e))
973
[add53ea]974        return (slice_urn, slice_cred, manifest, rspec)
[37ed9a5]975
[4875e93]976    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
[d49c11c]977            timeout=None):
[37ed9a5]978        """
979        Wait for the given slice to finish its startup.  Return the final
980        status.
981        """
[d49c11c]982        completed_states = ('failed', 'ready')
983        status = 'changing'
[c2f92c5]984        if timeout is not None:
985            end = time.time() + timeout
[37ed9a5]986        try:
[d49c11c]987            while status not in completed_states:
[37ed9a5]988                param = { 
[4875e93]989                        'credentials': [ slice_cred ],
[d49c11c]990                        'slice_urn': slice_urn,
[37ed9a5]991                        }
[88dbe63]992                r = segment_commands.component_manager_call(
[d49c11c]993                        'SliverStatus', param, ctxt)
[208797c]994                # GENIAPI uses geni_status as the key, so check for both
995                status = r.get('status', r.get('geni_status','changing'))
[d49c11c]996                if status not in completed_states:
[c2f92c5]997                    if timeout is not None and time.time() > end:
998                        return 'timeout'
[37ed9a5]999                    time.sleep(30)
1000        except segment_commands.ProtoGENIError, e:
1001            raise service_error(service_error.federant,
1002                    "ProtoGENI: %s %s" % (e.code, e))
1003
1004        return status
1005
[d49c11c]1006    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
[37ed9a5]1007        """
1008        Delete the slice resources.  An error from the service is ignores,
1009        because the soft state will go away anyway.
1010        """
1011        try:
[d49c11c]1012            param = { 
1013                    'credentials': [ slice_cred, ],
1014                    'slice_urn': slice_urn,
1015                    }
[88dbe63]1016            segment_commands.component_manager_call('DeleteSlice',
[37ed9a5]1017                    param, ctxt)
1018        except segment_commands.ProtoGENIError, e:
1019            self.log.warn("ProtoGENI: %s" % e)
1020
1021
1022
1023    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1024            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1025            export_certfile, topo, connInfo, services, timeout=0):
1026        """
1027        Start a sub-experiment on a federant.
1028
1029        Get the current state, modify or create as appropriate, ship data
1030        and configs and start the experiment.  There are small ordering
1031        differences based on the initial state of the sub-experiment.
1032        """
1033
1034        # Local software dir
1035        lsoftdir = "%s/software" % tmpdir
1036        host = self.staging_host
1037
1038        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1039                certfile, certpw)
1040
[814b5e5]1041        if not ctxt: return False, {}
[37ed9a5]1042
1043        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1044        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1045        self.log.info("Creating %s" % slicename)
[add53ea]1046        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
[37ed9a5]1047                segment_commands, slicename, rspec, gcred, ctxt)
1048
[42cd8a7]1049        # With manifest in hand, we can export the portal node names.
1050        if self.create_debug: nodes = self.fake_manifest(topo)
1051        else: nodes = self.manifest_to_dict(manifest)
1052
1053        self.export_store_info(export_certfile, nodes, self.ssh_port,
1054                connInfo)
1055        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1056                ename, connInfo, services, nodes)
1057
1058        # Copy software to the staging machine (done after generation to copy
1059        # those, too)
1060        for d in (tmpdir, lsoftdir):
1061            if os.path.isdir(d):
1062                for f in os.listdir(d):
1063                    if not os.path.isdir("%s/%s" % (d, f)):
1064                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1065                                user, host, "%s/%s" % (stagingdir, f)):
1066                            self.log.error("Scp failed")
[814b5e5]1067                            return False, {}
[42cd8a7]1068
1069
1070        # Now we wait for the nodes to start on PG
[4875e93]1071        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
[d49c11c]1072                ctxt, timeout=300)
[42cd8a7]1073        if status == 'failed':
1074            self.log.error('Sliver failed to start on ProtoGENI')
[d49c11c]1075            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
[814b5e5]1076            return False, {}
[c2f92c5]1077        elif status == 'timeout':
1078            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
[d49c11c]1079            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
[814b5e5]1080            return False, {}
[42cd8a7]1081        else:
[37ed9a5]1082            # All good: save ProtoGENI info in shared state
[42cd8a7]1083            self.state_lock.acquire()
[d49c11c]1084            self.allocation[aid]['slice_urn'] = slice_urn
[42cd8a7]1085            self.allocation[aid]['slice_name'] = slicename
1086            self.allocation[aid]['slice_credential'] = slice_cred
1087            self.allocation[aid]['manifest'] = manifest
[add53ea]1088            self.allocation[aid]['rspec'] = rspec
[42cd8a7]1089            self.allocation[aid]['certfile'] = certfile
1090            self.allocation[aid]['certpw'] = certpw
1091            self.write_state()
1092            self.state_lock.release()
1093
1094        # Now we have configuration to do for ProtoGENI
[37ed9a5]1095        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1096                secretkey, stagingdir, tmpdir)
[42cd8a7]1097
1098        self.start_nodes(user, self.staging_host, 
1099                [ n.get('hostname', None) for n in nodes.values()],
1100                segment_commands)
1101
1102        # Everything has gone OK.
1103        return True, dict([(k, n.get('hostname', None)) \
1104                for k, n in nodes.items()])
1105
[3551ae1]1106    def generate_rspec(self, topo, softdir, connInfo):
[c2f92c5]1107
1108        # Force a useful image.  Without picking this the nodes can get
1109        # different images and there is great pain.
1110        def image_filter(e):
1111            if isinstance(e, topdl.Computer):
1112                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1113                        'image+emulab-ops//FEDORA10-STD" />'
1114            else:
1115                return ""
1116        # Main line of generate
[c119839]1117        t = topo.clone()
1118
1119        starts = { }
1120        # Weed out the things we aren't going to instantiate: Segments, portal
1121        # substrates, and portal interfaces.  (The copy in the for loop allows
1122        # us to delete from e.elements in side the for loop).  While we're
1123        # touching all the elements, we also adjust paths from the original
1124        # testbed to local testbed paths and put the federation commands and
1125        # startcommands into a dict so we can start them manually later.
1126        # ProtoGENI requires setup before the federation commands run, so we
1127        # run them by hand after we've seeded configurations.
1128        for e in [e for e in t.elements]:
1129            if isinstance(e, topdl.Segment):
1130                t.elements.remove(e)
1131            # Fix software paths
1132            for s in getattr(e, 'software', []):
1133                s.location = re.sub("^.*/", softdir, s.location)
1134            if isinstance(e, topdl.Computer):
[a65a65a]1135                if e.get_attribute('portal') and self.portal_startcommand:
[c119839]1136                    # Portals never have a user-specified start command
[a65a65a]1137                    starts[e.name] = self.portal_startcommand
1138                elif self.node_startcommand:
[c119839]1139                    if e.get_attribute('startup'):
[a65a65a]1140                        starts[e.name] = "%s \\$USER '%s'" % \
1141                                (self.node_startcommand, 
1142                                        e.get_attribute('startup'))
[c119839]1143                        e.remove_attribute('startup')
1144                    else:
[a65a65a]1145                        starts[e.name] = self.node_startcommand
[c119839]1146
1147                # Remove portal interfaces
1148                e.interface = [i for i in e.interface \
1149                        if not i.get_attribute('portal')]
1150
1151        t.substrates = [ s.clone() for s in t.substrates ]
1152        t.incorporate_elements()
1153
[c2f92c5]1154        # Customize the rspec output to use the image we like
1155        filters = [ image_filter ]
[c119839]1156
1157        # Convert to rspec and return it
1158        exp_rspec = topdl.topology_to_rspec(t, filters)
1159
1160        return exp_rspec
1161
[3551ae1]1162    def retrieve_software(self, topo, certfile, softdir):
1163        """
1164        Collect the software that nodes in the topology need loaded and stage
1165        it locally.  This implies retrieving it from the experiment_controller
1166        and placing it into softdir.  Certfile is used to prove that this node
1167        has access to that data (it's the allocation/segment fedid).  Finally
1168        local portal and federation software is also copied to the same staging
1169        directory for simplicity - all software needed for experiment creation
1170        is in softdir.
1171        """
1172        sw = set()
1173        for e in topo.elements:
1174            for s in getattr(e, 'software', []):
1175                sw.add(s.location)
1176        os.mkdir(softdir)
1177        for s in sw:
1178            self.log.debug("Retrieving %s" % s)
1179            try:
1180                get_url(s, certfile, softdir)
1181            except:
1182                t, v, st = sys.exc_info()
1183                raise service_error(service_error.internal,
1184                        "Error retrieving %s: %s" % (s, v))
1185
1186        # Copy local portal node software to the tempdir
1187        for s in (self.portal_software, self.federation_software):
1188            for l, f in s:
1189                base = os.path.basename(f)
1190                copy_file(f, "%s/%s" % (softdir, base))
1191
1192
1193    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1194        """
1195        Gather common configuration files, retrieve or create an experiment
1196        name and project name, and return the ssh_key filenames.  Create an
1197        allocation log bound to the state log variable as well.
1198        """
[c119839]1199        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
[3551ae1]1200        ename = None
1201        pubkey_base = None
1202        secretkey_base = None
1203        alloc_log = None
1204
1205        for a in attrs:
1206            if a['attribute'] in configs:
1207                try:
1208                    self.log.debug("Retrieving %s" % a['value'])
1209                    get_url(a['value'], certfile, tmpdir)
1210                except:
1211                    t, v, st = sys.exc_info()
1212                    raise service_error(service_error.internal,
1213                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1214            if a['attribute'] == 'ssh_pubkey':
1215                pubkey_base = a['value'].rpartition('/')[2]
1216            if a['attribute'] == 'ssh_secretkey':
1217                secretkey_base = a['value'].rpartition('/')[2]
1218            if a['attribute'] == 'experiment_name':
1219                ename = a['value']
1220
1221        if not ename:
1222            ename = ""
1223            for i in range(0,5):
1224                ename += random.choice(string.ascii_letters)
1225            self.log.warn("No experiment name: picked one randomly: %s" \
1226                    % ename)
1227
1228        self.state_lock.acquire()
1229        if self.allocation.has_key(aid):
1230            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1231            self.allocation[aid]['experiment'] = ename
1232            self.allocation[aid]['log'] = [ ]
1233            # Create a logger that logs to the experiment's state object as
1234            # well as to the main log file.
1235            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1236            h = logging.StreamHandler(
1237                    list_log.list_log(self.allocation[aid]['log']))
1238            # XXX: there should be a global one of these rather than
1239            # repeating the code.
1240            h.setFormatter(logging.Formatter(
1241                "%(asctime)s %(name)s %(message)s",
1242                        '%d %b %y %H:%M:%S'))
1243            alloc_log.addHandler(h)
1244            self.write_state()
1245        else:
1246            self.log.error("No allocation for %s!?" % aid)
1247        self.state_lock.release()
[c119839]1248
[3551ae1]1249        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1250                cpw, alloc_log)
1251
[42cd8a7]1252    def finalize_experiment(self, topo, nodes, aid, alloc_id):
[3551ae1]1253        # Copy the assigned names into the return topology
1254        rvtopo = topo.clone()
1255        embedding = [ ]
[42cd8a7]1256        for k, n in nodes.items():
[3551ae1]1257            embedding.append({ 
[42cd8a7]1258                'toponame': k,
[1b6cc95]1259                'physname': [n ],
[3551ae1]1260                })
1261        # Grab the log (this is some anal locking, but better safe than
1262        # sorry)
1263        self.state_lock.acquire()
1264        logv = "".join(self.allocation[aid]['log'])
1265        # It's possible that the StartSegment call gets retried (!).
1266        # if the 'started' key is in the allocation, we'll return it rather
1267        # than redo the setup.
1268        self.allocation[aid]['started'] = { 
1269                'allocID': alloc_id,
1270                'allocationLog': logv,
1271                'segmentdescription': { 
1272                    'topdldescription': rvtopo.to_dict() },
1273                'embedding': embedding,
1274                }
1275        retval = copy.deepcopy(self.allocation[aid]['started'])
1276        self.write_state()
1277        self.state_lock.release()
1278
1279        return retval
1280
1281    def StartSegment(self, req, fid):
[c119839]1282        err = None  # Any service_error generated after tmpdir is created
1283        rv = None   # Return value from segment creation
1284
1285        try:
1286            req = req['StartSegmentRequestBody']
[3551ae1]1287            topref = req['segmentdescription']['topdldescription']
[c119839]1288        except KeyError:
1289            raise service_error(service_error.req, "Badly formed request")
1290
1291        connInfo = req.get('connection', [])
1292        services = req.get('service', [])
1293        auth_attr = req['allocID']['fedid']
1294        aid = "%s" % auth_attr
1295        attrs = req.get('fedAttr', [])
1296        if not self.auth.check_attribute(fid, auth_attr):
1297            raise service_error(service_error.access, "Access denied")
[cd06678]1298        else:
1299            # See if this is a replay of an earlier succeeded StartSegment -
1300            # sometimes SSL kills 'em.  If so, replay the response rather than
1301            # redoing the allocation.
1302            self.state_lock.acquire()
1303            retval = self.allocation[aid].get('started', None)
1304            self.state_lock.release()
1305            if retval:
1306                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1307                        "replaying response")
1308                return retval
[c119839]1309
[3551ae1]1310        if topref:
1311            topo = topdl.Topology(**topref)
[c119839]1312        else:
1313            raise service_error(service_error.req, 
1314                    "Request missing segmentdescription'")
1315
1316        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1317        try:
1318            tmpdir = tempfile.mkdtemp(prefix="access-")
1319            softdir = "%s/software" % tmpdir
[d3c8759]1320        except EnvironmentError:
[c119839]1321            raise service_error(service_error.internal, "Cannot create tmp dir")
1322
1323        # Try block alllows us to clean up temporary files.
1324        try:
[3551ae1]1325            self.retrieve_software(topo, certfile, softdir)
1326            self.configure_userconf(services, tmpdir)
1327            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1328                cpw, alloc_log = self.initialize_experiment_info(attrs,
1329                        aid, certfile, tmpdir)
[c119839]1330            self.import_store_info(certfile, connInfo)
1331            rspec = self.generate_rspec(topo, "%s/%s/" \
[3551ae1]1332                    % (self.staging_dir, ename), connInfo)
[c119839]1333
[208797c]1334            segment_commands = self.api_proxy(keyfile=ssh_key,
[c119839]1335                    debug=self.create_debug, log=alloc_log,
1336                    ch_url = self.ch_url, sa_url=self.sa_url,
1337                    cm_url=self.cm_url)
[42cd8a7]1338            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1339                    pubkey_base, 
1340                    secretkey_base, ename,
[c119839]1341                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
[593e901]1342                    certfile, topo, connInfo, services)
[37ed9a5]1343        except EnvironmentError, e:
[3551ae1]1344            err = service_error(service_error.internal, "%s" % e)
[c119839]1345        except service_error, e:
1346            err = e
[3551ae1]1347        except:
1348            t, v, st = sys.exc_info()
1349            err = service_error(service_error.internal, "%s: %s" % \
1350                    (v, traceback.extract_tb(st)))
[c119839]1351
1352        # Walk up tmpdir, deleting as we go
[3551ae1]1353        if self.cleanup: self.remove_dirs(tmpdir)
1354        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
[c119839]1355
1356        if rv:
[42cd8a7]1357            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
[c119839]1358        elif err:
1359            raise service_error(service_error.federant,
1360                    "Swapin failed: %s" % err)
1361        else:
1362            raise service_error(service_error.federant, "Swapin failed")
1363
[42cd8a7]1364    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
[d49c11c]1365            slice_urn, certfile, certpw):
[42cd8a7]1366        """
1367        Stop a sub experiment by calling swapexp on the federant
1368        """
1369        host = self.staging_host
1370        rv = False
1371        try:
1372            # Clean out tar files: we've gone over quota in the past
1373            if stagingdir:
1374                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1375            if slice_cred:
1376                self.log.error('Removing Sliver on ProtoGENI')
1377                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
[d49c11c]1378                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
[42cd8a7]1379            return True
1380        except self.ssh_cmd_timeout:
1381            rv = False
1382        return rv
1383
[c119839]1384    def TerminateSegment(self, req, fid):
1385        try:
1386            req = req['TerminateSegmentRequestBody']
1387        except KeyError:
1388            raise service_error(service_error.req, "Badly formed request")
1389
1390        auth_attr = req['allocID']['fedid']
1391        aid = "%s" % auth_attr
1392        attrs = req.get('fedAttr', [])
1393        if not self.auth.check_attribute(fid, auth_attr):
1394            raise service_error(service_error.access, "Access denied")
1395
1396        self.state_lock.acquire()
1397        if self.allocation.has_key(aid):
1398            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1399            slice_cred = self.allocation[aid].get('slice_credential', None)
[d49c11c]1400            slice_urn = self.allocation[aid].get('slice_urn', None)
[c119839]1401            ename = self.allocation[aid].get('experiment', None)
1402        else:
1403            cf, user, ssh_key, cpw = (None, None, None, None)
1404            slice_cred = None
[4875e93]1405            slice_urn = None
[c119839]1406            ename = None
1407        self.state_lock.release()
1408
1409        if ename:
1410            staging = "%s/%s" % ( self.staging_dir, ename)
1411        else:
1412            self.log.warn("Can't find experiment name for %s" % aid)
1413            staging = None
1414
[208797c]1415        segment_commands = self.api_proxy(keyfile=ssh_key,
[42cd8a7]1416                debug=self.create_debug, ch_url = self.ch_url,
1417                sa_url=self.sa_url, cm_url=self.cm_url)
[d49c11c]1418        self.stop_segment(segment_commands, user, staging, slice_cred,
1419                slice_urn, cf, cpw)
[c119839]1420        return { 'allocID': req['allocID'] }
[dd3e38b]1421
[d49c11c]1422    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
[42cd8a7]1423            certfile, certpw):
[37ed9a5]1424        """
1425        Linear code through the segment renewal calls.
1426        """
[42cd8a7]1427        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1428        try:
1429            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1430                    time.gmtime(time.time() + interval))
[88dbe63]1431            cred = segment_commands.slice_authority_call('GetCredential', 
[37ed9a5]1432                    {}, ctxt)
[42cd8a7]1433
1434            param = {
1435                    'credential': scred,
1436                    'expiration': expiration
1437                    }
[88dbe63]1438            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
[42cd8a7]1439            param = {
1440                    'credential': cred,
[814b5e5]1441                    'urn': slice_urn,
[42cd8a7]1442                    'type': 'Slice',
1443                    }
[88dbe63]1444            new_scred = segment_commands.slice_authority_call('GetCredential',
[37ed9a5]1445                    param, ctxt)
[42cd8a7]1446
1447        except segment_commands.ProtoGENIError, e:
1448            self.log.error("Failed to extend slice %s: %s" % (name, e))
1449            return None
1450        try:
1451            param = {
[d49c11c]1452                    'credentials': [new_scred,],
1453                    'slice_urn': slice_urn,
[42cd8a7]1454                    }
[88dbe63]1455            r = segment_commands.component_manager_call('RenewSlice', param, 
1456                    ctxt)
[42cd8a7]1457        except segment_commands.ProtoGENIError, e:
1458            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1459
1460        return new_scred
1461   
1462
[dd3e38b]1463    def RenewSlices(self):
1464        self.log.info("Scanning for slices to renew")
1465        self.state_lock.acquire()
1466        aids = self.allocation.keys()
1467        self.state_lock.release()
1468
1469        for aid in aids:
1470            self.state_lock.acquire()
1471            if self.allocation.has_key(aid):
1472                name = self.allocation[aid].get('slice_name', None)
1473                scred = self.allocation[aid].get('slice_credential', None)
[d49c11c]1474                slice_urn = self.allocation[aid].get('slice_urn', None)
[dd3e38b]1475                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1476            else:
1477                name = None
1478                scred = None
1479            self.state_lock.release()
1480
[3551ae1]1481            if not os.access(cf, os.R_OK):
1482                self.log.error(
1483                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1484                continue
1485
[dd3e38b]1486            # There's a ProtoGENI slice associated with the segment; renew it.
[d49c11c]1487            if name and scred and slice_urn:
[208797c]1488                segment_commands = self.api_proxy(log=self.log, 
[dd3e38b]1489                        debug=self.create_debug, keyfile=ssh_key,
1490                        cm_url = self.cm_url, sa_url = self.sa_url,
1491                        ch_url = self.ch_url)
[42cd8a7]1492                new_scred = self.renew_segment(segment_commands, name, scred, 
[d49c11c]1493                        slice_urn, self.renewal_interval, cf, cpw)
[dd3e38b]1494                if new_scred:
1495                    self.log.info("Slice %s renewed until %s GMT" % \
1496                            (name, time.asctime(time.gmtime(\
1497                                time.time()+self.renewal_interval))))
1498                    self.state_lock.acquire()
1499                    if self.allocation.has_key(aid):
1500                        self.allocation[aid]['slice_credential'] = new_scred
[4875e93]1501                        self.write_state()
[dd3e38b]1502                    self.state_lock.release()
1503                else:
1504                    self.log.info("Failed to renew slice %s " % name)
1505
1506        # Let's do this all again soon.  (4 tries before the slices time out)   
1507        t = Timer(self.renewal_interval/4, self.RenewSlices)
1508        t.start()
Note: See TracBrowser for help on using the repository browser.