source: fedd/federation/protogeni_access.py @ 63c6664

axis_examplecompt_changesinfo-ops
Last change on this file since 63c6664 was 6fd2b29, checked in by Ted Faber <faber@…>, 14 years ago

Missing initialization

  • Property mode set to 100644
File size: 47.7 KB
RevLine 
[c119839]1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
[dd3e38b]6import time
[c119839]7import string
8import copy
9import pickle
10import logging
11import subprocess
[42cd8a7]12import random
[3551ae1]13import traceback
[1b6cc95]14import xml.parsers.expat
[c119839]15
[42cd8a7]16from threading import Thread, Timer, Lock
[c119839]17
18from util import *
19from fedid import fedid, generate_fedid
[3cec20c]20from authorizer import authorizer, abac_authorizer
[c119839]21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
[3551ae1]28from access import access_base
[208797c]29from protogeni_proxy import protogeni_proxy
30from geniapi_proxy import geniapi_proxy
[3551ae1]31
[c119839]32import topdl
33import list_log
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
[3551ae1]43class access(access_base):
[c119839]44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    def __init__(self, config=None, auth=None):
52        """
53        Initializer.  Pulls parameters out of the ConfigParser's access section.
54        """
55
[3551ae1]56        access_base.__init__(self, config, auth)
[c119839]57
58        self.domain = config.get("access", "domain")
59        self.userconfdir = config.get("access","userconfdir")
60        self.userconfcmd = config.get("access","userconfcmd")
61        self.userconfurl = config.get("access","userconfurl")
[9b3627e]62        self.federation_software = config.get("access", "federation_software")
63        self.portal_software = config.get("access", "portal_software")
[c119839]64        self.ssh_port = config.get("access","ssh_port") or "22"
65        self.sshd = config.get("access","sshd")
66        self.sshd_config = config.get("access", "sshd_config")
67        self.access_type = config.get("access", "type")
68        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
69        self.staging_host = config.get("access", "staging_host") \
70                or "ops.emulab.net"
[a65a65a]71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
[9b3627e]74   
[a65a65a]75        self.dragon_endpoint = config.get("access", "dragon")
76        self.dragon_vlans = config.get("access", "dragon_vlans")
77        self.deter_internal = config.get("access", "deter_internal")
78
79        self.tunnel_config = config.getboolean("access", "tunnel_config")
80        self.portal_command = config.get("access", "portal_command")
81        self.portal_image = config.get("access", "portal_image")
82        self.portal_type = config.get("access", "portal_type") or "pc"
83        self.portal_startcommand = config.get("access", "portal_startcommand")
84        self.node_startcommand = config.get("access", "node_startcommand")
85
[3551ae1]86        self.federation_software = self.software_list(self.federation_software)
87        self.portal_software = self.software_list(self.portal_software)
88        self.local_seer_software = self.software_list(self.local_seer_software)
[c119839]89
[310d419]90        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
91        self.renewal_interval = int(self.renewal_interval) * 60
[dd3e38b]92
[c119839]93        self.ch_url = config.get("access", "ch_url")
94        self.sa_url = config.get("access", "sa_url")
95        self.cm_url = config.get("access", "cm_url")
96
97        self.restricted = [ ]
98
[3551ae1]99        # read_state in the base_class
100        self.state_lock.acquire()
101        for a  in ('allocation', 'projects', 'keys', 'types'):
102            if a not in self.state:
103                self.state[a] = { }
104        self.allocation = self.state['allocation']
105        self.projects = self.state['projects']
106        self.keys = self.state['keys']
107        self.types = self.state['types']
108        self.state_lock.release()
[c119839]109
110
[3551ae1]111        self.log = logging.getLogger("fedd.access")
112        set_log_level(config, "access", self.log)
[c119839]113
[3cec20c]114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            self.access = { }
122            if accessdb:
123                self.legacy_read_access(accessdb, self.make_access_info)
124            # Add the ownership attributes to the authorizer.  Note that the
125            # indices of the allocation dict are strings, but the attributes are
126            # fedids, so there is a conversion.
127            self.state_lock.acquire()
128            for k in self.state.get('allocation', {}).keys():
129                for o in self.state['allocation'][k].get('owners', []):
130                    self.auth.set_attribute(o, fedid(hexstr=k))
131                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
[3551ae1]132
[3cec20c]133            self.state_lock.release()
134            self.lookup_access = self.legacy_lookup_access_base
135        elif self.auth_type == 'abac':
136            self.auth = abac_authorizer(load=self.auth_dir)
137            self.access = [ ]
138            if accessdb:
139                self.read_access(accessdb, self.make_access_info)
140        else:
141            raise service_error(service_error.internal, 
142                    "Unknown auth_type: %s" % self.auth_type)
[208797c]143        api = config.get("access", "api") or "protogeni"
144        if api == "protogeni":
145            self.api_proxy = protogeni_proxy
146        elif api == "geniapi":
147            self.api_proxy = geniapi_proxy
148        else:
149            self.log.debug("Unknown interface, using protogeni")
150            self.api_proxy = protogeni_proxy
151
[c119839]152        self.call_SetValue = service_caller('SetValue')
153        self.call_GetValue = service_caller('GetValue')
[3551ae1]154        self.exports = {
155                'local_seer_control': self.export_local_seer,
156                'seer_master': self.export_seer_master,
157                'hide_hosts': self.export_hide_hosts,
158                }
159
160        if not self.local_seer_image or not self.local_seer_software or \
161                not self.local_seer_start:
162            if 'local_seer_control' in self.exports:
163                del self.exports['local_seer_control']
164
165        if not self.local_seer_image or not self.local_seer_software or \
166                not self.seer_master_start:
167            if 'seer_master' in self.exports:
168                del self.exports['seer_master']
[c119839]169
[dd3e38b]170        self.RenewSlices()
171
[c119839]172        self.soap_services = {\
173            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
174            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
175            'StartSegment': soap_handler("StartSegment", self.StartSegment),
176            'TerminateSegment': soap_handler("TerminateSegment", 
177                self.TerminateSegment),
178            }
179        self.xmlrpc_services =  {\
180            'RequestAccess': xmlrpc_handler('RequestAccess',
181                self.RequestAccess),
182            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
183                self.ReleaseAccess),
184            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
185            'TerminateSegment': xmlrpc_handler('TerminateSegment',
186                self.TerminateSegment),
187            }
188
[3551ae1]189    @staticmethod
190    def make_access_info(s):
191        """
192        Split a string of the form (id, id, id, id) ito its constituent tuples
193        and return them as a tuple.  Use to import access info from the
194        access_db.
195        """
[c119839]196
[3551ae1]197        ss = s.strip()
198        if ss.startswith('(') and ss.endswith(')'):
199            l = [ s.strip() for s  in ss[1:-1].split(",")]
200            if len(l) == 4:
201                return tuple(l)
202            else:
203                raise self.parse_error(
204                        "Exactly 4 elements in access info required")
205        else:
206            raise self.parse_error("Expecting parenthezied values")
[c119839]207
208
209    def get_handler(self, path, fid):
210        self.log.info("Get handler %s %s" % (path, fid))
211        if self.auth.check_attribute(fid, path) and self.userconfdir:
212            return ("%s/%s" % (self.userconfdir, path), "application/binary")
213        else:
214            return (None, None)
215
[3551ae1]216    def build_access_response(self, alloc_id, services):
[c119839]217        """
218        Create the SOAP response.
219
220        Build the dictionary description of the response and use
221        fedd_utils.pack_soap to create the soap message.  ap is the allocate
222        project message returned from a remote project allocation (even if that
223        allocation was done locally).
224        """
225        # Because alloc_id is already a fedd_services_types.IDType_Holder,
226        # there's no need to repack it
227        msg = { 
228                'allocID': alloc_id,
229                'fedAttr': [
230                    { 'attribute': 'domain', 'value': self.domain } , 
231                ]
232            }
[a65a65a]233        if self.dragon_endpoint:
234            msg['fedAttr'].append({'attribute': 'dragon',
235                'value': self.dragon_endpoint})
236        if self.deter_internal:
237            msg['fedAttr'].append({'attribute': 'deter_internal',
238                'value': self.deter_internal})
239        #XXX: ??
240        if self.dragon_vlans:
241            msg['fedAttr'].append({'attribute': 'vlans',
242                'value': self.dragon_vlans})
[c119839]243
244        if services:
245            msg['service'] = services
246        return msg
247
248    def RequestAccess(self, req, fid):
249        """
[3551ae1]250        Handle the access request.
[c119839]251        """
252
253        # The dance to get into the request body
254        if req.has_key('RequestAccessRequestBody'):
255            req = req['RequestAccessRequestBody']
256        else:
257            raise service_error(service_error.req, "No request!?")
258
259        if req.has_key('destinationTestbed'):
260            dt = unpack_id(req['destinationTestbed'])
261
[3551ae1]262        # Request for this fedd
[3cec20c]263        found, match, owners = self.lookup_access(req, fid)
[3551ae1]264        services, svc_state = self.export_services(req.get('service',[]),
265                None, None)
266        # keep track of what's been added
267        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
268        aid = unicode(allocID)
[c119839]269
[3551ae1]270        self.state_lock.acquire()
271        self.allocation[aid] = { }
272        # The protoGENI certificate
273        self.allocation[aid]['credentials'] = found
274        # The list of owner FIDs
[3cec20c]275        self.allocation[aid]['owners'] = owners
[3551ae1]276        self.write_state()
277        self.state_lock.release()
278        self.auth.set_attribute(fid, allocID)
279        self.auth.set_attribute(allocID, allocID)
[3cec20c]280        self.auth.save()
[3551ae1]281
282        try:
283            f = open("%s/%s.pem" % (self.certdir, aid), "w")
284            print >>f, alloc_cert
285            f.close()
286        except EnvironmentError, e:
287            raise service_error(service_error.internal, 
288                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
289        return self.build_access_response({ 'fedid': allocID }, None)
[c119839]290
291
292    def ReleaseAccess(self, req, fid):
293        # The dance to get into the request body
294        if req.has_key('ReleaseAccessRequestBody'):
295            req = req['ReleaseAccessRequestBody']
296        else:
297            raise service_error(service_error.req, "No request!?")
298
[3551ae1]299        # Local request
300        try:
301            if req['allocID'].has_key('localname'):
302                auth_attr = aid = req['allocID']['localname']
303            elif req['allocID'].has_key('fedid'):
304                aid = unicode(req['allocID']['fedid'])
305                auth_attr = req['allocID']['fedid']
[c119839]306            else:
307                raise service_error(service_error.req,
[3551ae1]308                        "Only localnames and fedids are understood")
309        except KeyError:
310            raise service_error(service_error.req, "Badly formed request")
[c119839]311
[3551ae1]312        self.log.debug("[access] deallocation requested for %s", aid)
313        if not self.auth.check_attribute(fid, auth_attr):
314            self.log.debug("[access] deallocation denied for %s", aid)
315            raise service_error(service_error.access, "Access Denied")
[c119839]316
[3551ae1]317        self.state_lock.acquire()
318        if self.allocation.has_key(aid):
319            self.log.debug("Found allocation for %s" %aid)
320            del self.allocation[aid]
321            self.write_state()
322            self.state_lock.release()
323            # And remove the access cert
324            cf = "%s/%s.pem" % (self.certdir, aid)
325            self.log.debug("Removing %s" % cf)
326            os.remove(cf)
327            return { 'allocID': req['allocID'] } 
328        else:
329            self.state_lock.release()
330            raise service_error(service_error.req, "No such allocation")
[c119839]331
[42cd8a7]332    def manifest_to_dict(self, manifest, ignore_debug=False):
[37ed9a5]333        """
334        Turn the manifest into a dict were each virtual nodename (i.e. the
335        topdl name) has an entry with the allocated machine in hostname and the
336        interfaces in 'interfaces'.  I love having XML parser code lying
337        around.
338        """
[42cd8a7]339        if self.create_debug and not ignore_debug: 
340            self.log.debug("Returning null manifest dict")
341            return { }
342
343        # The class allows us to keep a little state - the dict under
344        # consteruction and the current entry in that dict for the interface
345        # element code.
346        class manifest_parser:
347            def __init__(self):
348                self.d = { }
349                self.current_key=None
350
351            # If the element is a node, create a dict entry for it.  If it's an
352            # interface inside a node, add an entry in the interfaces list with
353            # the virtual name and component id.
354            def start_element(self, name, attrs):
355                if name == 'node':
356                    self.current_key = attrs.get('virtual_id',"")
357                    if self.current_key:
358                        self.d[self.current_key] = {
359                                'hostname': attrs.get('hostname', None),
[814b5e5]360                                'interfaces': { },
361                                'mac': { }
[42cd8a7]362                                }
363                elif name == 'interface' and self.current_key:
364                    self.d[self.current_key]['interfaces']\
365                            [attrs.get('virtual_id','')] = \
366                            attrs.get('component_id', None)
[814b5e5]367                elif name == 'interface_ref':
368                    # Collect mac address information from an interface_ref.
369                    # These appear after the node info has been parsed.
370                    nid = attrs.get('virtual_node_id', None)
371                    ifid = attrs.get('virtual_interface_id', None)
372                    mac = attrs.get('MAC', None)
373                    self.d[nid]['mac'][ifid] = mac
[42cd8a7]374            #  When a node is finished, clear current_key
375            def end_element(self, name):
376                if name == 'node': self.current_key = None
377
378        node = { }
379
380        mp = manifest_parser()
381        p = xml.parsers.expat.ParserCreate()
382        # These are bound to the class we just created
383        p.StartElementHandler = mp.start_element
384        p.EndElementHandler = mp.end_element
385
386        p.Parse(manifest)
387        # Make the node dict that the callers expect
388        for k in mp.d:
389            node[k] = mp.d.get('hostname', '')
390        return mp.d
391
392    def fake_manifest(self, topo):
[37ed9a5]393        """
394        Fake the output of manifest_to_dict with a bunch of generic node an
395        interface names, for debugging.
396        """
[42cd8a7]397        node = { }
398        for i, e in enumerate([ e for e in topo.elements \
399                if isinstance(e, topdl.Computer)]):
400            node[e.name] = { 
401                    'hostname': "node%03d" % i,
402                    'interfaces': { }
403                    }
404            for j, inf in enumerate(e.interface):
405                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
406
407        return node
408
409
410    def generate_portal_configs(self, topo, pubkey_base, 
411            secretkey_base, tmpdir, leid, connInfo, services, nodes):
412
413        def conninfo_to_dict(key, info):
414            """
415            Make a cpoy of the connection information about key, and flatten it
416            into a single dict by parsing out any feddAttrs.
417            """
418
419            rv = None
420            for i in info:
421                if key == i.get('portal', "") or \
422                        key in [e.get('element', "") \
423                        for e in i.get('member', [])]:
424                    rv = i.copy()
425                    break
426
427            else:
428                return rv
429
430            if 'fedAttr' in rv:
431                for a in rv['fedAttr']:
432                    attr = a.get('attribute', "")
433                    val = a.get('value', "")
434                    if attr and attr not in rv:
435                        rv[attr] = val
436                del rv['fedAttr']
437            return rv
438
439        # XXX: un hardcode this
440        def client_null(f, s):
441            print >>f, "Service: %s" % s['name']
442       
443        def client_seer_master(f, s):
444            print >>f, 'PortalAlias: seer-master'
445
446        def client_smb(f, s):
447            print >>f, "Service: %s" % s['name']
448            smbshare = None
449            smbuser = None
450            smbproj = None
451            for a in s.get('fedAttr', []):
452                if a.get('attribute', '') == 'SMBSHARE':
453                    smbshare = a.get('value', None)
454                elif a.get('attribute', '') == 'SMBUSER':
455                    smbuser = a.get('value', None)
456                elif a.get('attribute', '') == 'SMBPROJ':
457                    smbproj = a.get('value', None)
458
459            if all((smbshare, smbuser, smbproj)):
460                print >>f, "SMBshare: %s" % smbshare
461                print >>f, "ProjectUser: %s" % smbuser
462                print >>f, "ProjectName: %s" % smbproj
463
464        def client_hide_hosts(f, s):
465            for a in s.get('fedAttr', [ ]):
466                if a.get('attribute', "") == 'hosts':
467                    print >>f, 'Hide: %s' % a.get('value', "")
468
469        client_service_out = {
470                'SMB': client_smb,
471                'tmcd': client_null,
472                'seer': client_null,
473                'userconfig': client_null,
474                'project_export': client_null,
475                'seer_master': client_seer_master,
476                'hide_hosts': client_hide_hosts,
477            }
478
479        def client_seer_master_export(f, s):
480            print >>f, "AddedNode: seer-master"
481
482        def client_seer_local_export(f, s):
483            print >>f, "AddedNode: control"
484
485        client_export_service_out = {
486                'seer_master': client_seer_master_export,
487                'local_seer_control': client_seer_local_export,
488            }
489
490        def server_port(f, s):
491            p = urlparse(s.get('server', 'http://localhost'))
492            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
493
494        def server_null(f,s): pass
495
496        def server_seer(f, s):
497            print >>f, 'seer: true'
498
499        server_service_out = {
500                'SMB': server_port,
501                'tmcd': server_port,
502                'userconfig': server_null,
503                'project_export': server_null,
504                'seer': server_seer,
505                'seer_master': server_port,
506                'hide_hosts': server_null,
507            }
508        # XXX: end un hardcode this
509
510
511        seer_out = False
512        client_out = False
[6fd2b29]513        control_gw = None
[42cd8a7]514        for e in [ e for e in topo.elements \
515                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
516            myname = e.name
517            type = e.get_attribute('portal_type')
518
519            info = conninfo_to_dict(myname, connInfo)
520
521            if not info:
522                raise service_error(service_error.req,
523                        "No connectivity info for %s" % myname)
524
525            # Translate to physical name (ProtoGENI doesn't have DNS)
526            physname = nodes.get(myname, { }).get('hostname', None)
527            peer = info.get('peer', "")
528            ldomain = self.domain
529            ssh_port = info.get('ssh_port', 22)
530
531            # Collect this for the client.conf file
532            if 'masterexperiment' in info:
533                mproj, meid = info['masterexperiment'].split("/", 1)
534
535            active = info.get('active', 'False')
536
537            if type in ('control', 'both'):
538                testbed = e.get_attribute('testbed')
539                control_gw = myname
540
541            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
542            tunnelconfig = self.tunnel_config
543            try:
544                f = open(cfn, "w")
545                if active == 'True':
546                    print >>f, "active: True"
547                    print >>f, "ssh_port: %s" % ssh_port
548                    if type in ('control', 'both'):
549                        for s in [s for s in services \
550                                if s.get('name', "") in self.imports]:
551                            server_service_out[s['name']](f, s)
552
553                if tunnelconfig:
554                    print >>f, "tunnelip: %s" % tunnelconfig
555                print >>f, "peer: %s" % peer.lower()
556                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
557                        pubkey_base
558                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
559                        secretkey_base
560                f.close()
561            except EnvironmentError, e:
562                raise service_error(service_error.internal,
563                        "Can't write protal config %s: %s" % (cfn, e))
564
565        # Done with portals, write the client config file.
566        try:
567            f = open("%s/client.conf" % tmpdir, "w")
568            if control_gw:
569                print >>f, "ControlGateway: %s" % physname.lower()
570            for s in services:
571                if s.get('name',"") in self.imports and \
572                        s.get('visibility','') == 'import':
573                    client_service_out[s['name']](f, s)
574                if s.get('name', '') in self.exports and \
575                        s.get('visibility', '') == 'export' and \
576                        s['name'] in client_export_service_out:
577                    client_export_service_out[s['name']](f, s)
578            # Seer uses this.
579            if mproj and meid:
580                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
581            f.close()
582        except EnvironmentError, e:
583            raise service_error(service_error.internal,
584                    "Cannot write client.conf: %s" %s)
585
586
587
588    def export_store_info(self, cf, nodes, ssh_port, connInfo):
589        """
590        For the export requests in the connection info, install the peer names
591        at the experiment controller via SetValue calls.
592        """
593
594        for c in connInfo:
595            for p in [ p for p in c.get('parameter', []) \
596                    if p.get('type', '') == 'output']:
597
598                if p.get('name', '') == 'peer':
599                    k = p.get('key', None)
600                    surl = p.get('store', None)
601                    if surl and k and k.index('/') != -1:
602                        if self.create_debug:
603                            req = { 'name': k, 'value': 'debug' }
604                            self.call_SetValue(surl, req, cf)
605                        else:
606                            n = nodes.get(k[k.index('/')+1:], { })
607                            value = n.get('hostname', None)
608                            if value:
609                                req = { 'name': k, 'value': value }
610                                self.call_SetValue(surl, req, cf)
611                            else:
612                                self.log.error("No hostname for %s" % \
613                                        k[k.index('/'):])
614                    else:
615                        self.log.error("Bad export request: %s" % p)
616                elif p.get('name', '') == 'ssh_port':
617                    k = p.get('key', None)
618                    surl = p.get('store', None)
619                    if surl and k:
620                        req = { 'name': k, 'value': ssh_port }
621                        self.call_SetValue(surl, req, cf)
622                    else:
623                        self.log.error("Bad export request: %s" % p)
624                else:
625
626                    self.log.error("Unknown export parameter: %s" % \
627                            p.get('name'))
628                    continue
629
[37ed9a5]630    def write_node_config_script(self, elem, node, user, pubkey,
631            secretkey, stagingdir, tmpdir):
632        """
633        Write out the configuration script that is to run on the node
634        represented by elem in the topology.  This is called
635        once per node to configure.
636        """
637        # These little functions/functors just make things more readable.  Each
638        # one encapsulates a small task of copying software files or installing
639        # them.
[42cd8a7]640        class stage_file_type:
[37ed9a5]641            """
642            Write code copying file sfrom the staging host to the host on which
643            this will run.
644            """
[42cd8a7]645            def __init__(self, user, host, stagingdir):
646                self.user = user
647                self.host = host
648                self.stagingdir = stagingdir
649                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
650                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
651
652            def __call__(self, script, file, dest="."):
653                # If the file is a full pathname, do not use stagingdir
654                if file.find('/') == -1:
655                    file = "%s/%s" % (self.stagingdir, file)
656                print >>script, "%s %s@%s:%s %s" % \
657                        (self.scp, self.user, self.host, file, dest)
658
659        def install_tar(script, loc, base):
[37ed9a5]660            """
661            Print code to script to install a tarfile in loc.
662            """
[42cd8a7]663            tar = "/bin/tar"
664            mkdir="/bin/mkdir"
665
666            print >>script, "%s -p %s" % (mkdir, loc)
667            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
668
669        def install_rpm(script, base):
[37ed9a5]670            """
671            Print code to script to install an rpm
672            """
[42cd8a7]673            rpm = "/bin/rpm"
674            print >>script, "%s --install %s" % (rpm, base)
675
[37ed9a5]676        ifconfig = "/sbin/ifconfig"
[814b5e5]677        findif = '/usr/local/etc/emulab/findif'
[37ed9a5]678        stage_file = stage_file_type(user, self.staging_host, stagingdir)
679        pname = node.get('hostname', None)
[42cd8a7]680        fed_dir = "/usr/local/federation"
681        fed_etc_dir = "%s/etc" % fed_dir
682        fed_bin_dir = "%s/bin" % fed_dir
683        fed_lib_dir = "%s/lib" % fed_dir
684
[37ed9a5]685        if pname:
686            sfile = "%s/%s.startup" % (tmpdir, pname)
687            script = open(sfile, "w")
688            # Reset the interfaces to the ones in the topo file
689            for i in [ i for i in elem.interface \
690                    if not i.get_attribute('portal')]:
[3cec20c]691                if 'interfaces' in node:
692                    pinf = node['interfaces'].get(i.name, None)
693                else:
694                    pinf = None
695
696                if 'mac' in node:
697                    pmac = node['mac'].get(i.name, None)
698                else:
699                    pmac = None
[37ed9a5]700                addr = i.get_attribute('ip4_address') 
701                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
[814b5e5]702                # The interface names in manifests are not to be trusted, so we
703                # find the interface to configure using the local node's script
704                # to match mac address to interface name.
705                if pinf and addr and pmac:
706                    print >>script, '# %s' % pinf
[37ed9a5]707                    print >>script, \
[814b5e5]708                            "%s `%s %s` %s netmask %s"  % \
709                            (ifconfig, findif, pmac, addr, netmask)
[37ed9a5]710                else:
711                    self.log.error("Missing interface or address for %s" \
712                            % i.name)
713               
714            for l, f in self.federation_software:
715                base = os.path.basename(f)
716                stage_file(script, base)
717                if l: install_tar(script, l, base)
718                else: install_rpm(script, base)
719
720            for s in elem.software:
721                s_base = s.location.rpartition('/')[2]
722                stage_file(script, s_base)
723                if s.install: install_tar(script, s.install, s_base)
724                else: install_rpm(script, s_base)
725
726            for f in ('hosts', pubkey, secretkey, 'client.conf', 
727                    'userconf'):
728                stage_file(script, f, fed_etc_dir)
729            if self.sshd:
730                stage_file(script, self.sshd, fed_bin_dir)
731            if self.sshd_config:
732                stage_file(script, self.sshd_config, fed_etc_dir)
733
734            # Look in tmpdir to get the names.  They've all been copied
735            # into the (remote) staging dir
736            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
737                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
738
[c2f92c5]739            # Done with staging, remove the identity used to stage
740            print >>script, "#/bin/rm .ssh/id_rsa"
[37ed9a5]741
742            # Start commands
743            if elem.get_attribute('portal') and self.portal_startcommand:
744                # Install portal software
745                for l, f in self.portal_software:
[42cd8a7]746                    base = os.path.basename(f)
747                    stage_file(script, base)
748                    if l: install_tar(script, l, base)
749                    else: install_rpm(script, base)
750
[37ed9a5]751                # Portals never have a user-specified start command
752                print >>script, self.portal_startcommand
753            elif self.node_startcommand:
754                # XXX: debug
[1b6cc95]755                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
[37ed9a5]756                # XXX: debug
757                if elem.get_attribute('startup'):
758                    print >>script, "%s \\$USER '%s'" % \
759                            (self.node_startcommand,
760                                    elem.get_attribute('startup'))
761                else:
762                    print >>script, self.node_startcommand
763            script.close()
764            return sfile, pname
765        else:
766            return None, None
[42cd8a7]767
[37ed9a5]768
769    def configure_nodes(self, segment_commands, topo, nodes, user, 
770            pubkey, secretkey, stagingdir, tmpdir):
771        """
772        For each node in the topology, generate a script file that copies
773        software onto it and installs it in the proper places and then runs the
774        startup command (including the federation commands.
775        """
776
777
778
779        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
780            vname = e.name
781            sfile, pname = self.write_node_config_script(e,
782                    nodes.get(vname, { }),
783                    user, pubkey, secretkey, stagingdir, tmpdir)
784            if sfile:
785                if not segment_commands.scp_file(sfile, user, pname):
[42cd8a7]786                    self.log.error("Could not copy script to %s" % pname)
787            else:
788                self.log.error("Unmapped node: %s" % vname)
789
790    def start_node(self, user, host, node, segment_commands):
[37ed9a5]791        """
792        Copy an identity to a node for the configuration script to be able to
793        import data and then run the startup script remotely.
794        """
[42cd8a7]795        # Place an identity on the node so that the copying can succeed
[c2f92c5]796        segment_commands.scp_file( segment_commands.ssh_privkey_file,
797                user, node, ".ssh/id_rsa")
[42cd8a7]798        segment_commands.ssh_cmd(user, node, 
799                "sudo /bin/sh ./%s.startup &" % node)
800
801    def start_nodes(self, user, host, nodes, segment_commands):
[37ed9a5]802        """
803        Start a thread to initialize each node and wait for them to complete.
804        Each thread runs start_node.
805        """
[42cd8a7]806        threads = [ ]
807        for n in nodes:
808            t = Thread(target=self.start_node, args=(user, host, n, 
809                segment_commands))
810            t.start()
811            threads.append(t)
812
813        done = [not t.isAlive() for t in threads]
814        while not all(done):
815            self.log.info("Waiting for threads %s" % done)
816            time.sleep(10)
817            done = [not t.isAlive() for t in threads]
818
[37ed9a5]819    def set_up_staging_filespace(self, segment_commands, user, host,
820            stagingdir):
[42cd8a7]821        """
[37ed9a5]822        Set up teh staging area on the staging machine.  To reduce the number
823        of ssh commands, we compose a script and execute it remotely.
[42cd8a7]824        """
825
826        self.log.info("[start_segment]: creating script file")
827        try:
828            sf, scriptname = tempfile.mkstemp()
829            scriptfile = os.fdopen(sf, 'w')
830        except EnvironmentError:
831            return False
832
833        scriptbase = os.path.basename(scriptname)
834
835        # Script the filesystem changes
836        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
837        print >>scriptfile, 'mkdir -p %s' % stagingdir
838        print >>scriptfile, "rm -f %s" % scriptbase
839        scriptfile.close()
840
841        # Move the script to the remote machine
842        # XXX: could collide tempfile names on the remote host
843        if segment_commands.scp_file(scriptname, user, host, scriptbase):
844            os.remove(scriptname)
845        else:
846            return False
847
848        # Execute the script (and the script's last line deletes it)
849        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
850            return False
851
[37ed9a5]852    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
853        """
854        Protogeni interactions take a context and a protogeni certificate.
855        This establishes both for later calls and returns them.
856        """
857        if os.access(certfile, os.R_OK):
858            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
859        else:
860            self.log.error("[start_segment]: Cannot read certfile: %s" % \
861                    certfile)
862            return None, None
863
[42cd8a7]864        try:
[88dbe63]865            gcred = segment_commands.slice_authority_call('GetCredential', 
866                    {}, ctxt)
867        except segment_commands.ProtoGENIError, e:
[42cd8a7]868            raise service_error(service_error.federant,
869                    "ProtoGENI: %s" % e)
[37ed9a5]870
871        return ctxt, gcred
872
873    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
874        """
875        Find a usable slice name by trying random ones until there's no
876        collision.
877        """
878
879        def random_slicename(user):
880            """
881            Return a random slicename by appending 5 letters to the username.
882            """
883            slicename = user
884            for i in range(0,5):
885                slicename += random.choice(string.ascii_letters)
886            return slicename
887
[42cd8a7]888        while True:
889            slicename = random_slicename(user)
890            try:
891                param = {
892                        'credential': gcred, 
893                        'hrn': slicename,
894                        'type': 'Slice'
895                        }
[3cec20c]896
897                if not self.create_debug:
898                    segment_commands.slice_authority_call('Resolve', param, 
899                            ctxt)
900                else:
901                    raise segment_commands.ProtoGENIError(0,0,'Debug')
[42cd8a7]902            except segment_commands.ProtoGENIError, e:
903                print e
904                break
905
[37ed9a5]906        return slicename
907
908    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
909        """
910        Create the slice and allocate resources.  If any of this stuff fails,
911        the allocations will time out on PG in short order, so we just raise
912        the service_error.  Return the slice and sliver credentials as well as
913        the manifest.
914        """
[42cd8a7]915        try:
916            param = {
917                    'credential': gcred, 
918                    'hrn': slicename,
919                    'type': 'Slice'
920                    }
[88dbe63]921            slice_cred = segment_commands.slice_authority_call('Register',
[37ed9a5]922                    param, ctxt)
[d49c11c]923            # Resolve the slice to get the URN that PG has assigned it.
924            param = {
925                    'credential': gcred,
926                    'type': 'Slice',
927                    'hrn': slicename
928                    }
[88dbe63]929            data = segment_commands.slice_authority_call('Resolve', param,
[d49c11c]930                    ctxt)
931            if 'urn' in data:
932                slice_urn = data['urn']
933            else:
934                raise service_error(service_error.federant, 
[3cec20c]935                        "No URN returned for slice %s" % slicename)
[4875e93]936
937            if 'creator_urn' in data:
938                creator_urn = data['creator_urn']
939            else:
940                raise service_error(service_error.federant, 
[3cec20c]941                        "No creator URN returned for slice %s" % slicename)
[42cd8a7]942            # Populate the ssh keys (let PG format them)
943            param = {
944                    'credential': gcred, 
945                    }
[88dbe63]946            keys =  segment_commands.slice_authority_call('GetKeys', param, 
[37ed9a5]947                    ctxt)
[d49c11c]948            # Create a Sliver
[42cd8a7]949            param = {
[d49c11c]950                    'credentials': [ slice_cred ],
951                    'rspec': rspec, 
[4875e93]952                    'users': [ {
953                        'urn': creator_urn,
954                        'keys': keys, 
955                        },
956                    ],
[d49c11c]957                    'slice_urn': slice_urn,
[42cd8a7]958                    }
[208797c]959            rv = segment_commands.component_manager_call(
[d49c11c]960                    'CreateSliver', param, ctxt)
[208797c]961
962            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
963            # API hands back a sliver credential.  This just finds the format
964            # of choice.
965            if isinstance(rv, tuple):
966                manifest = rv[1]
967            else:
968                manifest = rv
[42cd8a7]969        except segment_commands.ProtoGENIError, e:
970            raise service_error(service_error.federant,
971                    "ProtoGENI: %s %s" % (e.code, e))
972
[add53ea]973        return (slice_urn, slice_cred, manifest, rspec)
[37ed9a5]974
[4875e93]975    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
[d49c11c]976            timeout=None):
[37ed9a5]977        """
978        Wait for the given slice to finish its startup.  Return the final
979        status.
980        """
[d49c11c]981        completed_states = ('failed', 'ready')
982        status = 'changing'
[c2f92c5]983        if timeout is not None:
984            end = time.time() + timeout
[37ed9a5]985        try:
[d49c11c]986            while status not in completed_states:
[37ed9a5]987                param = { 
[4875e93]988                        'credentials': [ slice_cred ],
[d49c11c]989                        'slice_urn': slice_urn,
[37ed9a5]990                        }
[88dbe63]991                r = segment_commands.component_manager_call(
[d49c11c]992                        'SliverStatus', param, ctxt)
[208797c]993                # GENIAPI uses geni_status as the key, so check for both
994                status = r.get('status', r.get('geni_status','changing'))
[d49c11c]995                if status not in completed_states:
[c2f92c5]996                    if timeout is not None and time.time() > end:
997                        return 'timeout'
[37ed9a5]998                    time.sleep(30)
999        except segment_commands.ProtoGENIError, e:
1000            raise service_error(service_error.federant,
1001                    "ProtoGENI: %s %s" % (e.code, e))
1002
1003        return status
1004
[d49c11c]1005    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
[37ed9a5]1006        """
1007        Delete the slice resources.  An error from the service is ignores,
1008        because the soft state will go away anyway.
1009        """
1010        try:
[d49c11c]1011            param = { 
1012                    'credentials': [ slice_cred, ],
1013                    'slice_urn': slice_urn,
1014                    }
[88dbe63]1015            segment_commands.component_manager_call('DeleteSlice',
[37ed9a5]1016                    param, ctxt)
1017        except segment_commands.ProtoGENIError, e:
1018            self.log.warn("ProtoGENI: %s" % e)
1019
1020
1021
1022    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1023            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1024            export_certfile, topo, connInfo, services, timeout=0):
1025        """
1026        Start a sub-experiment on a federant.
1027
1028        Get the current state, modify or create as appropriate, ship data
1029        and configs and start the experiment.  There are small ordering
1030        differences based on the initial state of the sub-experiment.
1031        """
1032
1033        # Local software dir
1034        lsoftdir = "%s/software" % tmpdir
1035        host = self.staging_host
1036
1037        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1038                certfile, certpw)
1039
[814b5e5]1040        if not ctxt: return False, {}
[37ed9a5]1041
1042        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1043        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1044        self.log.info("Creating %s" % slicename)
[add53ea]1045        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
[37ed9a5]1046                segment_commands, slicename, rspec, gcred, ctxt)
1047
[42cd8a7]1048        # With manifest in hand, we can export the portal node names.
1049        if self.create_debug: nodes = self.fake_manifest(topo)
1050        else: nodes = self.manifest_to_dict(manifest)
1051
1052        self.export_store_info(export_certfile, nodes, self.ssh_port,
1053                connInfo)
1054        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1055                ename, connInfo, services, nodes)
1056
1057        # Copy software to the staging machine (done after generation to copy
1058        # those, too)
1059        for d in (tmpdir, lsoftdir):
1060            if os.path.isdir(d):
1061                for f in os.listdir(d):
1062                    if not os.path.isdir("%s/%s" % (d, f)):
1063                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1064                                user, host, "%s/%s" % (stagingdir, f)):
1065                            self.log.error("Scp failed")
[814b5e5]1066                            return False, {}
[42cd8a7]1067
1068
1069        # Now we wait for the nodes to start on PG
[4875e93]1070        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
[d49c11c]1071                ctxt, timeout=300)
[42cd8a7]1072        if status == 'failed':
1073            self.log.error('Sliver failed to start on ProtoGENI')
[d49c11c]1074            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
[814b5e5]1075            return False, {}
[c2f92c5]1076        elif status == 'timeout':
1077            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
[d49c11c]1078            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
[814b5e5]1079            return False, {}
[42cd8a7]1080        else:
[37ed9a5]1081            # All good: save ProtoGENI info in shared state
[42cd8a7]1082            self.state_lock.acquire()
[d49c11c]1083            self.allocation[aid]['slice_urn'] = slice_urn
[42cd8a7]1084            self.allocation[aid]['slice_name'] = slicename
1085            self.allocation[aid]['slice_credential'] = slice_cred
1086            self.allocation[aid]['manifest'] = manifest
[add53ea]1087            self.allocation[aid]['rspec'] = rspec
[42cd8a7]1088            self.allocation[aid]['certfile'] = certfile
1089            self.allocation[aid]['certpw'] = certpw
1090            self.write_state()
1091            self.state_lock.release()
1092
1093        # Now we have configuration to do for ProtoGENI
[37ed9a5]1094        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1095                secretkey, stagingdir, tmpdir)
[42cd8a7]1096
1097        self.start_nodes(user, self.staging_host, 
1098                [ n.get('hostname', None) for n in nodes.values()],
1099                segment_commands)
1100
1101        # Everything has gone OK.
1102        return True, dict([(k, n.get('hostname', None)) \
1103                for k, n in nodes.items()])
1104
[3551ae1]1105    def generate_rspec(self, topo, softdir, connInfo):
[c2f92c5]1106
1107        # Force a useful image.  Without picking this the nodes can get
1108        # different images and there is great pain.
1109        def image_filter(e):
1110            if isinstance(e, topdl.Computer):
1111                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1112                        'image+emulab-ops//FEDORA10-STD" />'
1113            else:
1114                return ""
1115        # Main line of generate
[c119839]1116        t = topo.clone()
1117
1118        starts = { }
1119        # Weed out the things we aren't going to instantiate: Segments, portal
1120        # substrates, and portal interfaces.  (The copy in the for loop allows
1121        # us to delete from e.elements in side the for loop).  While we're
1122        # touching all the elements, we also adjust paths from the original
1123        # testbed to local testbed paths and put the federation commands and
1124        # startcommands into a dict so we can start them manually later.
1125        # ProtoGENI requires setup before the federation commands run, so we
1126        # run them by hand after we've seeded configurations.
1127        for e in [e for e in t.elements]:
1128            if isinstance(e, topdl.Segment):
1129                t.elements.remove(e)
1130            # Fix software paths
1131            for s in getattr(e, 'software', []):
1132                s.location = re.sub("^.*/", softdir, s.location)
1133            if isinstance(e, topdl.Computer):
[a65a65a]1134                if e.get_attribute('portal') and self.portal_startcommand:
[c119839]1135                    # Portals never have a user-specified start command
[a65a65a]1136                    starts[e.name] = self.portal_startcommand
1137                elif self.node_startcommand:
[c119839]1138                    if e.get_attribute('startup'):
[a65a65a]1139                        starts[e.name] = "%s \\$USER '%s'" % \
1140                                (self.node_startcommand, 
1141                                        e.get_attribute('startup'))
[c119839]1142                        e.remove_attribute('startup')
1143                    else:
[a65a65a]1144                        starts[e.name] = self.node_startcommand
[c119839]1145
1146                # Remove portal interfaces
1147                e.interface = [i for i in e.interface \
1148                        if not i.get_attribute('portal')]
1149
1150        t.substrates = [ s.clone() for s in t.substrates ]
1151        t.incorporate_elements()
1152
[c2f92c5]1153        # Customize the rspec output to use the image we like
1154        filters = [ image_filter ]
[c119839]1155
1156        # Convert to rspec and return it
1157        exp_rspec = topdl.topology_to_rspec(t, filters)
1158
1159        return exp_rspec
1160
[3551ae1]1161    def retrieve_software(self, topo, certfile, softdir):
1162        """
1163        Collect the software that nodes in the topology need loaded and stage
1164        it locally.  This implies retrieving it from the experiment_controller
1165        and placing it into softdir.  Certfile is used to prove that this node
1166        has access to that data (it's the allocation/segment fedid).  Finally
1167        local portal and federation software is also copied to the same staging
1168        directory for simplicity - all software needed for experiment creation
1169        is in softdir.
1170        """
1171        sw = set()
1172        for e in topo.elements:
1173            for s in getattr(e, 'software', []):
1174                sw.add(s.location)
1175        os.mkdir(softdir)
1176        for s in sw:
1177            self.log.debug("Retrieving %s" % s)
1178            try:
1179                get_url(s, certfile, softdir)
1180            except:
1181                t, v, st = sys.exc_info()
1182                raise service_error(service_error.internal,
1183                        "Error retrieving %s: %s" % (s, v))
1184
1185        # Copy local portal node software to the tempdir
1186        for s in (self.portal_software, self.federation_software):
1187            for l, f in s:
1188                base = os.path.basename(f)
1189                copy_file(f, "%s/%s" % (softdir, base))
1190
1191
1192    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1193        """
1194        Gather common configuration files, retrieve or create an experiment
1195        name and project name, and return the ssh_key filenames.  Create an
1196        allocation log bound to the state log variable as well.
1197        """
[c119839]1198        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
[3551ae1]1199        ename = None
1200        pubkey_base = None
1201        secretkey_base = None
1202        alloc_log = None
1203
1204        for a in attrs:
1205            if a['attribute'] in configs:
1206                try:
1207                    self.log.debug("Retrieving %s" % a['value'])
1208                    get_url(a['value'], certfile, tmpdir)
1209                except:
1210                    t, v, st = sys.exc_info()
1211                    raise service_error(service_error.internal,
1212                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1213            if a['attribute'] == 'ssh_pubkey':
1214                pubkey_base = a['value'].rpartition('/')[2]
1215            if a['attribute'] == 'ssh_secretkey':
1216                secretkey_base = a['value'].rpartition('/')[2]
1217            if a['attribute'] == 'experiment_name':
1218                ename = a['value']
1219
1220        if not ename:
1221            ename = ""
1222            for i in range(0,5):
1223                ename += random.choice(string.ascii_letters)
1224            self.log.warn("No experiment name: picked one randomly: %s" \
1225                    % ename)
1226
1227        self.state_lock.acquire()
1228        if self.allocation.has_key(aid):
1229            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1230            self.allocation[aid]['experiment'] = ename
1231            self.allocation[aid]['log'] = [ ]
1232            # Create a logger that logs to the experiment's state object as
1233            # well as to the main log file.
1234            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1235            h = logging.StreamHandler(
1236                    list_log.list_log(self.allocation[aid]['log']))
1237            # XXX: there should be a global one of these rather than
1238            # repeating the code.
1239            h.setFormatter(logging.Formatter(
1240                "%(asctime)s %(name)s %(message)s",
1241                        '%d %b %y %H:%M:%S'))
1242            alloc_log.addHandler(h)
1243            self.write_state()
1244        else:
1245            self.log.error("No allocation for %s!?" % aid)
1246        self.state_lock.release()
[c119839]1247
[3551ae1]1248        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1249                cpw, alloc_log)
1250
[42cd8a7]1251    def finalize_experiment(self, topo, nodes, aid, alloc_id):
[3551ae1]1252        # Copy the assigned names into the return topology
1253        rvtopo = topo.clone()
1254        embedding = [ ]
[42cd8a7]1255        for k, n in nodes.items():
[3551ae1]1256            embedding.append({ 
[42cd8a7]1257                'toponame': k,
[1b6cc95]1258                'physname': [n ],
[3551ae1]1259                })
1260        # Grab the log (this is some anal locking, but better safe than
1261        # sorry)
1262        self.state_lock.acquire()
1263        logv = "".join(self.allocation[aid]['log'])
1264        # It's possible that the StartSegment call gets retried (!).
1265        # if the 'started' key is in the allocation, we'll return it rather
1266        # than redo the setup.
1267        self.allocation[aid]['started'] = { 
1268                'allocID': alloc_id,
1269                'allocationLog': logv,
1270                'segmentdescription': { 
1271                    'topdldescription': rvtopo.to_dict() },
1272                'embedding': embedding,
1273                }
1274        retval = copy.deepcopy(self.allocation[aid]['started'])
1275        self.write_state()
1276        self.state_lock.release()
1277
1278        return retval
1279
1280    def StartSegment(self, req, fid):
[c119839]1281        err = None  # Any service_error generated after tmpdir is created
1282        rv = None   # Return value from segment creation
1283
1284        try:
1285            req = req['StartSegmentRequestBody']
[3551ae1]1286            topref = req['segmentdescription']['topdldescription']
[c119839]1287        except KeyError:
1288            raise service_error(service_error.req, "Badly formed request")
1289
1290        connInfo = req.get('connection', [])
1291        services = req.get('service', [])
1292        auth_attr = req['allocID']['fedid']
1293        aid = "%s" % auth_attr
1294        attrs = req.get('fedAttr', [])
1295        if not self.auth.check_attribute(fid, auth_attr):
1296            raise service_error(service_error.access, "Access denied")
[cd06678]1297        else:
1298            # See if this is a replay of an earlier succeeded StartSegment -
1299            # sometimes SSL kills 'em.  If so, replay the response rather than
1300            # redoing the allocation.
1301            self.state_lock.acquire()
1302            retval = self.allocation[aid].get('started', None)
1303            self.state_lock.release()
1304            if retval:
1305                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1306                        "replaying response")
1307                return retval
[c119839]1308
[3551ae1]1309        if topref:
1310            topo = topdl.Topology(**topref)
[c119839]1311        else:
1312            raise service_error(service_error.req, 
1313                    "Request missing segmentdescription'")
1314
1315        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1316        try:
1317            tmpdir = tempfile.mkdtemp(prefix="access-")
1318            softdir = "%s/software" % tmpdir
[d3c8759]1319        except EnvironmentError:
[c119839]1320            raise service_error(service_error.internal, "Cannot create tmp dir")
1321
1322        # Try block alllows us to clean up temporary files.
1323        try:
[3551ae1]1324            self.retrieve_software(topo, certfile, softdir)
1325            self.configure_userconf(services, tmpdir)
1326            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1327                cpw, alloc_log = self.initialize_experiment_info(attrs,
1328                        aid, certfile, tmpdir)
[c119839]1329            self.import_store_info(certfile, connInfo)
1330            rspec = self.generate_rspec(topo, "%s/%s/" \
[3551ae1]1331                    % (self.staging_dir, ename), connInfo)
[c119839]1332
[208797c]1333            segment_commands = self.api_proxy(keyfile=ssh_key,
[c119839]1334                    debug=self.create_debug, log=alloc_log,
1335                    ch_url = self.ch_url, sa_url=self.sa_url,
1336                    cm_url=self.cm_url)
[42cd8a7]1337            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1338                    pubkey_base, 
1339                    secretkey_base, ename,
[c119839]1340                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
[593e901]1341                    certfile, topo, connInfo, services)
[37ed9a5]1342        except EnvironmentError, e:
[3551ae1]1343            err = service_error(service_error.internal, "%s" % e)
[c119839]1344        except service_error, e:
1345            err = e
[3551ae1]1346        except:
1347            t, v, st = sys.exc_info()
1348            err = service_error(service_error.internal, "%s: %s" % \
1349                    (v, traceback.extract_tb(st)))
[c119839]1350
1351        # Walk up tmpdir, deleting as we go
[3551ae1]1352        if self.cleanup: self.remove_dirs(tmpdir)
1353        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
[c119839]1354
1355        if rv:
[42cd8a7]1356            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
[c119839]1357        elif err:
1358            raise service_error(service_error.federant,
1359                    "Swapin failed: %s" % err)
1360        else:
1361            raise service_error(service_error.federant, "Swapin failed")
1362
[42cd8a7]1363    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
[d49c11c]1364            slice_urn, certfile, certpw):
[42cd8a7]1365        """
1366        Stop a sub experiment by calling swapexp on the federant
1367        """
1368        host = self.staging_host
1369        rv = False
1370        try:
1371            # Clean out tar files: we've gone over quota in the past
1372            if stagingdir:
1373                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1374            if slice_cred:
1375                self.log.error('Removing Sliver on ProtoGENI')
1376                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
[d49c11c]1377                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
[42cd8a7]1378            return True
1379        except self.ssh_cmd_timeout:
1380            rv = False
1381        return rv
1382
[c119839]1383    def TerminateSegment(self, req, fid):
1384        try:
1385            req = req['TerminateSegmentRequestBody']
1386        except KeyError:
1387            raise service_error(service_error.req, "Badly formed request")
1388
1389        auth_attr = req['allocID']['fedid']
1390        aid = "%s" % auth_attr
1391        attrs = req.get('fedAttr', [])
1392        if not self.auth.check_attribute(fid, auth_attr):
1393            raise service_error(service_error.access, "Access denied")
1394
1395        self.state_lock.acquire()
1396        if self.allocation.has_key(aid):
1397            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1398            slice_cred = self.allocation[aid].get('slice_credential', None)
[d49c11c]1399            slice_urn = self.allocation[aid].get('slice_urn', None)
[c119839]1400            ename = self.allocation[aid].get('experiment', None)
1401        else:
1402            cf, user, ssh_key, cpw = (None, None, None, None)
1403            slice_cred = None
[4875e93]1404            slice_urn = None
[c119839]1405            ename = None
1406        self.state_lock.release()
1407
1408        if ename:
1409            staging = "%s/%s" % ( self.staging_dir, ename)
1410        else:
1411            self.log.warn("Can't find experiment name for %s" % aid)
1412            staging = None
1413
[208797c]1414        segment_commands = self.api_proxy(keyfile=ssh_key,
[42cd8a7]1415                debug=self.create_debug, ch_url = self.ch_url,
1416                sa_url=self.sa_url, cm_url=self.cm_url)
[d49c11c]1417        self.stop_segment(segment_commands, user, staging, slice_cred,
1418                slice_urn, cf, cpw)
[c119839]1419        return { 'allocID': req['allocID'] }
[dd3e38b]1420
[d49c11c]1421    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
[42cd8a7]1422            certfile, certpw):
[37ed9a5]1423        """
1424        Linear code through the segment renewal calls.
1425        """
[42cd8a7]1426        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1427        try:
1428            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1429                    time.gmtime(time.time() + interval))
[88dbe63]1430            cred = segment_commands.slice_authority_call('GetCredential', 
[37ed9a5]1431                    {}, ctxt)
[42cd8a7]1432
1433            param = {
1434                    'credential': scred,
1435                    'expiration': expiration
1436                    }
[88dbe63]1437            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
[42cd8a7]1438            param = {
1439                    'credential': cred,
[814b5e5]1440                    'urn': slice_urn,
[42cd8a7]1441                    'type': 'Slice',
1442                    }
[88dbe63]1443            new_scred = segment_commands.slice_authority_call('GetCredential',
[37ed9a5]1444                    param, ctxt)
[42cd8a7]1445
1446        except segment_commands.ProtoGENIError, e:
1447            self.log.error("Failed to extend slice %s: %s" % (name, e))
1448            return None
1449        try:
1450            param = {
[d49c11c]1451                    'credentials': [new_scred,],
1452                    'slice_urn': slice_urn,
[42cd8a7]1453                    }
[88dbe63]1454            r = segment_commands.component_manager_call('RenewSlice', param, 
1455                    ctxt)
[42cd8a7]1456        except segment_commands.ProtoGENIError, e:
1457            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1458
1459        return new_scred
1460   
1461
[dd3e38b]1462    def RenewSlices(self):
1463        self.log.info("Scanning for slices to renew")
1464        self.state_lock.acquire()
1465        aids = self.allocation.keys()
1466        self.state_lock.release()
1467
1468        for aid in aids:
1469            self.state_lock.acquire()
1470            if self.allocation.has_key(aid):
1471                name = self.allocation[aid].get('slice_name', None)
1472                scred = self.allocation[aid].get('slice_credential', None)
[d49c11c]1473                slice_urn = self.allocation[aid].get('slice_urn', None)
[dd3e38b]1474                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1475            else:
1476                name = None
1477                scred = None
1478            self.state_lock.release()
1479
[3551ae1]1480            if not os.access(cf, os.R_OK):
1481                self.log.error(
1482                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1483                continue
1484
[dd3e38b]1485            # There's a ProtoGENI slice associated with the segment; renew it.
[d49c11c]1486            if name and scred and slice_urn:
[208797c]1487                segment_commands = self.api_proxy(log=self.log, 
[dd3e38b]1488                        debug=self.create_debug, keyfile=ssh_key,
1489                        cm_url = self.cm_url, sa_url = self.sa_url,
1490                        ch_url = self.ch_url)
[42cd8a7]1491                new_scred = self.renew_segment(segment_commands, name, scred, 
[d49c11c]1492                        slice_urn, self.renewal_interval, cf, cpw)
[dd3e38b]1493                if new_scred:
1494                    self.log.info("Slice %s renewed until %s GMT" % \
1495                            (name, time.asctime(time.gmtime(\
1496                                time.time()+self.renewal_interval))))
1497                    self.state_lock.acquire()
1498                    if self.allocation.has_key(aid):
1499                        self.allocation[aid]['slice_credential'] = new_scred
[4875e93]1500                        self.write_state()
[dd3e38b]1501                    self.state_lock.release()
1502                else:
1503                    self.log.info("Failed to renew slice %s " % name)
1504
1505        # Let's do this all again soon.  (4 tries before the slices time out)   
1506        t = Timer(self.renewal_interval/4, self.RenewSlices)
1507        t.start()
Note: See TracBrowser for help on using the repository browser.