source: fedd/federation/protogeni_access.py @ b7a61ac

axis_examplecompt_changesinfo-ops
Last change on this file since b7a61ac was 3cec20c, checked in by Ted Faber <faber@…>, 14 years ago

ABAC integration and some minor fixes discovered along the 'create_debug' paths.

  • Property mode set to 100644
File size: 47.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from fedid import fedid, generate_fedid
20from authorizer import authorizer, abac_authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28from access import access_base
29from protogeni_proxy import protogeni_proxy
30from geniapi_proxy import geniapi_proxy
31
32import topdl
33import list_log
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    def __init__(self, config=None, auth=None):
52        """
53        Initializer.  Pulls parameters out of the ConfigParser's access section.
54        """
55
56        access_base.__init__(self, config, auth)
57
58        self.domain = config.get("access", "domain")
59        self.userconfdir = config.get("access","userconfdir")
60        self.userconfcmd = config.get("access","userconfcmd")
61        self.userconfurl = config.get("access","userconfurl")
62        self.federation_software = config.get("access", "federation_software")
63        self.portal_software = config.get("access", "portal_software")
64        self.ssh_port = config.get("access","ssh_port") or "22"
65        self.sshd = config.get("access","sshd")
66        self.sshd_config = config.get("access", "sshd_config")
67        self.access_type = config.get("access", "type")
68        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
69        self.staging_host = config.get("access", "staging_host") \
70                or "ops.emulab.net"
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74   
75        self.dragon_endpoint = config.get("access", "dragon")
76        self.dragon_vlans = config.get("access", "dragon_vlans")
77        self.deter_internal = config.get("access", "deter_internal")
78
79        self.tunnel_config = config.getboolean("access", "tunnel_config")
80        self.portal_command = config.get("access", "portal_command")
81        self.portal_image = config.get("access", "portal_image")
82        self.portal_type = config.get("access", "portal_type") or "pc"
83        self.portal_startcommand = config.get("access", "portal_startcommand")
84        self.node_startcommand = config.get("access", "node_startcommand")
85
86        self.federation_software = self.software_list(self.federation_software)
87        self.portal_software = self.software_list(self.portal_software)
88        self.local_seer_software = self.software_list(self.local_seer_software)
89
90        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
91        self.renewal_interval = int(self.renewal_interval) * 60
92
93        self.ch_url = config.get("access", "ch_url")
94        self.sa_url = config.get("access", "sa_url")
95        self.cm_url = config.get("access", "cm_url")
96
97        self.restricted = [ ]
98
99        # read_state in the base_class
100        self.state_lock.acquire()
101        for a  in ('allocation', 'projects', 'keys', 'types'):
102            if a not in self.state:
103                self.state[a] = { }
104        self.allocation = self.state['allocation']
105        self.projects = self.state['projects']
106        self.keys = self.state['keys']
107        self.types = self.state['types']
108        self.state_lock.release()
109
110
111        self.log = logging.getLogger("fedd.access")
112        set_log_level(config, "access", self.log)
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            self.access = { }
122            if accessdb:
123                self.legacy_read_access(accessdb, self.make_access_info)
124            # Add the ownership attributes to the authorizer.  Note that the
125            # indices of the allocation dict are strings, but the attributes are
126            # fedids, so there is a conversion.
127            self.state_lock.acquire()
128            for k in self.state.get('allocation', {}).keys():
129                for o in self.state['allocation'][k].get('owners', []):
130                    self.auth.set_attribute(o, fedid(hexstr=k))
131                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
132
133            self.state_lock.release()
134            self.lookup_access = self.legacy_lookup_access_base
135        elif self.auth_type == 'abac':
136            self.auth = abac_authorizer(load=self.auth_dir)
137            self.access = [ ]
138            if accessdb:
139                self.read_access(accessdb, self.make_access_info)
140        else:
141            raise service_error(service_error.internal, 
142                    "Unknown auth_type: %s" % self.auth_type)
143        api = config.get("access", "api") or "protogeni"
144        if api == "protogeni":
145            self.api_proxy = protogeni_proxy
146        elif api == "geniapi":
147            self.api_proxy = geniapi_proxy
148        else:
149            self.log.debug("Unknown interface, using protogeni")
150            self.api_proxy = protogeni_proxy
151
152        self.call_SetValue = service_caller('SetValue')
153        self.call_GetValue = service_caller('GetValue')
154        self.exports = {
155                'local_seer_control': self.export_local_seer,
156                'seer_master': self.export_seer_master,
157                'hide_hosts': self.export_hide_hosts,
158                }
159
160        if not self.local_seer_image or not self.local_seer_software or \
161                not self.local_seer_start:
162            if 'local_seer_control' in self.exports:
163                del self.exports['local_seer_control']
164
165        if not self.local_seer_image or not self.local_seer_software or \
166                not self.seer_master_start:
167            if 'seer_master' in self.exports:
168                del self.exports['seer_master']
169
170        self.RenewSlices()
171
172        self.soap_services = {\
173            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
174            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
175            'StartSegment': soap_handler("StartSegment", self.StartSegment),
176            'TerminateSegment': soap_handler("TerminateSegment", 
177                self.TerminateSegment),
178            }
179        self.xmlrpc_services =  {\
180            'RequestAccess': xmlrpc_handler('RequestAccess',
181                self.RequestAccess),
182            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
183                self.ReleaseAccess),
184            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
185            'TerminateSegment': xmlrpc_handler('TerminateSegment',
186                self.TerminateSegment),
187            }
188
189    @staticmethod
190    def make_access_info(s):
191        """
192        Split a string of the form (id, id, id, id) ito its constituent tuples
193        and return them as a tuple.  Use to import access info from the
194        access_db.
195        """
196
197        ss = s.strip()
198        if ss.startswith('(') and ss.endswith(')'):
199            l = [ s.strip() for s  in ss[1:-1].split(",")]
200            if len(l) == 4:
201                return tuple(l)
202            else:
203                raise self.parse_error(
204                        "Exactly 4 elements in access info required")
205        else:
206            raise self.parse_error("Expecting parenthezied values")
207
208
209    def get_handler(self, path, fid):
210        self.log.info("Get handler %s %s" % (path, fid))
211        if self.auth.check_attribute(fid, path) and self.userconfdir:
212            return ("%s/%s" % (self.userconfdir, path), "application/binary")
213        else:
214            return (None, None)
215
216    def build_access_response(self, alloc_id, services):
217        """
218        Create the SOAP response.
219
220        Build the dictionary description of the response and use
221        fedd_utils.pack_soap to create the soap message.  ap is the allocate
222        project message returned from a remote project allocation (even if that
223        allocation was done locally).
224        """
225        # Because alloc_id is already a fedd_services_types.IDType_Holder,
226        # there's no need to repack it
227        msg = { 
228                'allocID': alloc_id,
229                'fedAttr': [
230                    { 'attribute': 'domain', 'value': self.domain } , 
231                ]
232            }
233        if self.dragon_endpoint:
234            msg['fedAttr'].append({'attribute': 'dragon',
235                'value': self.dragon_endpoint})
236        if self.deter_internal:
237            msg['fedAttr'].append({'attribute': 'deter_internal',
238                'value': self.deter_internal})
239        #XXX: ??
240        if self.dragon_vlans:
241            msg['fedAttr'].append({'attribute': 'vlans',
242                'value': self.dragon_vlans})
243
244        if services:
245            msg['service'] = services
246        return msg
247
248    def RequestAccess(self, req, fid):
249        """
250        Handle the access request.
251        """
252
253        # The dance to get into the request body
254        if req.has_key('RequestAccessRequestBody'):
255            req = req['RequestAccessRequestBody']
256        else:
257            raise service_error(service_error.req, "No request!?")
258
259        if req.has_key('destinationTestbed'):
260            dt = unpack_id(req['destinationTestbed'])
261
262        # Request for this fedd
263        found, match, owners = self.lookup_access(req, fid)
264        services, svc_state = self.export_services(req.get('service',[]),
265                None, None)
266        # keep track of what's been added
267        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
268        aid = unicode(allocID)
269
270        self.state_lock.acquire()
271        self.allocation[aid] = { }
272        # The protoGENI certificate
273        self.allocation[aid]['credentials'] = found
274        # The list of owner FIDs
275        self.allocation[aid]['owners'] = owners
276        self.write_state()
277        self.state_lock.release()
278        self.auth.set_attribute(fid, allocID)
279        self.auth.set_attribute(allocID, allocID)
280        self.auth.save()
281
282        try:
283            f = open("%s/%s.pem" % (self.certdir, aid), "w")
284            print >>f, alloc_cert
285            f.close()
286        except EnvironmentError, e:
287            raise service_error(service_error.internal, 
288                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
289        return self.build_access_response({ 'fedid': allocID }, None)
290
291
292    def ReleaseAccess(self, req, fid):
293        # The dance to get into the request body
294        if req.has_key('ReleaseAccessRequestBody'):
295            req = req['ReleaseAccessRequestBody']
296        else:
297            raise service_error(service_error.req, "No request!?")
298
299        # Local request
300        try:
301            if req['allocID'].has_key('localname'):
302                auth_attr = aid = req['allocID']['localname']
303            elif req['allocID'].has_key('fedid'):
304                aid = unicode(req['allocID']['fedid'])
305                auth_attr = req['allocID']['fedid']
306            else:
307                raise service_error(service_error.req,
308                        "Only localnames and fedids are understood")
309        except KeyError:
310            raise service_error(service_error.req, "Badly formed request")
311
312        self.log.debug("[access] deallocation requested for %s", aid)
313        if not self.auth.check_attribute(fid, auth_attr):
314            self.log.debug("[access] deallocation denied for %s", aid)
315            raise service_error(service_error.access, "Access Denied")
316
317        self.state_lock.acquire()
318        if self.allocation.has_key(aid):
319            self.log.debug("Found allocation for %s" %aid)
320            del self.allocation[aid]
321            self.write_state()
322            self.state_lock.release()
323            # And remove the access cert
324            cf = "%s/%s.pem" % (self.certdir, aid)
325            self.log.debug("Removing %s" % cf)
326            os.remove(cf)
327            return { 'allocID': req['allocID'] } 
328        else:
329            self.state_lock.release()
330            raise service_error(service_error.req, "No such allocation")
331
332    def manifest_to_dict(self, manifest, ignore_debug=False):
333        """
334        Turn the manifest into a dict were each virtual nodename (i.e. the
335        topdl name) has an entry with the allocated machine in hostname and the
336        interfaces in 'interfaces'.  I love having XML parser code lying
337        around.
338        """
339        if self.create_debug and not ignore_debug: 
340            self.log.debug("Returning null manifest dict")
341            return { }
342
343        # The class allows us to keep a little state - the dict under
344        # consteruction and the current entry in that dict for the interface
345        # element code.
346        class manifest_parser:
347            def __init__(self):
348                self.d = { }
349                self.current_key=None
350
351            # If the element is a node, create a dict entry for it.  If it's an
352            # interface inside a node, add an entry in the interfaces list with
353            # the virtual name and component id.
354            def start_element(self, name, attrs):
355                if name == 'node':
356                    self.current_key = attrs.get('virtual_id',"")
357                    if self.current_key:
358                        self.d[self.current_key] = {
359                                'hostname': attrs.get('hostname', None),
360                                'interfaces': { },
361                                'mac': { }
362                                }
363                elif name == 'interface' and self.current_key:
364                    self.d[self.current_key]['interfaces']\
365                            [attrs.get('virtual_id','')] = \
366                            attrs.get('component_id', None)
367                elif name == 'interface_ref':
368                    # Collect mac address information from an interface_ref.
369                    # These appear after the node info has been parsed.
370                    nid = attrs.get('virtual_node_id', None)
371                    ifid = attrs.get('virtual_interface_id', None)
372                    mac = attrs.get('MAC', None)
373                    self.d[nid]['mac'][ifid] = mac
374            #  When a node is finished, clear current_key
375            def end_element(self, name):
376                if name == 'node': self.current_key = None
377
378        node = { }
379
380        mp = manifest_parser()
381        p = xml.parsers.expat.ParserCreate()
382        # These are bound to the class we just created
383        p.StartElementHandler = mp.start_element
384        p.EndElementHandler = mp.end_element
385
386        p.Parse(manifest)
387        # Make the node dict that the callers expect
388        for k in mp.d:
389            node[k] = mp.d.get('hostname', '')
390        return mp.d
391
392    def fake_manifest(self, topo):
393        """
394        Fake the output of manifest_to_dict with a bunch of generic node an
395        interface names, for debugging.
396        """
397        node = { }
398        for i, e in enumerate([ e for e in topo.elements \
399                if isinstance(e, topdl.Computer)]):
400            node[e.name] = { 
401                    'hostname': "node%03d" % i,
402                    'interfaces': { }
403                    }
404            for j, inf in enumerate(e.interface):
405                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
406
407        return node
408
409
410    def generate_portal_configs(self, topo, pubkey_base, 
411            secretkey_base, tmpdir, leid, connInfo, services, nodes):
412
413        def conninfo_to_dict(key, info):
414            """
415            Make a cpoy of the connection information about key, and flatten it
416            into a single dict by parsing out any feddAttrs.
417            """
418
419            rv = None
420            for i in info:
421                if key == i.get('portal', "") or \
422                        key in [e.get('element', "") \
423                        for e in i.get('member', [])]:
424                    rv = i.copy()
425                    break
426
427            else:
428                return rv
429
430            if 'fedAttr' in rv:
431                for a in rv['fedAttr']:
432                    attr = a.get('attribute', "")
433                    val = a.get('value', "")
434                    if attr and attr not in rv:
435                        rv[attr] = val
436                del rv['fedAttr']
437            return rv
438
439        # XXX: un hardcode this
440        def client_null(f, s):
441            print >>f, "Service: %s" % s['name']
442       
443        def client_seer_master(f, s):
444            print >>f, 'PortalAlias: seer-master'
445
446        def client_smb(f, s):
447            print >>f, "Service: %s" % s['name']
448            smbshare = None
449            smbuser = None
450            smbproj = None
451            for a in s.get('fedAttr', []):
452                if a.get('attribute', '') == 'SMBSHARE':
453                    smbshare = a.get('value', None)
454                elif a.get('attribute', '') == 'SMBUSER':
455                    smbuser = a.get('value', None)
456                elif a.get('attribute', '') == 'SMBPROJ':
457                    smbproj = a.get('value', None)
458
459            if all((smbshare, smbuser, smbproj)):
460                print >>f, "SMBshare: %s" % smbshare
461                print >>f, "ProjectUser: %s" % smbuser
462                print >>f, "ProjectName: %s" % smbproj
463
464        def client_hide_hosts(f, s):
465            for a in s.get('fedAttr', [ ]):
466                if a.get('attribute', "") == 'hosts':
467                    print >>f, 'Hide: %s' % a.get('value', "")
468
469        client_service_out = {
470                'SMB': client_smb,
471                'tmcd': client_null,
472                'seer': client_null,
473                'userconfig': client_null,
474                'project_export': client_null,
475                'seer_master': client_seer_master,
476                'hide_hosts': client_hide_hosts,
477            }
478
479        def client_seer_master_export(f, s):
480            print >>f, "AddedNode: seer-master"
481
482        def client_seer_local_export(f, s):
483            print >>f, "AddedNode: control"
484
485        client_export_service_out = {
486                'seer_master': client_seer_master_export,
487                'local_seer_control': client_seer_local_export,
488            }
489
490        def server_port(f, s):
491            p = urlparse(s.get('server', 'http://localhost'))
492            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
493
494        def server_null(f,s): pass
495
496        def server_seer(f, s):
497            print >>f, 'seer: true'
498
499        server_service_out = {
500                'SMB': server_port,
501                'tmcd': server_port,
502                'userconfig': server_null,
503                'project_export': server_null,
504                'seer': server_seer,
505                'seer_master': server_port,
506                'hide_hosts': server_null,
507            }
508        # XXX: end un hardcode this
509
510
511        seer_out = False
512        client_out = False
513        for e in [ e for e in topo.elements \
514                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
515            myname = e.name
516            type = e.get_attribute('portal_type')
517
518            info = conninfo_to_dict(myname, connInfo)
519
520            if not info:
521                raise service_error(service_error.req,
522                        "No connectivity info for %s" % myname)
523
524            # Translate to physical name (ProtoGENI doesn't have DNS)
525            physname = nodes.get(myname, { }).get('hostname', None)
526            peer = info.get('peer', "")
527            ldomain = self.domain
528            ssh_port = info.get('ssh_port', 22)
529
530            # Collect this for the client.conf file
531            if 'masterexperiment' in info:
532                mproj, meid = info['masterexperiment'].split("/", 1)
533
534            active = info.get('active', 'False')
535
536            if type in ('control', 'both'):
537                testbed = e.get_attribute('testbed')
538                control_gw = myname
539
540            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
541            tunnelconfig = self.tunnel_config
542            try:
543                f = open(cfn, "w")
544                if active == 'True':
545                    print >>f, "active: True"
546                    print >>f, "ssh_port: %s" % ssh_port
547                    if type in ('control', 'both'):
548                        for s in [s for s in services \
549                                if s.get('name', "") in self.imports]:
550                            server_service_out[s['name']](f, s)
551
552                if tunnelconfig:
553                    print >>f, "tunnelip: %s" % tunnelconfig
554                print >>f, "peer: %s" % peer.lower()
555                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
556                        pubkey_base
557                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
558                        secretkey_base
559                f.close()
560            except EnvironmentError, e:
561                raise service_error(service_error.internal,
562                        "Can't write protal config %s: %s" % (cfn, e))
563
564        # Done with portals, write the client config file.
565        try:
566            f = open("%s/client.conf" % tmpdir, "w")
567            if control_gw:
568                print >>f, "ControlGateway: %s" % physname.lower()
569            for s in services:
570                if s.get('name',"") in self.imports and \
571                        s.get('visibility','') == 'import':
572                    client_service_out[s['name']](f, s)
573                if s.get('name', '') in self.exports and \
574                        s.get('visibility', '') == 'export' and \
575                        s['name'] in client_export_service_out:
576                    client_export_service_out[s['name']](f, s)
577            # Seer uses this.
578            if mproj and meid:
579                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
580            f.close()
581        except EnvironmentError, e:
582            raise service_error(service_error.internal,
583                    "Cannot write client.conf: %s" %s)
584
585
586
587    def export_store_info(self, cf, nodes, ssh_port, connInfo):
588        """
589        For the export requests in the connection info, install the peer names
590        at the experiment controller via SetValue calls.
591        """
592
593        for c in connInfo:
594            for p in [ p for p in c.get('parameter', []) \
595                    if p.get('type', '') == 'output']:
596
597                if p.get('name', '') == 'peer':
598                    k = p.get('key', None)
599                    surl = p.get('store', None)
600                    if surl and k and k.index('/') != -1:
601                        if self.create_debug:
602                            req = { 'name': k, 'value': 'debug' }
603                            self.call_SetValue(surl, req, cf)
604                        else:
605                            n = nodes.get(k[k.index('/')+1:], { })
606                            value = n.get('hostname', None)
607                            if value:
608                                req = { 'name': k, 'value': value }
609                                self.call_SetValue(surl, req, cf)
610                            else:
611                                self.log.error("No hostname for %s" % \
612                                        k[k.index('/'):])
613                    else:
614                        self.log.error("Bad export request: %s" % p)
615                elif p.get('name', '') == 'ssh_port':
616                    k = p.get('key', None)
617                    surl = p.get('store', None)
618                    if surl and k:
619                        req = { 'name': k, 'value': ssh_port }
620                        self.call_SetValue(surl, req, cf)
621                    else:
622                        self.log.error("Bad export request: %s" % p)
623                else:
624
625                    self.log.error("Unknown export parameter: %s" % \
626                            p.get('name'))
627                    continue
628
629    def write_node_config_script(self, elem, node, user, pubkey,
630            secretkey, stagingdir, tmpdir):
631        """
632        Write out the configuration script that is to run on the node
633        represented by elem in the topology.  This is called
634        once per node to configure.
635        """
636        # These little functions/functors just make things more readable.  Each
637        # one encapsulates a small task of copying software files or installing
638        # them.
639        class stage_file_type:
640            """
641            Write code copying file sfrom the staging host to the host on which
642            this will run.
643            """
644            def __init__(self, user, host, stagingdir):
645                self.user = user
646                self.host = host
647                self.stagingdir = stagingdir
648                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
649                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
650
651            def __call__(self, script, file, dest="."):
652                # If the file is a full pathname, do not use stagingdir
653                if file.find('/') == -1:
654                    file = "%s/%s" % (self.stagingdir, file)
655                print >>script, "%s %s@%s:%s %s" % \
656                        (self.scp, self.user, self.host, file, dest)
657
658        def install_tar(script, loc, base):
659            """
660            Print code to script to install a tarfile in loc.
661            """
662            tar = "/bin/tar"
663            mkdir="/bin/mkdir"
664
665            print >>script, "%s -p %s" % (mkdir, loc)
666            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
667
668        def install_rpm(script, base):
669            """
670            Print code to script to install an rpm
671            """
672            rpm = "/bin/rpm"
673            print >>script, "%s --install %s" % (rpm, base)
674
675        ifconfig = "/sbin/ifconfig"
676        findif = '/usr/local/etc/emulab/findif'
677        stage_file = stage_file_type(user, self.staging_host, stagingdir)
678        pname = node.get('hostname', None)
679        fed_dir = "/usr/local/federation"
680        fed_etc_dir = "%s/etc" % fed_dir
681        fed_bin_dir = "%s/bin" % fed_dir
682        fed_lib_dir = "%s/lib" % fed_dir
683
684        if pname:
685            sfile = "%s/%s.startup" % (tmpdir, pname)
686            script = open(sfile, "w")
687            # Reset the interfaces to the ones in the topo file
688            for i in [ i for i in elem.interface \
689                    if not i.get_attribute('portal')]:
690                if 'interfaces' in node:
691                    pinf = node['interfaces'].get(i.name, None)
692                else:
693                    pinf = None
694
695                if 'mac' in node:
696                    pmac = node['mac'].get(i.name, None)
697                else:
698                    pmac = None
699                addr = i.get_attribute('ip4_address') 
700                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
701                # The interface names in manifests are not to be trusted, so we
702                # find the interface to configure using the local node's script
703                # to match mac address to interface name.
704                if pinf and addr and pmac:
705                    print >>script, '# %s' % pinf
706                    print >>script, \
707                            "%s `%s %s` %s netmask %s"  % \
708                            (ifconfig, findif, pmac, addr, netmask)
709                else:
710                    self.log.error("Missing interface or address for %s" \
711                            % i.name)
712               
713            for l, f in self.federation_software:
714                base = os.path.basename(f)
715                stage_file(script, base)
716                if l: install_tar(script, l, base)
717                else: install_rpm(script, base)
718
719            for s in elem.software:
720                s_base = s.location.rpartition('/')[2]
721                stage_file(script, s_base)
722                if s.install: install_tar(script, s.install, s_base)
723                else: install_rpm(script, s_base)
724
725            for f in ('hosts', pubkey, secretkey, 'client.conf', 
726                    'userconf'):
727                stage_file(script, f, fed_etc_dir)
728            if self.sshd:
729                stage_file(script, self.sshd, fed_bin_dir)
730            if self.sshd_config:
731                stage_file(script, self.sshd_config, fed_etc_dir)
732
733            # Look in tmpdir to get the names.  They've all been copied
734            # into the (remote) staging dir
735            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
736                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
737
738            # Done with staging, remove the identity used to stage
739            print >>script, "#/bin/rm .ssh/id_rsa"
740
741            # Start commands
742            if elem.get_attribute('portal') and self.portal_startcommand:
743                # Install portal software
744                for l, f in self.portal_software:
745                    base = os.path.basename(f)
746                    stage_file(script, base)
747                    if l: install_tar(script, l, base)
748                    else: install_rpm(script, base)
749
750                # Portals never have a user-specified start command
751                print >>script, self.portal_startcommand
752            elif self.node_startcommand:
753                # XXX: debug
754                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
755                # XXX: debug
756                if elem.get_attribute('startup'):
757                    print >>script, "%s \\$USER '%s'" % \
758                            (self.node_startcommand,
759                                    elem.get_attribute('startup'))
760                else:
761                    print >>script, self.node_startcommand
762            script.close()
763            return sfile, pname
764        else:
765            return None, None
766
767
768    def configure_nodes(self, segment_commands, topo, nodes, user, 
769            pubkey, secretkey, stagingdir, tmpdir):
770        """
771        For each node in the topology, generate a script file that copies
772        software onto it and installs it in the proper places and then runs the
773        startup command (including the federation commands.
774        """
775
776
777
778        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
779            vname = e.name
780            sfile, pname = self.write_node_config_script(e,
781                    nodes.get(vname, { }),
782                    user, pubkey, secretkey, stagingdir, tmpdir)
783            if sfile:
784                if not segment_commands.scp_file(sfile, user, pname):
785                    self.log.error("Could not copy script to %s" % pname)
786            else:
787                self.log.error("Unmapped node: %s" % vname)
788
789    def start_node(self, user, host, node, segment_commands):
790        """
791        Copy an identity to a node for the configuration script to be able to
792        import data and then run the startup script remotely.
793        """
794        # Place an identity on the node so that the copying can succeed
795        segment_commands.scp_file( segment_commands.ssh_privkey_file,
796                user, node, ".ssh/id_rsa")
797        segment_commands.ssh_cmd(user, node, 
798                "sudo /bin/sh ./%s.startup &" % node)
799
800    def start_nodes(self, user, host, nodes, segment_commands):
801        """
802        Start a thread to initialize each node and wait for them to complete.
803        Each thread runs start_node.
804        """
805        threads = [ ]
806        for n in nodes:
807            t = Thread(target=self.start_node, args=(user, host, n, 
808                segment_commands))
809            t.start()
810            threads.append(t)
811
812        done = [not t.isAlive() for t in threads]
813        while not all(done):
814            self.log.info("Waiting for threads %s" % done)
815            time.sleep(10)
816            done = [not t.isAlive() for t in threads]
817
818    def set_up_staging_filespace(self, segment_commands, user, host,
819            stagingdir):
820        """
821        Set up teh staging area on the staging machine.  To reduce the number
822        of ssh commands, we compose a script and execute it remotely.
823        """
824
825        self.log.info("[start_segment]: creating script file")
826        try:
827            sf, scriptname = tempfile.mkstemp()
828            scriptfile = os.fdopen(sf, 'w')
829        except EnvironmentError:
830            return False
831
832        scriptbase = os.path.basename(scriptname)
833
834        # Script the filesystem changes
835        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
836        print >>scriptfile, 'mkdir -p %s' % stagingdir
837        print >>scriptfile, "rm -f %s" % scriptbase
838        scriptfile.close()
839
840        # Move the script to the remote machine
841        # XXX: could collide tempfile names on the remote host
842        if segment_commands.scp_file(scriptname, user, host, scriptbase):
843            os.remove(scriptname)
844        else:
845            return False
846
847        # Execute the script (and the script's last line deletes it)
848        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
849            return False
850
851    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
852        """
853        Protogeni interactions take a context and a protogeni certificate.
854        This establishes both for later calls and returns them.
855        """
856        if os.access(certfile, os.R_OK):
857            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
858        else:
859            self.log.error("[start_segment]: Cannot read certfile: %s" % \
860                    certfile)
861            return None, None
862
863        try:
864            gcred = segment_commands.slice_authority_call('GetCredential', 
865                    {}, ctxt)
866        except segment_commands.ProtoGENIError, e:
867            raise service_error(service_error.federant,
868                    "ProtoGENI: %s" % e)
869
870        return ctxt, gcred
871
872    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
873        """
874        Find a usable slice name by trying random ones until there's no
875        collision.
876        """
877
878        def random_slicename(user):
879            """
880            Return a random slicename by appending 5 letters to the username.
881            """
882            slicename = user
883            for i in range(0,5):
884                slicename += random.choice(string.ascii_letters)
885            return slicename
886
887        while True:
888            slicename = random_slicename(user)
889            try:
890                param = {
891                        'credential': gcred, 
892                        'hrn': slicename,
893                        'type': 'Slice'
894                        }
895
896                if not self.create_debug:
897                    segment_commands.slice_authority_call('Resolve', param, 
898                            ctxt)
899                else:
900                    raise segment_commands.ProtoGENIError(0,0,'Debug')
901            except segment_commands.ProtoGENIError, e:
902                print e
903                break
904
905        return slicename
906
907    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
908        """
909        Create the slice and allocate resources.  If any of this stuff fails,
910        the allocations will time out on PG in short order, so we just raise
911        the service_error.  Return the slice and sliver credentials as well as
912        the manifest.
913        """
914        try:
915            param = {
916                    'credential': gcred, 
917                    'hrn': slicename,
918                    'type': 'Slice'
919                    }
920            slice_cred = segment_commands.slice_authority_call('Register',
921                    param, ctxt)
922            # Resolve the slice to get the URN that PG has assigned it.
923            param = {
924                    'credential': gcred,
925                    'type': 'Slice',
926                    'hrn': slicename
927                    }
928            data = segment_commands.slice_authority_call('Resolve', param,
929                    ctxt)
930            if 'urn' in data:
931                slice_urn = data['urn']
932            else:
933                raise service_error(service_error.federant, 
934                        "No URN returned for slice %s" % slicename)
935
936            if 'creator_urn' in data:
937                creator_urn = data['creator_urn']
938            else:
939                raise service_error(service_error.federant, 
940                        "No creator URN returned for slice %s" % slicename)
941            # Populate the ssh keys (let PG format them)
942            param = {
943                    'credential': gcred, 
944                    }
945            keys =  segment_commands.slice_authority_call('GetKeys', param, 
946                    ctxt)
947            # Create a Sliver
948            param = {
949                    'credentials': [ slice_cred ],
950                    'rspec': rspec, 
951                    'users': [ {
952                        'urn': creator_urn,
953                        'keys': keys, 
954                        },
955                    ],
956                    'slice_urn': slice_urn,
957                    }
958            rv = segment_commands.component_manager_call(
959                    'CreateSliver', param, ctxt)
960
961            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
962            # API hands back a sliver credential.  This just finds the format
963            # of choice.
964            if isinstance(rv, tuple):
965                manifest = rv[1]
966            else:
967                manifest = rv
968        except segment_commands.ProtoGENIError, e:
969            raise service_error(service_error.federant,
970                    "ProtoGENI: %s %s" % (e.code, e))
971
972        return (slice_urn, slice_cred, manifest, rspec)
973
974    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
975            timeout=None):
976        """
977        Wait for the given slice to finish its startup.  Return the final
978        status.
979        """
980        completed_states = ('failed', 'ready')
981        status = 'changing'
982        if timeout is not None:
983            end = time.time() + timeout
984        try:
985            while status not in completed_states:
986                param = { 
987                        'credentials': [ slice_cred ],
988                        'slice_urn': slice_urn,
989                        }
990                r = segment_commands.component_manager_call(
991                        'SliverStatus', param, ctxt)
992                # GENIAPI uses geni_status as the key, so check for both
993                status = r.get('status', r.get('geni_status','changing'))
994                if status not in completed_states:
995                    if timeout is not None and time.time() > end:
996                        return 'timeout'
997                    time.sleep(30)
998        except segment_commands.ProtoGENIError, e:
999            raise service_error(service_error.federant,
1000                    "ProtoGENI: %s %s" % (e.code, e))
1001
1002        return status
1003
1004    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
1005        """
1006        Delete the slice resources.  An error from the service is ignores,
1007        because the soft state will go away anyway.
1008        """
1009        try:
1010            param = { 
1011                    'credentials': [ slice_cred, ],
1012                    'slice_urn': slice_urn,
1013                    }
1014            segment_commands.component_manager_call('DeleteSlice',
1015                    param, ctxt)
1016        except segment_commands.ProtoGENIError, e:
1017            self.log.warn("ProtoGENI: %s" % e)
1018
1019
1020
1021    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1022            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1023            export_certfile, topo, connInfo, services, timeout=0):
1024        """
1025        Start a sub-experiment on a federant.
1026
1027        Get the current state, modify or create as appropriate, ship data
1028        and configs and start the experiment.  There are small ordering
1029        differences based on the initial state of the sub-experiment.
1030        """
1031
1032        # Local software dir
1033        lsoftdir = "%s/software" % tmpdir
1034        host = self.staging_host
1035
1036        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1037                certfile, certpw)
1038
1039        if not ctxt: return False, {}
1040
1041        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1042        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1043        self.log.info("Creating %s" % slicename)
1044        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1045                segment_commands, slicename, rspec, gcred, ctxt)
1046
1047        # With manifest in hand, we can export the portal node names.
1048        if self.create_debug: nodes = self.fake_manifest(topo)
1049        else: nodes = self.manifest_to_dict(manifest)
1050
1051        self.export_store_info(export_certfile, nodes, self.ssh_port,
1052                connInfo)
1053        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1054                ename, connInfo, services, nodes)
1055
1056        # Copy software to the staging machine (done after generation to copy
1057        # those, too)
1058        for d in (tmpdir, lsoftdir):
1059            if os.path.isdir(d):
1060                for f in os.listdir(d):
1061                    if not os.path.isdir("%s/%s" % (d, f)):
1062                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1063                                user, host, "%s/%s" % (stagingdir, f)):
1064                            self.log.error("Scp failed")
1065                            return False, {}
1066
1067
1068        # Now we wait for the nodes to start on PG
1069        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1070                ctxt, timeout=300)
1071        if status == 'failed':
1072            self.log.error('Sliver failed to start on ProtoGENI')
1073            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1074            return False, {}
1075        elif status == 'timeout':
1076            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1077            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1078            return False, {}
1079        else:
1080            # All good: save ProtoGENI info in shared state
1081            self.state_lock.acquire()
1082            self.allocation[aid]['slice_urn'] = slice_urn
1083            self.allocation[aid]['slice_name'] = slicename
1084            self.allocation[aid]['slice_credential'] = slice_cred
1085            self.allocation[aid]['manifest'] = manifest
1086            self.allocation[aid]['rspec'] = rspec
1087            self.allocation[aid]['certfile'] = certfile
1088            self.allocation[aid]['certpw'] = certpw
1089            self.write_state()
1090            self.state_lock.release()
1091
1092        # Now we have configuration to do for ProtoGENI
1093        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1094                secretkey, stagingdir, tmpdir)
1095
1096        self.start_nodes(user, self.staging_host, 
1097                [ n.get('hostname', None) for n in nodes.values()],
1098                segment_commands)
1099
1100        # Everything has gone OK.
1101        return True, dict([(k, n.get('hostname', None)) \
1102                for k, n in nodes.items()])
1103
1104    def generate_rspec(self, topo, softdir, connInfo):
1105
1106        # Force a useful image.  Without picking this the nodes can get
1107        # different images and there is great pain.
1108        def image_filter(e):
1109            if isinstance(e, topdl.Computer):
1110                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1111                        'image+emulab-ops//FEDORA10-STD" />'
1112            else:
1113                return ""
1114        # Main line of generate
1115        t = topo.clone()
1116
1117        starts = { }
1118        # Weed out the things we aren't going to instantiate: Segments, portal
1119        # substrates, and portal interfaces.  (The copy in the for loop allows
1120        # us to delete from e.elements in side the for loop).  While we're
1121        # touching all the elements, we also adjust paths from the original
1122        # testbed to local testbed paths and put the federation commands and
1123        # startcommands into a dict so we can start them manually later.
1124        # ProtoGENI requires setup before the federation commands run, so we
1125        # run them by hand after we've seeded configurations.
1126        for e in [e for e in t.elements]:
1127            if isinstance(e, topdl.Segment):
1128                t.elements.remove(e)
1129            # Fix software paths
1130            for s in getattr(e, 'software', []):
1131                s.location = re.sub("^.*/", softdir, s.location)
1132            if isinstance(e, topdl.Computer):
1133                if e.get_attribute('portal') and self.portal_startcommand:
1134                    # Portals never have a user-specified start command
1135                    starts[e.name] = self.portal_startcommand
1136                elif self.node_startcommand:
1137                    if e.get_attribute('startup'):
1138                        starts[e.name] = "%s \\$USER '%s'" % \
1139                                (self.node_startcommand, 
1140                                        e.get_attribute('startup'))
1141                        e.remove_attribute('startup')
1142                    else:
1143                        starts[e.name] = self.node_startcommand
1144
1145                # Remove portal interfaces
1146                e.interface = [i for i in e.interface \
1147                        if not i.get_attribute('portal')]
1148
1149        t.substrates = [ s.clone() for s in t.substrates ]
1150        t.incorporate_elements()
1151
1152        # Customize the rspec output to use the image we like
1153        filters = [ image_filter ]
1154
1155        # Convert to rspec and return it
1156        exp_rspec = topdl.topology_to_rspec(t, filters)
1157
1158        return exp_rspec
1159
1160    def retrieve_software(self, topo, certfile, softdir):
1161        """
1162        Collect the software that nodes in the topology need loaded and stage
1163        it locally.  This implies retrieving it from the experiment_controller
1164        and placing it into softdir.  Certfile is used to prove that this node
1165        has access to that data (it's the allocation/segment fedid).  Finally
1166        local portal and federation software is also copied to the same staging
1167        directory for simplicity - all software needed for experiment creation
1168        is in softdir.
1169        """
1170        sw = set()
1171        for e in topo.elements:
1172            for s in getattr(e, 'software', []):
1173                sw.add(s.location)
1174        os.mkdir(softdir)
1175        for s in sw:
1176            self.log.debug("Retrieving %s" % s)
1177            try:
1178                get_url(s, certfile, softdir)
1179            except:
1180                t, v, st = sys.exc_info()
1181                raise service_error(service_error.internal,
1182                        "Error retrieving %s: %s" % (s, v))
1183
1184        # Copy local portal node software to the tempdir
1185        for s in (self.portal_software, self.federation_software):
1186            for l, f in s:
1187                base = os.path.basename(f)
1188                copy_file(f, "%s/%s" % (softdir, base))
1189
1190
1191    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1192        """
1193        Gather common configuration files, retrieve or create an experiment
1194        name and project name, and return the ssh_key filenames.  Create an
1195        allocation log bound to the state log variable as well.
1196        """
1197        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1198        ename = None
1199        pubkey_base = None
1200        secretkey_base = None
1201        alloc_log = None
1202
1203        for a in attrs:
1204            if a['attribute'] in configs:
1205                try:
1206                    self.log.debug("Retrieving %s" % a['value'])
1207                    get_url(a['value'], certfile, tmpdir)
1208                except:
1209                    t, v, st = sys.exc_info()
1210                    raise service_error(service_error.internal,
1211                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1212            if a['attribute'] == 'ssh_pubkey':
1213                pubkey_base = a['value'].rpartition('/')[2]
1214            if a['attribute'] == 'ssh_secretkey':
1215                secretkey_base = a['value'].rpartition('/')[2]
1216            if a['attribute'] == 'experiment_name':
1217                ename = a['value']
1218
1219        if not ename:
1220            ename = ""
1221            for i in range(0,5):
1222                ename += random.choice(string.ascii_letters)
1223            self.log.warn("No experiment name: picked one randomly: %s" \
1224                    % ename)
1225
1226        self.state_lock.acquire()
1227        if self.allocation.has_key(aid):
1228            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1229            self.allocation[aid]['experiment'] = ename
1230            self.allocation[aid]['log'] = [ ]
1231            # Create a logger that logs to the experiment's state object as
1232            # well as to the main log file.
1233            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1234            h = logging.StreamHandler(
1235                    list_log.list_log(self.allocation[aid]['log']))
1236            # XXX: there should be a global one of these rather than
1237            # repeating the code.
1238            h.setFormatter(logging.Formatter(
1239                "%(asctime)s %(name)s %(message)s",
1240                        '%d %b %y %H:%M:%S'))
1241            alloc_log.addHandler(h)
1242            self.write_state()
1243        else:
1244            self.log.error("No allocation for %s!?" % aid)
1245        self.state_lock.release()
1246
1247        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1248                cpw, alloc_log)
1249
1250    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1251        # Copy the assigned names into the return topology
1252        rvtopo = topo.clone()
1253        embedding = [ ]
1254        for k, n in nodes.items():
1255            embedding.append({ 
1256                'toponame': k,
1257                'physname': [n ],
1258                })
1259        # Grab the log (this is some anal locking, but better safe than
1260        # sorry)
1261        self.state_lock.acquire()
1262        logv = "".join(self.allocation[aid]['log'])
1263        # It's possible that the StartSegment call gets retried (!).
1264        # if the 'started' key is in the allocation, we'll return it rather
1265        # than redo the setup.
1266        self.allocation[aid]['started'] = { 
1267                'allocID': alloc_id,
1268                'allocationLog': logv,
1269                'segmentdescription': { 
1270                    'topdldescription': rvtopo.to_dict() },
1271                'embedding': embedding,
1272                }
1273        retval = copy.deepcopy(self.allocation[aid]['started'])
1274        self.write_state()
1275        self.state_lock.release()
1276
1277        return retval
1278
1279    def StartSegment(self, req, fid):
1280        err = None  # Any service_error generated after tmpdir is created
1281        rv = None   # Return value from segment creation
1282
1283        try:
1284            req = req['StartSegmentRequestBody']
1285            topref = req['segmentdescription']['topdldescription']
1286        except KeyError:
1287            raise service_error(service_error.req, "Badly formed request")
1288
1289        connInfo = req.get('connection', [])
1290        services = req.get('service', [])
1291        auth_attr = req['allocID']['fedid']
1292        aid = "%s" % auth_attr
1293        attrs = req.get('fedAttr', [])
1294        if not self.auth.check_attribute(fid, auth_attr):
1295            raise service_error(service_error.access, "Access denied")
1296        else:
1297            # See if this is a replay of an earlier succeeded StartSegment -
1298            # sometimes SSL kills 'em.  If so, replay the response rather than
1299            # redoing the allocation.
1300            self.state_lock.acquire()
1301            retval = self.allocation[aid].get('started', None)
1302            self.state_lock.release()
1303            if retval:
1304                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1305                        "replaying response")
1306                return retval
1307
1308        if topref:
1309            topo = topdl.Topology(**topref)
1310        else:
1311            raise service_error(service_error.req, 
1312                    "Request missing segmentdescription'")
1313
1314        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1315        try:
1316            tmpdir = tempfile.mkdtemp(prefix="access-")
1317            softdir = "%s/software" % tmpdir
1318        except EnvironmentError:
1319            raise service_error(service_error.internal, "Cannot create tmp dir")
1320
1321        # Try block alllows us to clean up temporary files.
1322        try:
1323            self.retrieve_software(topo, certfile, softdir)
1324            self.configure_userconf(services, tmpdir)
1325            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1326                cpw, alloc_log = self.initialize_experiment_info(attrs,
1327                        aid, certfile, tmpdir)
1328            self.import_store_info(certfile, connInfo)
1329            rspec = self.generate_rspec(topo, "%s/%s/" \
1330                    % (self.staging_dir, ename), connInfo)
1331
1332            segment_commands = self.api_proxy(keyfile=ssh_key,
1333                    debug=self.create_debug, log=alloc_log,
1334                    ch_url = self.ch_url, sa_url=self.sa_url,
1335                    cm_url=self.cm_url)
1336            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1337                    pubkey_base, 
1338                    secretkey_base, ename,
1339                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1340                    certfile, topo, connInfo, services)
1341        except EnvironmentError, e:
1342            err = service_error(service_error.internal, "%s" % e)
1343        except service_error, e:
1344            err = e
1345        except:
1346            t, v, st = sys.exc_info()
1347            err = service_error(service_error.internal, "%s: %s" % \
1348                    (v, traceback.extract_tb(st)))
1349
1350        # Walk up tmpdir, deleting as we go
1351        if self.cleanup: self.remove_dirs(tmpdir)
1352        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1353
1354        if rv:
1355            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1356        elif err:
1357            raise service_error(service_error.federant,
1358                    "Swapin failed: %s" % err)
1359        else:
1360            raise service_error(service_error.federant, "Swapin failed")
1361
1362    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1363            slice_urn, certfile, certpw):
1364        """
1365        Stop a sub experiment by calling swapexp on the federant
1366        """
1367        host = self.staging_host
1368        rv = False
1369        try:
1370            # Clean out tar files: we've gone over quota in the past
1371            if stagingdir:
1372                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1373            if slice_cred:
1374                self.log.error('Removing Sliver on ProtoGENI')
1375                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1376                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1377            return True
1378        except self.ssh_cmd_timeout:
1379            rv = False
1380        return rv
1381
1382    def TerminateSegment(self, req, fid):
1383        try:
1384            req = req['TerminateSegmentRequestBody']
1385        except KeyError:
1386            raise service_error(service_error.req, "Badly formed request")
1387
1388        auth_attr = req['allocID']['fedid']
1389        aid = "%s" % auth_attr
1390        attrs = req.get('fedAttr', [])
1391        if not self.auth.check_attribute(fid, auth_attr):
1392            raise service_error(service_error.access, "Access denied")
1393
1394        self.state_lock.acquire()
1395        if self.allocation.has_key(aid):
1396            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1397            slice_cred = self.allocation[aid].get('slice_credential', None)
1398            slice_urn = self.allocation[aid].get('slice_urn', None)
1399            ename = self.allocation[aid].get('experiment', None)
1400        else:
1401            cf, user, ssh_key, cpw = (None, None, None, None)
1402            slice_cred = None
1403            slice_urn = None
1404            ename = None
1405        self.state_lock.release()
1406
1407        if ename:
1408            staging = "%s/%s" % ( self.staging_dir, ename)
1409        else:
1410            self.log.warn("Can't find experiment name for %s" % aid)
1411            staging = None
1412
1413        segment_commands = self.api_proxy(keyfile=ssh_key,
1414                debug=self.create_debug, ch_url = self.ch_url,
1415                sa_url=self.sa_url, cm_url=self.cm_url)
1416        self.stop_segment(segment_commands, user, staging, slice_cred,
1417                slice_urn, cf, cpw)
1418        return { 'allocID': req['allocID'] }
1419
1420    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1421            certfile, certpw):
1422        """
1423        Linear code through the segment renewal calls.
1424        """
1425        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1426        try:
1427            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1428                    time.gmtime(time.time() + interval))
1429            cred = segment_commands.slice_authority_call('GetCredential', 
1430                    {}, ctxt)
1431
1432            param = {
1433                    'credential': scred,
1434                    'expiration': expiration
1435                    }
1436            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1437            param = {
1438                    'credential': cred,
1439                    'urn': slice_urn,
1440                    'type': 'Slice',
1441                    }
1442            new_scred = segment_commands.slice_authority_call('GetCredential',
1443                    param, ctxt)
1444
1445        except segment_commands.ProtoGENIError, e:
1446            self.log.error("Failed to extend slice %s: %s" % (name, e))
1447            return None
1448        try:
1449            param = {
1450                    'credentials': [new_scred,],
1451                    'slice_urn': slice_urn,
1452                    }
1453            r = segment_commands.component_manager_call('RenewSlice', param, 
1454                    ctxt)
1455        except segment_commands.ProtoGENIError, e:
1456            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1457
1458        return new_scred
1459   
1460
1461    def RenewSlices(self):
1462        self.log.info("Scanning for slices to renew")
1463        self.state_lock.acquire()
1464        aids = self.allocation.keys()
1465        self.state_lock.release()
1466
1467        for aid in aids:
1468            self.state_lock.acquire()
1469            if self.allocation.has_key(aid):
1470                name = self.allocation[aid].get('slice_name', None)
1471                scred = self.allocation[aid].get('slice_credential', None)
1472                slice_urn = self.allocation[aid].get('slice_urn', None)
1473                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1474            else:
1475                name = None
1476                scred = None
1477            self.state_lock.release()
1478
1479            if not os.access(cf, os.R_OK):
1480                self.log.error(
1481                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1482                continue
1483
1484            # There's a ProtoGENI slice associated with the segment; renew it.
1485            if name and scred and slice_urn:
1486                segment_commands = self.api_proxy(log=self.log, 
1487                        debug=self.create_debug, keyfile=ssh_key,
1488                        cm_url = self.cm_url, sa_url = self.sa_url,
1489                        ch_url = self.ch_url)
1490                new_scred = self.renew_segment(segment_commands, name, scred, 
1491                        slice_urn, self.renewal_interval, cf, cpw)
1492                if new_scred:
1493                    self.log.info("Slice %s renewed until %s GMT" % \
1494                            (name, time.asctime(time.gmtime(\
1495                                time.time()+self.renewal_interval))))
1496                    self.state_lock.acquire()
1497                    if self.allocation.has_key(aid):
1498                        self.allocation[aid]['slice_credential'] = new_scred
1499                        self.write_state()
1500                    self.state_lock.release()
1501                else:
1502                    self.log.info("Failed to renew slice %s " % name)
1503
1504        # Let's do this all again soon.  (4 tries before the slices time out)   
1505        t = Timer(self.renewal_interval/4, self.RenewSlices)
1506        t.start()
Note: See TracBrowser for help on using the repository browser.