source: fedd/federation/protogeni_access.py @ 88dbe63

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 88dbe63 was 88dbe63, checked in by Ted Faber <faber@…>, 14 years ago

Expand the protogeni_proxy class for derivation

  • Property mode set to 100644
File size: 48.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17from M2Crypto.SSL import SSLError
18
19from util import *
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from access import access_base
31from proxy_segment import proxy_segment
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class protogeni_proxy(proxy_segment):
45    class ProtoGENIError(Exception): 
46        def __init__(self, op, code, output):
47            Exception.__init__(self, output)
48            self.op = op
49            self.code = code
50            self.output = output
51
52    def __init__(self, log=None, keyfile=None, debug=False, 
53            ch_url=None, sa_url=None, cm_url=None):
54        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
55
56        self.ProtoGENIError = protogeni_proxy.ProtoGENIError
57        self.ch_url = ch_url
58        self.sa_url = sa_url
59        self.cm_url = cm_url
60
61        self.call_SetValue = service_caller('SetValue')
62
63        self.debug_fail = ['Resolve']
64        self.debug_response = {
65                'RedeemTicket': ("XML blob1", "XML blob2"),
66                'SliceStatus': { 'status': 'ready' },
67            }
68
69
70    def pg_call(self, url, method, params, context):
71        max_retries = 5
72        retries = 0
73
74        s = service_caller(method, request_body_name="", strict=False)
75        self.log.debug("Calling %s %s" % (url, method))
76        if not self.debug:
77            while retries < max_retries:
78                r = s.call_xmlrpc_service(url, params, context=context)
79                code = r.get('code', -1)
80                if code == 0:
81                    # Success leaves the loop here
82                    return r.get('value', None)
83                elif code == 14 and retries +1 < max_retries:
84                    # Busy resource
85                    retries+= 1
86                    self.log.info("Resource busy, retrying in 30 secs")
87                    time.sleep(30)
88                else:
89                    # NB: out of retries falls through to here
90                    raise self.ProtoGENIError(op=method, 
91                            code=r.get('code', 'unknown'), 
92                            output=r.get('output', 'No output'))
93        else:
94            if method in self.debug_fail:
95                raise self.ProtoGENIError(op=method, code='unknown',
96                        output='No output')
97            elif self.debug_response.has_key(method):
98                return self.debug_response[method]
99            else:
100                return "%s XML blob" % method
101
102    def clearinghouse_call(self, method, params, context):
103        return self.pg_call(self.ch_url, method, params, context)
104
105    def slice_authority_call(self, method, params, context):
106        return self.pg_call(self.sa_url, method, params, context)
107
108    def component_manager_call(self, method, params, context):
109        return self.pg_call(self.cm_url, method, params, context)
110
111
112
113
114
115class access(access_base):
116    """
117    The implementation of access control based on mapping users to projects.
118
119    Users can be mapped to existing projects or have projects created
120    dynamically.  This implements both direct requests and proxies.
121    """
122
123    def __init__(self, config=None, auth=None):
124        """
125        Initializer.  Pulls parameters out of the ConfigParser's access section.
126        """
127
128        access_base.__init__(self, config, auth)
129
130        self.domain = config.get("access", "domain")
131        self.userconfdir = config.get("access","userconfdir")
132        self.userconfcmd = config.get("access","userconfcmd")
133        self.userconfurl = config.get("access","userconfurl")
134        self.federation_software = config.get("access", "federation_software")
135        self.portal_software = config.get("access", "portal_software")
136        self.ssh_port = config.get("access","ssh_port") or "22"
137        self.sshd = config.get("access","sshd")
138        self.sshd_config = config.get("access", "sshd_config")
139        self.access_type = config.get("access", "type")
140        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
141        self.staging_host = config.get("access", "staging_host") \
142                or "ops.emulab.net"
143        self.local_seer_software = config.get("access", "local_seer_software")
144        self.local_seer_image = config.get("access", "local_seer_image")
145        self.local_seer_start = config.get("access", "local_seer_start")
146   
147        self.dragon_endpoint = config.get("access", "dragon")
148        self.dragon_vlans = config.get("access", "dragon_vlans")
149        self.deter_internal = config.get("access", "deter_internal")
150
151        self.tunnel_config = config.getboolean("access", "tunnel_config")
152        self.portal_command = config.get("access", "portal_command")
153        self.portal_image = config.get("access", "portal_image")
154        self.portal_type = config.get("access", "portal_type") or "pc"
155        self.portal_startcommand = config.get("access", "portal_startcommand")
156        self.node_startcommand = config.get("access", "node_startcommand")
157
158        self.federation_software = self.software_list(self.federation_software)
159        self.portal_software = self.software_list(self.portal_software)
160        self.local_seer_software = self.software_list(self.local_seer_software)
161
162        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
163        self.renewal_interval = int(self.renewal_interval) * 60
164
165        self.ch_url = config.get("access", "ch_url")
166        self.sa_url = config.get("access", "sa_url")
167        self.cm_url = config.get("access", "cm_url")
168
169        self.restricted = [ ]
170
171        # read_state in the base_class
172        self.state_lock.acquire()
173        for a  in ('allocation', 'projects', 'keys', 'types'):
174            if a not in self.state:
175                self.state[a] = { }
176        self.allocation = self.state['allocation']
177        self.projects = self.state['projects']
178        self.keys = self.state['keys']
179        self.types = self.state['types']
180        # Add the ownership attributes to the authorizer.  Note that the
181        # indices of the allocation dict are strings, but the attributes are
182        # fedids, so there is a conversion.
183        for k in self.state.get('allocation', {}).keys():
184            for o in self.state['allocation'][k].get('owners', []):
185                self.auth.set_attribute(o, fedid(hexstr=k))
186            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
187
188        self.state_lock.release()
189
190
191        self.log = logging.getLogger("fedd.access")
192        set_log_level(config, "access", self.log)
193
194        self.access = { }
195        if config.has_option("access", "accessdb"):
196            self.read_access(config.get("access", "accessdb"), 
197                    access_obj=self.make_access_info)
198
199        self.lookup_access = self.lookup_access_base
200
201        self.call_SetValue = service_caller('SetValue')
202        self.call_GetValue = service_caller('GetValue')
203        self.exports = {
204                'local_seer_control': self.export_local_seer,
205                'seer_master': self.export_seer_master,
206                'hide_hosts': self.export_hide_hosts,
207                }
208
209        if not self.local_seer_image or not self.local_seer_software or \
210                not self.local_seer_start:
211            if 'local_seer_control' in self.exports:
212                del self.exports['local_seer_control']
213
214        if not self.local_seer_image or not self.local_seer_software or \
215                not self.seer_master_start:
216            if 'seer_master' in self.exports:
217                del self.exports['seer_master']
218
219        self.RenewSlices()
220
221        self.soap_services = {\
222            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
223            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
224            'StartSegment': soap_handler("StartSegment", self.StartSegment),
225            'TerminateSegment': soap_handler("TerminateSegment", 
226                self.TerminateSegment),
227            }
228        self.xmlrpc_services =  {\
229            'RequestAccess': xmlrpc_handler('RequestAccess',
230                self.RequestAccess),
231            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
232                self.ReleaseAccess),
233            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
234            'TerminateSegment': xmlrpc_handler('TerminateSegment',
235                self.TerminateSegment),
236            }
237
238    @staticmethod
239    def make_access_info(s):
240        """
241        Split a string of the form (id, id, id, id) ito its constituent tuples
242        and return them as a tuple.  Use to import access info from the
243        access_db.
244        """
245
246        ss = s.strip()
247        if ss.startswith('(') and ss.endswith(')'):
248            l = [ s.strip() for s  in ss[1:-1].split(",")]
249            if len(l) == 4:
250                return tuple(l)
251            else:
252                raise self.parse_error(
253                        "Exactly 4 elements in access info required")
254        else:
255            raise self.parse_error("Expecting parenthezied values")
256
257
258    def get_handler(self, path, fid):
259        self.log.info("Get handler %s %s" % (path, fid))
260        if self.auth.check_attribute(fid, path) and self.userconfdir:
261            return ("%s/%s" % (self.userconfdir, path), "application/binary")
262        else:
263            return (None, None)
264
265    def build_access_response(self, alloc_id, services):
266        """
267        Create the SOAP response.
268
269        Build the dictionary description of the response and use
270        fedd_utils.pack_soap to create the soap message.  ap is the allocate
271        project message returned from a remote project allocation (even if that
272        allocation was done locally).
273        """
274        # Because alloc_id is already a fedd_services_types.IDType_Holder,
275        # there's no need to repack it
276        msg = { 
277                'allocID': alloc_id,
278                'fedAttr': [
279                    { 'attribute': 'domain', 'value': self.domain } , 
280                ]
281            }
282        if self.dragon_endpoint:
283            msg['fedAttr'].append({'attribute': 'dragon',
284                'value': self.dragon_endpoint})
285        if self.deter_internal:
286            msg['fedAttr'].append({'attribute': 'deter_internal',
287                'value': self.deter_internal})
288        #XXX: ??
289        if self.dragon_vlans:
290            msg['fedAttr'].append({'attribute': 'vlans',
291                'value': self.dragon_vlans})
292
293        if services:
294            msg['service'] = services
295        return msg
296
297    def RequestAccess(self, req, fid):
298        """
299        Handle the access request.
300        """
301
302        # The dance to get into the request body
303        if req.has_key('RequestAccessRequestBody'):
304            req = req['RequestAccessRequestBody']
305        else:
306            raise service_error(service_error.req, "No request!?")
307
308        if req.has_key('destinationTestbed'):
309            dt = unpack_id(req['destinationTestbed'])
310
311        # Request for this fedd
312        found, match = self.lookup_access(req, fid)
313        services, svc_state = self.export_services(req.get('service',[]),
314                None, None)
315        # keep track of what's been added
316        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
317        aid = unicode(allocID)
318
319        self.state_lock.acquire()
320        self.allocation[aid] = { }
321        # The protoGENI certificate
322        self.allocation[aid]['credentials'] = found
323        # The list of owner FIDs
324        self.allocation[aid]['owners'] = [ fid ]
325        self.write_state()
326        self.state_lock.release()
327        self.auth.set_attribute(fid, allocID)
328        self.auth.set_attribute(allocID, allocID)
329
330        try:
331            f = open("%s/%s.pem" % (self.certdir, aid), "w")
332            print >>f, alloc_cert
333            f.close()
334        except EnvironmentError, e:
335            raise service_error(service_error.internal, 
336                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
337        return self.build_access_response({ 'fedid': allocID }, None)
338
339
340    def ReleaseAccess(self, req, fid):
341        # The dance to get into the request body
342        if req.has_key('ReleaseAccessRequestBody'):
343            req = req['ReleaseAccessRequestBody']
344        else:
345            raise service_error(service_error.req, "No request!?")
346
347        # Local request
348        try:
349            if req['allocID'].has_key('localname'):
350                auth_attr = aid = req['allocID']['localname']
351            elif req['allocID'].has_key('fedid'):
352                aid = unicode(req['allocID']['fedid'])
353                auth_attr = req['allocID']['fedid']
354            else:
355                raise service_error(service_error.req,
356                        "Only localnames and fedids are understood")
357        except KeyError:
358            raise service_error(service_error.req, "Badly formed request")
359
360        self.log.debug("[access] deallocation requested for %s", aid)
361        if not self.auth.check_attribute(fid, auth_attr):
362            self.log.debug("[access] deallocation denied for %s", aid)
363            raise service_error(service_error.access, "Access Denied")
364
365        self.state_lock.acquire()
366        if self.allocation.has_key(aid):
367            self.log.debug("Found allocation for %s" %aid)
368            del self.allocation[aid]
369            self.write_state()
370            self.state_lock.release()
371            # And remove the access cert
372            cf = "%s/%s.pem" % (self.certdir, aid)
373            self.log.debug("Removing %s" % cf)
374            os.remove(cf)
375            return { 'allocID': req['allocID'] } 
376        else:
377            self.state_lock.release()
378            raise service_error(service_error.req, "No such allocation")
379
380    def manifest_to_dict(self, manifest, ignore_debug=False):
381        """
382        Turn the manifest into a dict were each virtual nodename (i.e. the
383        topdl name) has an entry with the allocated machine in hostname and the
384        interfaces in 'interfaces'.  I love having XML parser code lying
385        around.
386        """
387        if self.create_debug and not ignore_debug: 
388            self.log.debug("Returning null manifest dict")
389            return { }
390
391        # The class allows us to keep a little state - the dict under
392        # consteruction and the current entry in that dict for the interface
393        # element code.
394        class manifest_parser:
395            def __init__(self):
396                self.d = { }
397                self.current_key=None
398
399            # If the element is a node, create a dict entry for it.  If it's an
400            # interface inside a node, add an entry in the interfaces list with
401            # the virtual name and component id.
402            def start_element(self, name, attrs):
403                if name == 'node':
404                    self.current_key = attrs.get('virtual_id',"")
405                    if self.current_key:
406                        self.d[self.current_key] = {
407                                'hostname': attrs.get('hostname', None),
408                                'interfaces': { }
409                                }
410                elif name == 'interface' and self.current_key:
411                    self.d[self.current_key]['interfaces']\
412                            [attrs.get('virtual_id','')] = \
413                            attrs.get('component_id', None)
414            #  When a node is finished, clear current_key
415            def end_element(self, name):
416                if name == 'node': self.current_key = None
417
418        node = { }
419
420        mp = manifest_parser()
421        p = xml.parsers.expat.ParserCreate()
422        # These are bound to the class we just created
423        p.StartElementHandler = mp.start_element
424        p.EndElementHandler = mp.end_element
425
426        p.Parse(manifest)
427        # Make the node dict that the callers expect
428        for k in mp.d:
429            node[k] = mp.d.get('hostname', '')
430        return mp.d
431
432    def fake_manifest(self, topo):
433        """
434        Fake the output of manifest_to_dict with a bunch of generic node an
435        interface names, for debugging.
436        """
437        node = { }
438        for i, e in enumerate([ e for e in topo.elements \
439                if isinstance(e, topdl.Computer)]):
440            node[e.name] = { 
441                    'hostname': "node%03d" % i,
442                    'interfaces': { }
443                    }
444            for j, inf in enumerate(e.interface):
445                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
446
447        return node
448
449
450    def generate_portal_configs(self, topo, pubkey_base, 
451            secretkey_base, tmpdir, leid, connInfo, services, nodes):
452
453        def conninfo_to_dict(key, info):
454            """
455            Make a cpoy of the connection information about key, and flatten it
456            into a single dict by parsing out any feddAttrs.
457            """
458
459            rv = None
460            for i in info:
461                if key == i.get('portal', "") or \
462                        key in [e.get('element', "") \
463                        for e in i.get('member', [])]:
464                    rv = i.copy()
465                    break
466
467            else:
468                return rv
469
470            if 'fedAttr' in rv:
471                for a in rv['fedAttr']:
472                    attr = a.get('attribute', "")
473                    val = a.get('value', "")
474                    if attr and attr not in rv:
475                        rv[attr] = val
476                del rv['fedAttr']
477            return rv
478
479        # XXX: un hardcode this
480        def client_null(f, s):
481            print >>f, "Service: %s" % s['name']
482       
483        def client_seer_master(f, s):
484            print >>f, 'PortalAlias: seer-master'
485
486        def client_smb(f, s):
487            print >>f, "Service: %s" % s['name']
488            smbshare = None
489            smbuser = None
490            smbproj = None
491            for a in s.get('fedAttr', []):
492                if a.get('attribute', '') == 'SMBSHARE':
493                    smbshare = a.get('value', None)
494                elif a.get('attribute', '') == 'SMBUSER':
495                    smbuser = a.get('value', None)
496                elif a.get('attribute', '') == 'SMBPROJ':
497                    smbproj = a.get('value', None)
498
499            if all((smbshare, smbuser, smbproj)):
500                print >>f, "SMBshare: %s" % smbshare
501                print >>f, "ProjectUser: %s" % smbuser
502                print >>f, "ProjectName: %s" % smbproj
503
504        def client_hide_hosts(f, s):
505            for a in s.get('fedAttr', [ ]):
506                if a.get('attribute', "") == 'hosts':
507                    print >>f, 'Hide: %s' % a.get('value', "")
508
509        client_service_out = {
510                'SMB': client_smb,
511                'tmcd': client_null,
512                'seer': client_null,
513                'userconfig': client_null,
514                'project_export': client_null,
515                'seer_master': client_seer_master,
516                'hide_hosts': client_hide_hosts,
517            }
518
519        def client_seer_master_export(f, s):
520            print >>f, "AddedNode: seer-master"
521
522        def client_seer_local_export(f, s):
523            print >>f, "AddedNode: control"
524
525        client_export_service_out = {
526                'seer_master': client_seer_master_export,
527                'local_seer_control': client_seer_local_export,
528            }
529
530        def server_port(f, s):
531            p = urlparse(s.get('server', 'http://localhost'))
532            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
533
534        def server_null(f,s): pass
535
536        def server_seer(f, s):
537            print >>f, 'seer: true'
538
539        server_service_out = {
540                'SMB': server_port,
541                'tmcd': server_port,
542                'userconfig': server_null,
543                'project_export': server_null,
544                'seer': server_seer,
545                'seer_master': server_port,
546                'hide_hosts': server_null,
547            }
548        # XXX: end un hardcode this
549
550
551        seer_out = False
552        client_out = False
553        for e in [ e for e in topo.elements \
554                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
555            myname = e.name
556            type = e.get_attribute('portal_type')
557
558            info = conninfo_to_dict(myname, connInfo)
559
560            if not info:
561                raise service_error(service_error.req,
562                        "No connectivity info for %s" % myname)
563
564            # Translate to physical name (ProtoGENI doesn't have DNS)
565            physname = nodes.get(myname, { }).get('hostname', None)
566            peer = info.get('peer', "")
567            ldomain = self.domain
568            ssh_port = info.get('ssh_port', 22)
569
570            # Collect this for the client.conf file
571            if 'masterexperiment' in info:
572                mproj, meid = info['masterexperiment'].split("/", 1)
573
574            active = info.get('active', 'False')
575
576            if type in ('control', 'both'):
577                testbed = e.get_attribute('testbed')
578                control_gw = myname
579
580            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
581            tunnelconfig = self.tunnel_config
582            try:
583                f = open(cfn, "w")
584                if active == 'True':
585                    print >>f, "active: True"
586                    print >>f, "ssh_port: %s" % ssh_port
587                    if type in ('control', 'both'):
588                        for s in [s for s in services \
589                                if s.get('name', "") in self.imports]:
590                            server_service_out[s['name']](f, s)
591
592                if tunnelconfig:
593                    print >>f, "tunnelip: %s" % tunnelconfig
594                print >>f, "peer: %s" % peer.lower()
595                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
596                        pubkey_base
597                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
598                        secretkey_base
599                f.close()
600            except EnvironmentError, e:
601                raise service_error(service_error.internal,
602                        "Can't write protal config %s: %s" % (cfn, e))
603
604        # Done with portals, write the client config file.
605        try:
606            f = open("%s/client.conf" % tmpdir, "w")
607            if control_gw:
608                print >>f, "ControlGateway: %s" % physname.lower()
609            for s in services:
610                if s.get('name',"") in self.imports and \
611                        s.get('visibility','') == 'import':
612                    client_service_out[s['name']](f, s)
613                if s.get('name', '') in self.exports and \
614                        s.get('visibility', '') == 'export' and \
615                        s['name'] in client_export_service_out:
616                    client_export_service_out[s['name']](f, s)
617            # Seer uses this.
618            if mproj and meid:
619                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
620            f.close()
621        except EnvironmentError, e:
622            raise service_error(service_error.internal,
623                    "Cannot write client.conf: %s" %s)
624
625
626
627    def export_store_info(self, cf, nodes, ssh_port, connInfo):
628        """
629        For the export requests in the connection info, install the peer names
630        at the experiment controller via SetValue calls.
631        """
632
633        for c in connInfo:
634            for p in [ p for p in c.get('parameter', []) \
635                    if p.get('type', '') == 'output']:
636
637                if p.get('name', '') == 'peer':
638                    k = p.get('key', None)
639                    surl = p.get('store', None)
640                    if surl and k and k.index('/') != -1:
641                        if self.create_debug:
642                            req = { 'name': k, 'value': 'debug' }
643                            self.call_SetValue(surl, req, cf)
644                        else:
645                            n = nodes.get(k[k.index('/')+1:], { })
646                            value = n.get('hostname', None)
647                            if value:
648                                req = { 'name': k, 'value': value }
649                                self.call_SetValue(surl, req, cf)
650                            else:
651                                self.log.error("No hostname for %s" % \
652                                        k[k.index('/'):])
653                    else:
654                        self.log.error("Bad export request: %s" % p)
655                elif p.get('name', '') == 'ssh_port':
656                    k = p.get('key', None)
657                    surl = p.get('store', None)
658                    if surl and k:
659                        req = { 'name': k, 'value': ssh_port }
660                        self.call_SetValue(surl, req, cf)
661                    else:
662                        self.log.error("Bad export request: %s" % p)
663                else:
664
665                    self.log.error("Unknown export parameter: %s" % \
666                            p.get('name'))
667                    continue
668
669    def write_node_config_script(self, elem, node, user, pubkey,
670            secretkey, stagingdir, tmpdir):
671        """
672        Write out the configuration script that is to run on the node
673        represented by elem in the topology.  This is called
674        once per node to configure.
675        """
676        # These little functions/functors just make things more readable.  Each
677        # one encapsulates a small task of copying software files or installing
678        # them.
679        class stage_file_type:
680            """
681            Write code copying file sfrom the staging host to the host on which
682            this will run.
683            """
684            def __init__(self, user, host, stagingdir):
685                self.user = user
686                self.host = host
687                self.stagingdir = stagingdir
688                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
689                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
690
691            def __call__(self, script, file, dest="."):
692                # If the file is a full pathname, do not use stagingdir
693                if file.find('/') == -1:
694                    file = "%s/%s" % (self.stagingdir, file)
695                print >>script, "%s %s@%s:%s %s" % \
696                        (self.scp, self.user, self.host, file, dest)
697
698        def install_tar(script, loc, base):
699            """
700            Print code to script to install a tarfile in loc.
701            """
702            tar = "/bin/tar"
703            mkdir="/bin/mkdir"
704
705            print >>script, "%s -p %s" % (mkdir, loc)
706            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
707
708        def install_rpm(script, base):
709            """
710            Print code to script to install an rpm
711            """
712            rpm = "/bin/rpm"
713            print >>script, "%s --install %s" % (rpm, base)
714
715        ifconfig = "/sbin/ifconfig"
716        stage_file = stage_file_type(user, self.staging_host, stagingdir)
717        pname = node.get('hostname', None)
718        fed_dir = "/usr/local/federation"
719        fed_etc_dir = "%s/etc" % fed_dir
720        fed_bin_dir = "%s/bin" % fed_dir
721        fed_lib_dir = "%s/lib" % fed_dir
722
723        if pname:
724            sfile = "%s/%s.startup" % (tmpdir, pname)
725            script = open(sfile, "w")
726            # Reset the interfaces to the ones in the topo file
727            for i in [ i for i in elem.interface \
728                    if not i.get_attribute('portal')]:
729                pinf = node['interfaces'].get(i.name, None)
730                addr = i.get_attribute('ip4_address') 
731                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
732                if pinf and addr:
733                    print >>script, \
734                            "%s %s %s netmask %s"  % \
735                            (ifconfig, pinf, addr, netmask)
736                else:
737                    self.log.error("Missing interface or address for %s" \
738                            % i.name)
739               
740            for l, f in self.federation_software:
741                base = os.path.basename(f)
742                stage_file(script, base)
743                if l: install_tar(script, l, base)
744                else: install_rpm(script, base)
745
746            for s in elem.software:
747                s_base = s.location.rpartition('/')[2]
748                stage_file(script, s_base)
749                if s.install: install_tar(script, s.install, s_base)
750                else: install_rpm(script, s_base)
751
752            for f in ('hosts', pubkey, secretkey, 'client.conf', 
753                    'userconf'):
754                stage_file(script, f, fed_etc_dir)
755            if self.sshd:
756                stage_file(script, self.sshd, fed_bin_dir)
757            if self.sshd_config:
758                stage_file(script, self.sshd_config, fed_etc_dir)
759
760            # Look in tmpdir to get the names.  They've all been copied
761            # into the (remote) staging dir
762            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
763                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
764
765            # Done with staging, remove the identity used to stage
766            print >>script, "#/bin/rm .ssh/id_rsa"
767
768            # Start commands
769            if elem.get_attribute('portal') and self.portal_startcommand:
770                # Install portal software
771                for l, f in self.portal_software:
772                    base = os.path.basename(f)
773                    stage_file(script, base)
774                    if l: install_tar(script, l, base)
775                    else: install_rpm(script, base)
776
777                # Portals never have a user-specified start command
778                print >>script, self.portal_startcommand
779            elif self.node_startcommand:
780                # XXX: debug
781                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
782                # XXX: debug
783                if elem.get_attribute('startup'):
784                    print >>script, "%s \\$USER '%s'" % \
785                            (self.node_startcommand,
786                                    elem.get_attribute('startup'))
787                else:
788                    print >>script, self.node_startcommand
789            script.close()
790            return sfile, pname
791        else:
792            return None, None
793
794
795    def configure_nodes(self, segment_commands, topo, nodes, user, 
796            pubkey, secretkey, stagingdir, tmpdir):
797        """
798        For each node in the topology, generate a script file that copies
799        software onto it and installs it in the proper places and then runs the
800        startup command (including the federation commands.
801        """
802
803
804
805        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
806            vname = e.name
807            sfile, pname = self.write_node_config_script(e,
808                    nodes.get(vname, { }),
809                    user, pubkey, secretkey, stagingdir, tmpdir)
810            if sfile:
811                if not segment_commands.scp_file(sfile, user, pname):
812                    self.log.error("Could not copy script to %s" % pname)
813            else:
814                self.log.error("Unmapped node: %s" % vname)
815
816    def start_node(self, user, host, node, segment_commands):
817        """
818        Copy an identity to a node for the configuration script to be able to
819        import data and then run the startup script remotely.
820        """
821        # Place an identity on the node so that the copying can succeed
822        segment_commands.scp_file( segment_commands.ssh_privkey_file,
823                user, node, ".ssh/id_rsa")
824        segment_commands.ssh_cmd(user, node, 
825                "sudo /bin/sh ./%s.startup &" % node)
826
827    def start_nodes(self, user, host, nodes, segment_commands):
828        """
829        Start a thread to initialize each node and wait for them to complete.
830        Each thread runs start_node.
831        """
832        threads = [ ]
833        for n in nodes:
834            t = Thread(target=self.start_node, args=(user, host, n, 
835                segment_commands))
836            t.start()
837            threads.append(t)
838
839        done = [not t.isAlive() for t in threads]
840        while not all(done):
841            self.log.info("Waiting for threads %s" % done)
842            time.sleep(10)
843            done = [not t.isAlive() for t in threads]
844
845    def set_up_staging_filespace(self, segment_commands, user, host,
846            stagingdir):
847        """
848        Set up teh staging area on the staging machine.  To reduce the number
849        of ssh commands, we compose a script and execute it remotely.
850        """
851
852        self.log.info("[start_segment]: creating script file")
853        try:
854            sf, scriptname = tempfile.mkstemp()
855            scriptfile = os.fdopen(sf, 'w')
856        except EnvironmentError:
857            return False
858
859        scriptbase = os.path.basename(scriptname)
860
861        # Script the filesystem changes
862        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
863        print >>scriptfile, 'mkdir -p %s' % stagingdir
864        print >>scriptfile, "rm -f %s" % scriptbase
865        scriptfile.close()
866
867        # Move the script to the remote machine
868        # XXX: could collide tempfile names on the remote host
869        if segment_commands.scp_file(scriptname, user, host, scriptbase):
870            os.remove(scriptname)
871        else:
872            return False
873
874        # Execute the script (and the script's last line deletes it)
875        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
876            return False
877
878    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
879        """
880        Protogeni interactions take a context and a protogeni certificate.
881        This establishes both for later calls and returns them.
882        """
883        if os.access(certfile, os.R_OK):
884            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
885        else:
886            self.log.error("[start_segment]: Cannot read certfile: %s" % \
887                    certfile)
888            return None, None
889
890        try:
891            gcred = segment_commands.slice_authority_call('GetCredential', 
892                    {}, ctxt)
893        except segment_commands.ProtoGENIError, e:
894            raise service_error(service_error.federant,
895                    "ProtoGENI: %s" % e)
896
897        return ctxt, gcred
898
899    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
900        """
901        Find a usable slice name by trying random ones until there's no
902        collision.
903        """
904
905        def random_slicename(user):
906            """
907            Return a random slicename by appending 5 letters to the username.
908            """
909            slicename = user
910            for i in range(0,5):
911                slicename += random.choice(string.ascii_letters)
912            return slicename
913
914        while True:
915            slicename = random_slicename(user)
916            try:
917                param = {
918                        'credential': gcred, 
919                        'hrn': slicename,
920                        'type': 'Slice'
921                        }
922                segment_commands.slice_authority_call('Resolve', param, ctxt)
923            except segment_commands.ProtoGENIError, e:
924                print e
925                break
926
927        return slicename
928
929    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
930        """
931        Create the slice and allocate resources.  If any of this stuff fails,
932        the allocations will time out on PG in short order, so we just raise
933        the service_error.  Return the slice and sliver credentials as well as
934        the manifest.
935        """
936        try:
937            param = {
938                    'credential': gcred, 
939                    'hrn': slicename,
940                    'type': 'Slice'
941                    }
942            slice_cred = segment_commands.slice_authority_call('Register',
943                    param, ctxt)
944            # Resolve the slice to get the URN that PG has assigned it.
945            param = {
946                    'credential': gcred,
947                    'type': 'Slice',
948                    'hrn': slicename
949                    }
950            data = segment_commands.slice_authority_call('Resolve', param,
951                    ctxt)
952            if 'urn' in data:
953                slice_urn = data['urn']
954            else:
955                raise service_error(service_error.federant, 
956                        "No URN returned for slice %s" % hrn)
957
958            if 'creator_urn' in data:
959                creator_urn = data['creator_urn']
960            else:
961                raise service_error(service_error.federant, 
962                        "No creator URN returned for slice %s" % hrn)
963            # Populate the ssh keys (let PG format them)
964            param = {
965                    'credential': gcred, 
966                    }
967            keys =  segment_commands.slice_authority_call('GetKeys', param, 
968                    ctxt)
969            # Create a Sliver
970            param = {
971                    'credentials': [ slice_cred ],
972                    'rspec': rspec, 
973                    'users': [ {
974                        'urn': creator_urn,
975                        'keys': keys, 
976                        },
977                    ],
978                    'slice_urn': slice_urn,
979                    }
980            sliver_cred, manifest = segment_commands.component_manager_call(
981                    'CreateSliver', param, ctxt)
982        except segment_commands.ProtoGENIError, e:
983            raise service_error(service_error.federant,
984                    "ProtoGENI: %s %s" % (e.code, e))
985
986        return (slice_urn, slice_cred, manifest)
987
988    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
989            timeout=None):
990        """
991        Wait for the given slice to finish its startup.  Return the final
992        status.
993        """
994        completed_states = ('failed', 'ready')
995        status = 'changing'
996        if timeout is not None:
997            end = time.time() + timeout
998        try:
999            while status not in completed_states:
1000                param = { 
1001                        'credentials': [ slice_cred ],
1002                        'slice_urn': slice_urn,
1003                        }
1004                r = segment_commands.component_manager_call(
1005                        'SliverStatus', param, ctxt)
1006                status = r.get('status', 'changing')
1007                if status not in completed_states:
1008                    if timeout is not None and time.time() > end:
1009                        return 'timeout'
1010                    time.sleep(30)
1011        except segment_commands.ProtoGENIError, e:
1012            raise service_error(service_error.federant,
1013                    "ProtoGENI: %s %s" % (e.code, e))
1014
1015        return status
1016
1017    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
1018        """
1019        Delete the slice resources.  An error from the service is ignores,
1020        because the soft state will go away anyway.
1021        """
1022        try:
1023            param = { 
1024                    'credentials': [ slice_cred, ],
1025                    'slice_urn': slice_urn,
1026                    }
1027            segment_commands.component_manager_call('DeleteSlice',
1028                    param, ctxt)
1029        except segment_commands.ProtoGENIError, e:
1030            self.log.warn("ProtoGENI: %s" % e)
1031
1032
1033
1034    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1035            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1036            export_certfile, topo, connInfo, services, timeout=0):
1037        """
1038        Start a sub-experiment on a federant.
1039
1040        Get the current state, modify or create as appropriate, ship data
1041        and configs and start the experiment.  There are small ordering
1042        differences based on the initial state of the sub-experiment.
1043        """
1044
1045        # Local software dir
1046        lsoftdir = "%s/software" % tmpdir
1047        host = self.staging_host
1048
1049        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1050                certfile, certpw)
1051
1052        if not ctxt: return False
1053
1054        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1055        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1056        self.log.info("Creating %s" % slicename)
1057        slice_urn, slice_cred, manifest = self.allocate_slice(
1058                segment_commands, slicename, rspec, gcred, ctxt)
1059
1060        # With manifest in hand, we can export the portal node names.
1061        if self.create_debug: nodes = self.fake_manifest(topo)
1062        else: nodes = self.manifest_to_dict(manifest)
1063
1064        self.export_store_info(export_certfile, nodes, self.ssh_port,
1065                connInfo)
1066        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1067                ename, connInfo, services, nodes)
1068
1069        # Copy software to the staging machine (done after generation to copy
1070        # those, too)
1071        for d in (tmpdir, lsoftdir):
1072            if os.path.isdir(d):
1073                for f in os.listdir(d):
1074                    if not os.path.isdir("%s/%s" % (d, f)):
1075                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1076                                user, host, "%s/%s" % (stagingdir, f)):
1077                            self.log.error("Scp failed")
1078                            return False
1079
1080
1081        # Now we wait for the nodes to start on PG
1082        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1083                ctxt, timeout=300)
1084        if status == 'failed':
1085            self.log.error('Sliver failed to start on ProtoGENI')
1086            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1087            return False
1088        elif status == 'timeout':
1089            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1090            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1091            return False
1092        else:
1093            # All good: save ProtoGENI info in shared state
1094            self.state_lock.acquire()
1095            self.allocation[aid]['slice_urn'] = slice_urn
1096            self.allocation[aid]['slice_name'] = slicename
1097            self.allocation[aid]['slice_credential'] = slice_cred
1098            self.allocation[aid]['manifest'] = manifest
1099            self.allocation[aid]['certfile'] = certfile
1100            self.allocation[aid]['certpw'] = certpw
1101            self.write_state()
1102            self.state_lock.release()
1103
1104        # Now we have configuration to do for ProtoGENI
1105        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1106                secretkey, stagingdir, tmpdir)
1107
1108        self.start_nodes(user, self.staging_host, 
1109                [ n.get('hostname', None) for n in nodes.values()],
1110                segment_commands)
1111
1112        # Everything has gone OK.
1113        return True, dict([(k, n.get('hostname', None)) \
1114                for k, n in nodes.items()])
1115
1116    def generate_rspec(self, topo, softdir, connInfo):
1117
1118        # Force a useful image.  Without picking this the nodes can get
1119        # different images and there is great pain.
1120        def image_filter(e):
1121            if isinstance(e, topdl.Computer):
1122                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1123                        'image+emulab-ops//FEDORA10-STD" />'
1124            else:
1125                return ""
1126        # Main line of generate
1127        t = topo.clone()
1128
1129        starts = { }
1130        # Weed out the things we aren't going to instantiate: Segments, portal
1131        # substrates, and portal interfaces.  (The copy in the for loop allows
1132        # us to delete from e.elements in side the for loop).  While we're
1133        # touching all the elements, we also adjust paths from the original
1134        # testbed to local testbed paths and put the federation commands and
1135        # startcommands into a dict so we can start them manually later.
1136        # ProtoGENI requires setup before the federation commands run, so we
1137        # run them by hand after we've seeded configurations.
1138        for e in [e for e in t.elements]:
1139            if isinstance(e, topdl.Segment):
1140                t.elements.remove(e)
1141            # Fix software paths
1142            for s in getattr(e, 'software', []):
1143                s.location = re.sub("^.*/", softdir, s.location)
1144            if isinstance(e, topdl.Computer):
1145                if e.get_attribute('portal') and self.portal_startcommand:
1146                    # Portals never have a user-specified start command
1147                    starts[e.name] = self.portal_startcommand
1148                elif self.node_startcommand:
1149                    if e.get_attribute('startup'):
1150                        starts[e.name] = "%s \\$USER '%s'" % \
1151                                (self.node_startcommand, 
1152                                        e.get_attribute('startup'))
1153                        e.remove_attribute('startup')
1154                    else:
1155                        starts[e.name] = self.node_startcommand
1156
1157                # Remove portal interfaces
1158                e.interface = [i for i in e.interface \
1159                        if not i.get_attribute('portal')]
1160
1161        t.substrates = [ s.clone() for s in t.substrates ]
1162        t.incorporate_elements()
1163
1164        # Customize the rspec output to use the image we like
1165        filters = [ image_filter ]
1166
1167        # Convert to rspec and return it
1168        exp_rspec = topdl.topology_to_rspec(t, filters)
1169
1170        return exp_rspec
1171
1172    def retrieve_software(self, topo, certfile, softdir):
1173        """
1174        Collect the software that nodes in the topology need loaded and stage
1175        it locally.  This implies retrieving it from the experiment_controller
1176        and placing it into softdir.  Certfile is used to prove that this node
1177        has access to that data (it's the allocation/segment fedid).  Finally
1178        local portal and federation software is also copied to the same staging
1179        directory for simplicity - all software needed for experiment creation
1180        is in softdir.
1181        """
1182        sw = set()
1183        for e in topo.elements:
1184            for s in getattr(e, 'software', []):
1185                sw.add(s.location)
1186        os.mkdir(softdir)
1187        for s in sw:
1188            self.log.debug("Retrieving %s" % s)
1189            try:
1190                get_url(s, certfile, softdir)
1191            except:
1192                t, v, st = sys.exc_info()
1193                raise service_error(service_error.internal,
1194                        "Error retrieving %s: %s" % (s, v))
1195
1196        # Copy local portal node software to the tempdir
1197        for s in (self.portal_software, self.federation_software):
1198            for l, f in s:
1199                base = os.path.basename(f)
1200                copy_file(f, "%s/%s" % (softdir, base))
1201
1202
1203    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1204        """
1205        Gather common configuration files, retrieve or create an experiment
1206        name and project name, and return the ssh_key filenames.  Create an
1207        allocation log bound to the state log variable as well.
1208        """
1209        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1210        ename = None
1211        pubkey_base = None
1212        secretkey_base = None
1213        alloc_log = None
1214
1215        for a in attrs:
1216            if a['attribute'] in configs:
1217                try:
1218                    self.log.debug("Retrieving %s" % a['value'])
1219                    get_url(a['value'], certfile, tmpdir)
1220                except:
1221                    t, v, st = sys.exc_info()
1222                    raise service_error(service_error.internal,
1223                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1224            if a['attribute'] == 'ssh_pubkey':
1225                pubkey_base = a['value'].rpartition('/')[2]
1226            if a['attribute'] == 'ssh_secretkey':
1227                secretkey_base = a['value'].rpartition('/')[2]
1228            if a['attribute'] == 'experiment_name':
1229                ename = a['value']
1230
1231        if not ename:
1232            ename = ""
1233            for i in range(0,5):
1234                ename += random.choice(string.ascii_letters)
1235            self.log.warn("No experiment name: picked one randomly: %s" \
1236                    % ename)
1237
1238        self.state_lock.acquire()
1239        if self.allocation.has_key(aid):
1240            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1241            self.allocation[aid]['experiment'] = ename
1242            self.allocation[aid]['log'] = [ ]
1243            # Create a logger that logs to the experiment's state object as
1244            # well as to the main log file.
1245            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1246            h = logging.StreamHandler(
1247                    list_log.list_log(self.allocation[aid]['log']))
1248            # XXX: there should be a global one of these rather than
1249            # repeating the code.
1250            h.setFormatter(logging.Formatter(
1251                "%(asctime)s %(name)s %(message)s",
1252                        '%d %b %y %H:%M:%S'))
1253            alloc_log.addHandler(h)
1254            self.write_state()
1255        else:
1256            self.log.error("No allocation for %s!?" % aid)
1257        self.state_lock.release()
1258
1259        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1260                cpw, alloc_log)
1261
1262    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1263        # Copy the assigned names into the return topology
1264        rvtopo = topo.clone()
1265        embedding = [ ]
1266        for k, n in nodes.items():
1267            embedding.append({ 
1268                'toponame': k,
1269                'physname': [n ],
1270                })
1271        # Grab the log (this is some anal locking, but better safe than
1272        # sorry)
1273        self.state_lock.acquire()
1274        logv = "".join(self.allocation[aid]['log'])
1275        # It's possible that the StartSegment call gets retried (!).
1276        # if the 'started' key is in the allocation, we'll return it rather
1277        # than redo the setup.
1278        self.allocation[aid]['started'] = { 
1279                'allocID': alloc_id,
1280                'allocationLog': logv,
1281                'segmentdescription': { 
1282                    'topdldescription': rvtopo.to_dict() },
1283                'embedding': embedding,
1284                }
1285        retval = copy.deepcopy(self.allocation[aid]['started'])
1286        self.write_state()
1287        self.state_lock.release()
1288
1289        return retval
1290
1291    def StartSegment(self, req, fid):
1292        err = None  # Any service_error generated after tmpdir is created
1293        rv = None   # Return value from segment creation
1294
1295        try:
1296            req = req['StartSegmentRequestBody']
1297            topref = req['segmentdescription']['topdldescription']
1298        except KeyError:
1299            raise service_error(service_error.req, "Badly formed request")
1300
1301        connInfo = req.get('connection', [])
1302        services = req.get('service', [])
1303        auth_attr = req['allocID']['fedid']
1304        aid = "%s" % auth_attr
1305        attrs = req.get('fedAttr', [])
1306        if not self.auth.check_attribute(fid, auth_attr):
1307            raise service_error(service_error.access, "Access denied")
1308        else:
1309            # See if this is a replay of an earlier succeeded StartSegment -
1310            # sometimes SSL kills 'em.  If so, replay the response rather than
1311            # redoing the allocation.
1312            self.state_lock.acquire()
1313            retval = self.allocation[aid].get('started', None)
1314            self.state_lock.release()
1315            if retval:
1316                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1317                        "replaying response")
1318                return retval
1319
1320        if topref:
1321            topo = topdl.Topology(**topref)
1322        else:
1323            raise service_error(service_error.req, 
1324                    "Request missing segmentdescription'")
1325
1326        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1327        try:
1328            tmpdir = tempfile.mkdtemp(prefix="access-")
1329            softdir = "%s/software" % tmpdir
1330        except EnvironmentError:
1331            raise service_error(service_error.internal, "Cannot create tmp dir")
1332
1333        # Try block alllows us to clean up temporary files.
1334        try:
1335            self.retrieve_software(topo, certfile, softdir)
1336            self.configure_userconf(services, tmpdir)
1337            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1338                cpw, alloc_log = self.initialize_experiment_info(attrs,
1339                        aid, certfile, tmpdir)
1340            self.import_store_info(certfile, connInfo)
1341            rspec = self.generate_rspec(topo, "%s/%s/" \
1342                    % (self.staging_dir, ename), connInfo)
1343
1344            segment_commands = protogeni_proxy(keyfile=ssh_key,
1345                    debug=self.create_debug, log=alloc_log,
1346                    ch_url = self.ch_url, sa_url=self.sa_url,
1347                    cm_url=self.cm_url)
1348            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1349                    pubkey_base, 
1350                    secretkey_base, ename,
1351                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1352                    certfile, topo, connInfo, services)
1353        except EnvironmentError, e:
1354            err = service_error(service_error.internal, "%s" % e)
1355        except service_error, e:
1356            err = e
1357        except:
1358            t, v, st = sys.exc_info()
1359            err = service_error(service_error.internal, "%s: %s" % \
1360                    (v, traceback.extract_tb(st)))
1361
1362        # Walk up tmpdir, deleting as we go
1363        if self.cleanup: self.remove_dirs(tmpdir)
1364        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1365
1366        if rv:
1367            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1368        elif err:
1369            raise service_error(service_error.federant,
1370                    "Swapin failed: %s" % err)
1371        else:
1372            raise service_error(service_error.federant, "Swapin failed")
1373
1374    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1375            slice_urn, certfile, certpw):
1376        """
1377        Stop a sub experiment by calling swapexp on the federant
1378        """
1379        host = self.staging_host
1380        rv = False
1381        try:
1382            # Clean out tar files: we've gone over quota in the past
1383            if stagingdir:
1384                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1385            if slice_cred:
1386                self.log.error('Removing Sliver on ProtoGENI')
1387                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1388                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1389            return True
1390        except self.ssh_cmd_timeout:
1391            rv = False
1392        return rv
1393
1394    def TerminateSegment(self, req, fid):
1395        try:
1396            req = req['TerminateSegmentRequestBody']
1397        except KeyError:
1398            raise service_error(service_error.req, "Badly formed request")
1399
1400        auth_attr = req['allocID']['fedid']
1401        aid = "%s" % auth_attr
1402        attrs = req.get('fedAttr', [])
1403        if not self.auth.check_attribute(fid, auth_attr):
1404            raise service_error(service_error.access, "Access denied")
1405
1406        self.state_lock.acquire()
1407        if self.allocation.has_key(aid):
1408            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1409            slice_cred = self.allocation[aid].get('slice_credential', None)
1410            slice_urn = self.allocation[aid].get('slice_urn', None)
1411            ename = self.allocation[aid].get('experiment', None)
1412        else:
1413            cf, user, ssh_key, cpw = (None, None, None, None)
1414            slice_cred = None
1415            slice_urn = None
1416            ename = None
1417        self.state_lock.release()
1418
1419        if ename:
1420            staging = "%s/%s" % ( self.staging_dir, ename)
1421        else:
1422            self.log.warn("Can't find experiment name for %s" % aid)
1423            staging = None
1424
1425        segment_commands = protogeni_proxy(keyfile=ssh_key,
1426                debug=self.create_debug, ch_url = self.ch_url,
1427                sa_url=self.sa_url, cm_url=self.cm_url)
1428        self.stop_segment(segment_commands, user, staging, slice_cred,
1429                slice_urn, cf, cpw)
1430        return { 'allocID': req['allocID'] }
1431
1432    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1433            certfile, certpw):
1434        """
1435        Linear code through the segment renewal calls.
1436        """
1437        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1438        try:
1439            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1440                    time.gmtime(time.time() + interval))
1441            cred = segment_commands.slice_authority_call('GetCredential', 
1442                    {}, ctxt)
1443
1444            param = {
1445                    'credential': scred,
1446                    'expiration': expiration
1447                    }
1448            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1449            param = {
1450                    'credential': cred,
1451                    'hrn': name,
1452                    'type': 'Slice',
1453                    }
1454            slice = segment_commands.slice_authority_call('Resolve', 
1455                    param, ctxt)
1456            uuid = slice.get('uuid', None)
1457            if uuid == None:
1458                sys.exit('No uuid for %s' % slicename)
1459
1460            print 'Calling GetCredential (uuid)'
1461            param = {
1462                    'credential': cred,
1463                    'uuid': uuid,
1464                    'type': 'Slice',
1465                    }
1466            new_scred = segment_commands.slice_authority_call('GetCredential',
1467                    param, ctxt)
1468
1469        except segment_commands.ProtoGENIError, e:
1470            self.log.error("Failed to extend slice %s: %s" % (name, e))
1471            return None
1472        try:
1473            print 'Calling RenewSlice (CM)'
1474            param = {
1475                    'credentials': [new_scred,],
1476                    'slice_urn': slice_urn,
1477                    }
1478            r = segment_commands.component_manager_call('RenewSlice', param, 
1479                    ctxt)
1480        except segment_commands.ProtoGENIError, e:
1481            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1482
1483        return new_scred
1484   
1485
1486    def RenewSlices(self):
1487        self.log.info("Scanning for slices to renew")
1488        self.state_lock.acquire()
1489        aids = self.allocation.keys()
1490        self.state_lock.release()
1491
1492        for aid in aids:
1493            self.state_lock.acquire()
1494            if self.allocation.has_key(aid):
1495                name = self.allocation[aid].get('slice_name', None)
1496                scred = self.allocation[aid].get('slice_credential', None)
1497                slice_urn = self.allocation[aid].get('slice_urn', None)
1498                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1499            else:
1500                name = None
1501                scred = None
1502            self.state_lock.release()
1503
1504            if not os.access(cf, os.R_OK):
1505                self.log.error(
1506                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1507                continue
1508
1509            # There's a ProtoGENI slice associated with the segment; renew it.
1510            if name and scred and slice_urn:
1511                segment_commands = protogeni_proxy(log=self.log, 
1512                        debug=self.create_debug, keyfile=ssh_key,
1513                        cm_url = self.cm_url, sa_url = self.sa_url,
1514                        ch_url = self.ch_url)
1515                new_scred = self.renew_segment(segment_commands, name, scred, 
1516                        slice_urn, self.renewal_interval, cf, cpw)
1517                if new_scred:
1518                    self.log.info("Slice %s renewed until %s GMT" % \
1519                            (name, time.asctime(time.gmtime(\
1520                                time.time()+self.renewal_interval))))
1521                    self.state_lock.acquire()
1522                    if self.allocation.has_key(aid):
1523                        self.allocation[aid]['slice_credential'] = new_scred
1524                        self.write_state()
1525                    self.state_lock.release()
1526                else:
1527                    self.log.info("Failed to renew slice %s " % name)
1528
1529        # Let's do this all again soon.  (4 tries before the slices time out)   
1530        t = Timer(self.renewal_interval/4, self.RenewSlices)
1531        t.start()
Note: See TracBrowser for help on using the repository browser.