source: fedd/federation/protogeni_access.py @ d49c11c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since d49c11c was d49c11c, checked in by Ted Faber <faber@…>, 14 years ago

Move to v2 PG CM

  • Property mode set to 100644
File size: 47.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17from M2Crypto.SSL import SSLError
18
19from util import *
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from access import access_base
31from proxy_segment import proxy_segment
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class protogeni_proxy(proxy_segment):
45    class ProtoGENIError(Exception): 
46        def __init__(self, op, code, output):
47            Exception.__init__(self, output)
48            self.op = op
49            self.code = code
50            self.output = output
51
52    def __init__(self, log=None, keyfile=None, debug=False, 
53            ch_url=None, sa_url=None, cm_url=None):
54        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
55
56        self.ProtoGENIError = protogeni_proxy.ProtoGENIError
57        self.ch_url = ch_url
58        self.sa_url = sa_url
59        self.cm_url = cm_url
60
61        self.call_SetValue = service_caller('SetValue')
62
63        self.debug_fail = ['Resolve']
64        self.debug_response = {
65                'RedeemTicket': ("XML blob1", "XML blob2"),
66                'SliceStatus': { 'status': 'ready' },
67            }
68
69
70    def pg_call(self, url, method, params, context):
71        max_retries = 5
72        retries = 0
73
74        s = service_caller(method, request_body_name="", strict=False)
75        self.log.debug("Calling %s %s" % (url, method))
76        if not self.debug:
77            while retries < max_retries:
78                r = s.call_xmlrpc_service(url, params, context=context)
79                code = r.get('code', -1)
80                if code == 0:
81                    # Success leaves the loop here
82                    return r.get('value', None)
83                elif code == 14 and retries +1 < max_retries:
84                    # Busy resource
85                    retries+= 1
86                    self.log.info("Resource busy, retrying in 30 secs")
87                    time.sleep(30)
88                else:
89                    # NB: out of retries falls through to here
90                    raise self.ProtoGENIError(op=method, 
91                            code=r.get('code', 'unknown'), 
92                            output=r.get('output', 'No output'))
93        else:
94            if method in self.debug_fail:
95                raise self.ProtoGENIError(op=method, code='unknown',
96                        output='No output')
97            elif self.debug_response.has_key(method):
98                return self.debug_response[method]
99            else:
100                return "%s XML blob" % method
101
102
103
104class access(access_base):
105    """
106    The implementation of access control based on mapping users to projects.
107
108    Users can be mapped to existing projects or have projects created
109    dynamically.  This implements both direct requests and proxies.
110    """
111
112    def __init__(self, config=None, auth=None):
113        """
114        Initializer.  Pulls parameters out of the ConfigParser's access section.
115        """
116
117        access_base.__init__(self, config, auth)
118
119        self.domain = config.get("access", "domain")
120        self.userconfdir = config.get("access","userconfdir")
121        self.userconfcmd = config.get("access","userconfcmd")
122        self.userconfurl = config.get("access","userconfurl")
123        self.federation_software = config.get("access", "federation_software")
124        self.portal_software = config.get("access", "portal_software")
125        self.ssh_port = config.get("access","ssh_port") or "22"
126        self.sshd = config.get("access","sshd")
127        self.sshd_config = config.get("access", "sshd_config")
128        self.access_type = config.get("access", "type")
129        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
130        self.staging_host = config.get("access", "staging_host") \
131                or "ops.emulab.net"
132        self.local_seer_software = config.get("access", "local_seer_software")
133        self.local_seer_image = config.get("access", "local_seer_image")
134        self.local_seer_start = config.get("access", "local_seer_start")
135   
136        self.dragon_endpoint = config.get("access", "dragon")
137        self.dragon_vlans = config.get("access", "dragon_vlans")
138        self.deter_internal = config.get("access", "deter_internal")
139
140        self.tunnel_config = config.getboolean("access", "tunnel_config")
141        self.portal_command = config.get("access", "portal_command")
142        self.portal_image = config.get("access", "portal_image")
143        self.portal_type = config.get("access", "portal_type") or "pc"
144        self.portal_startcommand = config.get("access", "portal_startcommand")
145        self.node_startcommand = config.get("access", "node_startcommand")
146
147        self.federation_software = self.software_list(self.federation_software)
148        self.portal_software = self.software_list(self.portal_software)
149        self.local_seer_software = self.software_list(self.local_seer_software)
150
151        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
152        self.renewal_interval = int(self.renewal_interval) * 60
153
154        self.ch_url = config.get("access", "ch_url")
155        self.sa_url = config.get("access", "sa_url")
156        self.cm_url = config.get("access", "cm_url")
157
158        self.restricted = [ ]
159
160        # read_state in the base_class
161        self.state_lock.acquire()
162        for a  in ('allocation', 'projects', 'keys', 'types'):
163            if a not in self.state:
164                self.state[a] = { }
165        self.allocation = self.state['allocation']
166        self.projects = self.state['projects']
167        self.keys = self.state['keys']
168        self.types = self.state['types']
169        # Add the ownership attributes to the authorizer.  Note that the
170        # indices of the allocation dict are strings, but the attributes are
171        # fedids, so there is a conversion.
172        for k in self.state.get('allocation', {}).keys():
173            for o in self.state['allocation'][k].get('owners', []):
174                self.auth.set_attribute(o, fedid(hexstr=k))
175            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
176
177        self.state_lock.release()
178
179
180        self.log = logging.getLogger("fedd.access")
181        set_log_level(config, "access", self.log)
182
183        self.access = { }
184        if config.has_option("access", "accessdb"):
185            self.read_access(config.get("access", "accessdb"), 
186                    access_obj=self.make_access_info)
187
188        self.lookup_access = self.lookup_access_base
189
190        self.call_SetValue = service_caller('SetValue')
191        self.call_GetValue = service_caller('GetValue')
192        self.exports = {
193                'local_seer_control': self.export_local_seer,
194                'seer_master': self.export_seer_master,
195                'hide_hosts': self.export_hide_hosts,
196                }
197
198        if not self.local_seer_image or not self.local_seer_software or \
199                not self.local_seer_start:
200            if 'local_seer_control' in self.exports:
201                del self.exports['local_seer_control']
202
203        if not self.local_seer_image or not self.local_seer_software or \
204                not self.seer_master_start:
205            if 'seer_master' in self.exports:
206                del self.exports['seer_master']
207
208        self.RenewSlices()
209
210        self.soap_services = {\
211            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
212            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
213            'StartSegment': soap_handler("StartSegment", self.StartSegment),
214            'TerminateSegment': soap_handler("TerminateSegment", 
215                self.TerminateSegment),
216            }
217        self.xmlrpc_services =  {\
218            'RequestAccess': xmlrpc_handler('RequestAccess',
219                self.RequestAccess),
220            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
221                self.ReleaseAccess),
222            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
223            'TerminateSegment': xmlrpc_handler('TerminateSegment',
224                self.TerminateSegment),
225            }
226
227    @staticmethod
228    def make_access_info(s):
229        """
230        Split a string of the form (id, id, id, id) ito its constituent tuples
231        and return them as a tuple.  Use to import access info from the
232        access_db.
233        """
234
235        ss = s.strip()
236        if ss.startswith('(') and ss.endswith(')'):
237            l = [ s.strip() for s  in ss[1:-1].split(",")]
238            if len(l) == 4:
239                return tuple(l)
240            else:
241                raise self.parse_error(
242                        "Exactly 4 elements in access info required")
243        else:
244            raise self.parse_error("Expecting parenthezied values")
245
246
247    def get_handler(self, path, fid):
248        self.log.info("Get handler %s %s" % (path, fid))
249        if self.auth.check_attribute(fid, path) and self.userconfdir:
250            return ("%s/%s" % (self.userconfdir, path), "application/binary")
251        else:
252            return (None, None)
253
254    def build_access_response(self, alloc_id, services):
255        """
256        Create the SOAP response.
257
258        Build the dictionary description of the response and use
259        fedd_utils.pack_soap to create the soap message.  ap is the allocate
260        project message returned from a remote project allocation (even if that
261        allocation was done locally).
262        """
263        # Because alloc_id is already a fedd_services_types.IDType_Holder,
264        # there's no need to repack it
265        msg = { 
266                'allocID': alloc_id,
267                'fedAttr': [
268                    { 'attribute': 'domain', 'value': self.domain } , 
269                ]
270            }
271        if self.dragon_endpoint:
272            msg['fedAttr'].append({'attribute': 'dragon',
273                'value': self.dragon_endpoint})
274        if self.deter_internal:
275            msg['fedAttr'].append({'attribute': 'deter_internal',
276                'value': self.deter_internal})
277        #XXX: ??
278        if self.dragon_vlans:
279            msg['fedAttr'].append({'attribute': 'vlans',
280                'value': self.dragon_vlans})
281
282        if services:
283            msg['service'] = services
284        return msg
285
286    def RequestAccess(self, req, fid):
287        """
288        Handle the access request.
289        """
290
291        # The dance to get into the request body
292        if req.has_key('RequestAccessRequestBody'):
293            req = req['RequestAccessRequestBody']
294        else:
295            raise service_error(service_error.req, "No request!?")
296
297        if req.has_key('destinationTestbed'):
298            dt = unpack_id(req['destinationTestbed'])
299
300        # Request for this fedd
301        found, match = self.lookup_access(req, fid)
302        services, svc_state = self.export_services(req.get('service',[]),
303                None, None)
304        # keep track of what's been added
305        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
306        aid = unicode(allocID)
307
308        self.state_lock.acquire()
309        self.allocation[aid] = { }
310        # The protoGENI certificate
311        self.allocation[aid]['credentials'] = found
312        # The list of owner FIDs
313        self.allocation[aid]['owners'] = [ fid ]
314        self.write_state()
315        self.state_lock.release()
316        self.auth.set_attribute(fid, allocID)
317        self.auth.set_attribute(allocID, allocID)
318
319        try:
320            f = open("%s/%s.pem" % (self.certdir, aid), "w")
321            print >>f, alloc_cert
322            f.close()
323        except EnvironmentError, e:
324            raise service_error(service_error.internal, 
325                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
326        return self.build_access_response({ 'fedid': allocID }, None)
327
328
329    def ReleaseAccess(self, req, fid):
330        # The dance to get into the request body
331        if req.has_key('ReleaseAccessRequestBody'):
332            req = req['ReleaseAccessRequestBody']
333        else:
334            raise service_error(service_error.req, "No request!?")
335
336        # Local request
337        try:
338            if req['allocID'].has_key('localname'):
339                auth_attr = aid = req['allocID']['localname']
340            elif req['allocID'].has_key('fedid'):
341                aid = unicode(req['allocID']['fedid'])
342                auth_attr = req['allocID']['fedid']
343            else:
344                raise service_error(service_error.req,
345                        "Only localnames and fedids are understood")
346        except KeyError:
347            raise service_error(service_error.req, "Badly formed request")
348
349        self.log.debug("[access] deallocation requested for %s", aid)
350        if not self.auth.check_attribute(fid, auth_attr):
351            self.log.debug("[access] deallocation denied for %s", aid)
352            raise service_error(service_error.access, "Access Denied")
353
354        self.state_lock.acquire()
355        if self.allocation.has_key(aid):
356            self.log.debug("Found allocation for %s" %aid)
357            del self.allocation[aid]
358            self.write_state()
359            self.state_lock.release()
360            # And remove the access cert
361            cf = "%s/%s.pem" % (self.certdir, aid)
362            self.log.debug("Removing %s" % cf)
363            os.remove(cf)
364            return { 'allocID': req['allocID'] } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    def manifest_to_dict(self, manifest, ignore_debug=False):
370        """
371        Turn the manifest into a dict were each virtual nodename (i.e. the
372        topdl name) has an entry with the allocated machine in hostname and the
373        interfaces in 'interfaces'.  I love having XML parser code lying
374        around.
375        """
376        if self.create_debug and not ignore_debug: 
377            self.log.debug("Returning null manifest dict")
378            return { }
379
380        # The class allows us to keep a little state - the dict under
381        # consteruction and the current entry in that dict for the interface
382        # element code.
383        class manifest_parser:
384            def __init__(self):
385                self.d = { }
386                self.current_key=None
387
388            # If the element is a node, create a dict entry for it.  If it's an
389            # interface inside a node, add an entry in the interfaces list with
390            # the virtual name and component id.
391            def start_element(self, name, attrs):
392                if name == 'node':
393                    self.current_key = attrs.get('virtual_id',"")
394                    if self.current_key:
395                        self.d[self.current_key] = {
396                                'hostname': attrs.get('hostname', None),
397                                'interfaces': { }
398                                }
399                elif name == 'interface' and self.current_key:
400                    self.d[self.current_key]['interfaces']\
401                            [attrs.get('virtual_id','')] = \
402                            attrs.get('component_id', None)
403            #  When a node is finished, clear current_key
404            def end_element(self, name):
405                if name == 'node': self.current_key = None
406
407        node = { }
408
409        mp = manifest_parser()
410        p = xml.parsers.expat.ParserCreate()
411        # These are bound to the class we just created
412        p.StartElementHandler = mp.start_element
413        p.EndElementHandler = mp.end_element
414
415        p.Parse(manifest)
416        # Make the node dict that the callers expect
417        for k in mp.d:
418            node[k] = mp.d.get('hostname', '')
419        return mp.d
420
421    def fake_manifest(self, topo):
422        """
423        Fake the output of manifest_to_dict with a bunch of generic node an
424        interface names, for debugging.
425        """
426        node = { }
427        for i, e in enumerate([ e for e in topo.elements \
428                if isinstance(e, topdl.Computer)]):
429            node[e.name] = { 
430                    'hostname': "node%03d" % i,
431                    'interfaces': { }
432                    }
433            for j, inf in enumerate(e.interface):
434                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
435
436        return node
437
438
439    def generate_portal_configs(self, topo, pubkey_base, 
440            secretkey_base, tmpdir, leid, connInfo, services, nodes):
441
442        def conninfo_to_dict(key, info):
443            """
444            Make a cpoy of the connection information about key, and flatten it
445            into a single dict by parsing out any feddAttrs.
446            """
447
448            rv = None
449            for i in info:
450                if key == i.get('portal', "") or \
451                        key in [e.get('element', "") \
452                        for e in i.get('member', [])]:
453                    rv = i.copy()
454                    break
455
456            else:
457                return rv
458
459            if 'fedAttr' in rv:
460                for a in rv['fedAttr']:
461                    attr = a.get('attribute', "")
462                    val = a.get('value', "")
463                    if attr and attr not in rv:
464                        rv[attr] = val
465                del rv['fedAttr']
466            return rv
467
468        # XXX: un hardcode this
469        def client_null(f, s):
470            print >>f, "Service: %s" % s['name']
471       
472        def client_seer_master(f, s):
473            print >>f, 'PortalAlias: seer-master'
474
475        def client_smb(f, s):
476            print >>f, "Service: %s" % s['name']
477            smbshare = None
478            smbuser = None
479            smbproj = None
480            for a in s.get('fedAttr', []):
481                if a.get('attribute', '') == 'SMBSHARE':
482                    smbshare = a.get('value', None)
483                elif a.get('attribute', '') == 'SMBUSER':
484                    smbuser = a.get('value', None)
485                elif a.get('attribute', '') == 'SMBPROJ':
486                    smbproj = a.get('value', None)
487
488            if all((smbshare, smbuser, smbproj)):
489                print >>f, "SMBshare: %s" % smbshare
490                print >>f, "ProjectUser: %s" % smbuser
491                print >>f, "ProjectName: %s" % smbproj
492
493        def client_hide_hosts(f, s):
494            for a in s.get('fedAttr', [ ]):
495                if a.get('attribute', "") == 'hosts':
496                    print >>f, 'Hide: %s' % a.get('value', "")
497
498        client_service_out = {
499                'SMB': client_smb,
500                'tmcd': client_null,
501                'seer': client_null,
502                'userconfig': client_null,
503                'project_export': client_null,
504                'seer_master': client_seer_master,
505                'hide_hosts': client_hide_hosts,
506            }
507
508        def client_seer_master_export(f, s):
509            print >>f, "AddedNode: seer-master"
510
511        def client_seer_local_export(f, s):
512            print >>f, "AddedNode: control"
513
514        client_export_service_out = {
515                'seer_master': client_seer_master_export,
516                'local_seer_control': client_seer_local_export,
517            }
518
519        def server_port(f, s):
520            p = urlparse(s.get('server', 'http://localhost'))
521            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
522
523        def server_null(f,s): pass
524
525        def server_seer(f, s):
526            print >>f, 'seer: true'
527
528        server_service_out = {
529                'SMB': server_port,
530                'tmcd': server_port,
531                'userconfig': server_null,
532                'project_export': server_null,
533                'seer': server_seer,
534                'seer_master': server_port,
535                'hide_hosts': server_null,
536            }
537        # XXX: end un hardcode this
538
539
540        seer_out = False
541        client_out = False
542        for e in [ e for e in topo.elements \
543                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
544            myname = e.name
545            type = e.get_attribute('portal_type')
546
547            info = conninfo_to_dict(myname, connInfo)
548
549            if not info:
550                raise service_error(service_error.req,
551                        "No connectivity info for %s" % myname)
552
553            # Translate to physical name (ProtoGENI doesn't have DNS)
554            physname = nodes.get(myname, { }).get('hostname', None)
555            peer = info.get('peer', "")
556            ldomain = self.domain
557            ssh_port = info.get('ssh_port', 22)
558
559            # Collect this for the client.conf file
560            if 'masterexperiment' in info:
561                mproj, meid = info['masterexperiment'].split("/", 1)
562
563            active = info.get('active', 'False')
564
565            if type in ('control', 'both'):
566                testbed = e.get_attribute('testbed')
567                control_gw = myname
568
569            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
570            tunnelconfig = self.tunnel_config
571            try:
572                f = open(cfn, "w")
573                if active == 'True':
574                    print >>f, "active: True"
575                    print >>f, "ssh_port: %s" % ssh_port
576                    if type in ('control', 'both'):
577                        for s in [s for s in services \
578                                if s.get('name', "") in self.imports]:
579                            server_service_out[s['name']](f, s)
580
581                if tunnelconfig:
582                    print >>f, "tunnelip: %s" % tunnelconfig
583                print >>f, "peer: %s" % peer.lower()
584                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
585                        pubkey_base
586                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
587                        secretkey_base
588                f.close()
589            except EnvironmentError, e:
590                raise service_error(service_error.internal,
591                        "Can't write protal config %s: %s" % (cfn, e))
592
593        # Done with portals, write the client config file.
594        try:
595            f = open("%s/client.conf" % tmpdir, "w")
596            if control_gw:
597                print >>f, "ControlGateway: %s" % physname.lower()
598            for s in services:
599                if s.get('name',"") in self.imports and \
600                        s.get('visibility','') == 'import':
601                    client_service_out[s['name']](f, s)
602                if s.get('name', '') in self.exports and \
603                        s.get('visibility', '') == 'export' and \
604                        s['name'] in client_export_service_out:
605                    client_export_service_out[s['name']](f, s)
606            # Seer uses this.
607            if mproj and meid:
608                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
609            f.close()
610        except EnvironmentError, e:
611            raise service_error(service_error.internal,
612                    "Cannot write client.conf: %s" %s)
613
614
615
616    def export_store_info(self, cf, nodes, ssh_port, connInfo):
617        """
618        For the export requests in the connection info, install the peer names
619        at the experiment controller via SetValue calls.
620        """
621
622        for c in connInfo:
623            for p in [ p for p in c.get('parameter', []) \
624                    if p.get('type', '') == 'output']:
625
626                if p.get('name', '') == 'peer':
627                    k = p.get('key', None)
628                    surl = p.get('store', None)
629                    if surl and k and k.index('/') != -1:
630                        if self.create_debug:
631                            req = { 'name': k, 'value': 'debug' }
632                            self.call_SetValue(surl, req, cf)
633                        else:
634                            n = nodes.get(k[k.index('/')+1:], { })
635                            value = n.get('hostname', None)
636                            if value:
637                                req = { 'name': k, 'value': value }
638                                self.call_SetValue(surl, req, cf)
639                            else:
640                                self.log.error("No hostname for %s" % \
641                                        k[k.index('/'):])
642                    else:
643                        self.log.error("Bad export request: %s" % p)
644                elif p.get('name', '') == 'ssh_port':
645                    k = p.get('key', None)
646                    surl = p.get('store', None)
647                    if surl and k:
648                        req = { 'name': k, 'value': ssh_port }
649                        self.call_SetValue(surl, req, cf)
650                    else:
651                        self.log.error("Bad export request: %s" % p)
652                else:
653
654                    self.log.error("Unknown export parameter: %s" % \
655                            p.get('name'))
656                    continue
657
658    def write_node_config_script(self, elem, node, user, pubkey,
659            secretkey, stagingdir, tmpdir):
660        """
661        Write out the configuration script that is to run on the node
662        represented by elem in the topology.  This is called
663        once per node to configure.
664        """
665        # These little functions/functors just make things more readable.  Each
666        # one encapsulates a small task of copying software files or installing
667        # them.
668        class stage_file_type:
669            """
670            Write code copying file sfrom the staging host to the host on which
671            this will run.
672            """
673            def __init__(self, user, host, stagingdir):
674                self.user = user
675                self.host = host
676                self.stagingdir = stagingdir
677                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
678                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
679
680            def __call__(self, script, file, dest="."):
681                # If the file is a full pathname, do not use stagingdir
682                if file.find('/') == -1:
683                    file = "%s/%s" % (self.stagingdir, file)
684                print >>script, "%s %s@%s:%s %s" % \
685                        (self.scp, self.user, self.host, file, dest)
686
687        def install_tar(script, loc, base):
688            """
689            Print code to script to install a tarfile in loc.
690            """
691            tar = "/bin/tar"
692            mkdir="/bin/mkdir"
693
694            print >>script, "%s -p %s" % (mkdir, loc)
695            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
696
697        def install_rpm(script, base):
698            """
699            Print code to script to install an rpm
700            """
701            rpm = "/bin/rpm"
702            print >>script, "%s --install %s" % (rpm, base)
703
704        ifconfig = "/sbin/ifconfig"
705        stage_file = stage_file_type(user, self.staging_host, stagingdir)
706        pname = node.get('hostname', None)
707        fed_dir = "/usr/local/federation"
708        fed_etc_dir = "%s/etc" % fed_dir
709        fed_bin_dir = "%s/bin" % fed_dir
710        fed_lib_dir = "%s/lib" % fed_dir
711
712        if pname:
713            sfile = "%s/%s.startup" % (tmpdir, pname)
714            script = open(sfile, "w")
715            # Reset the interfaces to the ones in the topo file
716            for i in [ i for i in elem.interface \
717                    if not i.get_attribute('portal')]:
718                pinf = node['interfaces'].get(i.name, None)
719                addr = i.get_attribute('ip4_address') 
720                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
721                if pinf and addr:
722                    print >>script, \
723                            "%s %s %s netmask %s"  % \
724                            (ifconfig, pinf, addr, netmask)
725                else:
726                    self.log.error("Missing interface or address for %s" \
727                            % i.name)
728               
729            for l, f in self.federation_software:
730                base = os.path.basename(f)
731                stage_file(script, base)
732                if l: install_tar(script, l, base)
733                else: install_rpm(script, base)
734
735            for s in elem.software:
736                s_base = s.location.rpartition('/')[2]
737                stage_file(script, s_base)
738                if s.install: install_tar(script, s.install, s_base)
739                else: install_rpm(script, s_base)
740
741            for f in ('hosts', pubkey, secretkey, 'client.conf', 
742                    'userconf'):
743                stage_file(script, f, fed_etc_dir)
744            if self.sshd:
745                stage_file(script, self.sshd, fed_bin_dir)
746            if self.sshd_config:
747                stage_file(script, self.sshd_config, fed_etc_dir)
748
749            # Look in tmpdir to get the names.  They've all been copied
750            # into the (remote) staging dir
751            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
752                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
753
754            # Done with staging, remove the identity used to stage
755            print >>script, "#/bin/rm .ssh/id_rsa"
756
757            # Start commands
758            if elem.get_attribute('portal') and self.portal_startcommand:
759                # Install portal software
760                for l, f in self.portal_software:
761                    base = os.path.basename(f)
762                    stage_file(script, base)
763                    if l: install_tar(script, l, base)
764                    else: install_rpm(script, base)
765
766                # Portals never have a user-specified start command
767                print >>script, self.portal_startcommand
768            elif self.node_startcommand:
769                # XXX: debug
770                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
771                # XXX: debug
772                if elem.get_attribute('startup'):
773                    print >>script, "%s \\$USER '%s'" % \
774                            (self.node_startcommand,
775                                    elem.get_attribute('startup'))
776                else:
777                    print >>script, self.node_startcommand
778            script.close()
779            return sfile, pname
780        else:
781            return None, None
782
783
784    def configure_nodes(self, segment_commands, topo, nodes, user, 
785            pubkey, secretkey, stagingdir, tmpdir):
786        """
787        For each node in the topology, generate a script file that copies
788        software onto it and installs it in the proper places and then runs the
789        startup command (including the federation commands.
790        """
791
792
793
794        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
795            vname = e.name
796            sfile, pname = self.write_node_config_script(e,
797                    nodes.get(vname, { }),
798                    user, pubkey, secretkey, stagingdir, tmpdir)
799            if sfile:
800                if not segment_commands.scp_file(sfile, user, pname):
801                    self.log.error("Could not copy script to %s" % pname)
802            else:
803                self.log.error("Unmapped node: %s" % vname)
804
805    def start_node(self, user, host, node, segment_commands):
806        """
807        Copy an identity to a node for the configuration script to be able to
808        import data and then run the startup script remotely.
809        """
810        # Place an identity on the node so that the copying can succeed
811        segment_commands.scp_file( segment_commands.ssh_privkey_file,
812                user, node, ".ssh/id_rsa")
813        segment_commands.ssh_cmd(user, node, 
814                "sudo /bin/sh ./%s.startup &" % node)
815
816    def start_nodes(self, user, host, nodes, segment_commands):
817        """
818        Start a thread to initialize each node and wait for them to complete.
819        Each thread runs start_node.
820        """
821        threads = [ ]
822        for n in nodes:
823            t = Thread(target=self.start_node, args=(user, host, n, 
824                segment_commands))
825            t.start()
826            threads.append(t)
827
828        done = [not t.isAlive() for t in threads]
829        while not all(done):
830            self.log.info("Waiting for threads %s" % done)
831            time.sleep(10)
832            done = [not t.isAlive() for t in threads]
833
834    def set_up_staging_filespace(self, segment_commands, user, host,
835            stagingdir):
836        """
837        Set up teh staging area on the staging machine.  To reduce the number
838        of ssh commands, we compose a script and execute it remotely.
839        """
840
841        self.log.info("[start_segment]: creating script file")
842        try:
843            sf, scriptname = tempfile.mkstemp()
844            scriptfile = os.fdopen(sf, 'w')
845        except EnvironmentError:
846            return False
847
848        scriptbase = os.path.basename(scriptname)
849
850        # Script the filesystem changes
851        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
852        print >>scriptfile, 'mkdir -p %s' % stagingdir
853        print >>scriptfile, "rm -f %s" % scriptbase
854        scriptfile.close()
855
856        # Move the script to the remote machine
857        # XXX: could collide tempfile names on the remote host
858        if segment_commands.scp_file(scriptname, user, host, scriptbase):
859            os.remove(scriptname)
860        else:
861            return False
862
863        # Execute the script (and the script's last line deletes it)
864        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
865            return False
866
867    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
868        """
869        Protogeni interactions take a context and a protogeni certificate.
870        This establishes both for later calls and returns them.
871        """
872        if os.access(certfile, os.R_OK):
873            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
874        else:
875            self.log.error("[start_segment]: Cannot read certfile: %s" % \
876                    certfile)
877            return None, None
878
879        try:
880            gcred = segment_commands.pg_call(self.sa_url, 
881                    'GetCredential', {}, ctxt)
882        except self.ProtoGENIError, e:
883            raise service_error(service_error.federant,
884                    "ProtoGENI: %s" % e)
885
886        return ctxt, gcred
887
888    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
889        """
890        Find a usable slice name by trying random ones until there's no
891        collision.
892        """
893
894        def random_slicename(user):
895            """
896            Return a random slicename by appending 5 letters to the username.
897            """
898            slicename = user
899            for i in range(0,5):
900                slicename += random.choice(string.ascii_letters)
901            return slicename
902
903        while True:
904            slicename = random_slicename(user)
905            try:
906                param = {
907                        'credential': gcred, 
908                        'hrn': slicename,
909                        'type': 'Slice'
910                        }
911                segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt)
912            except segment_commands.ProtoGENIError, e:
913                print e
914                break
915
916        return slicename
917
918    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
919        """
920        Create the slice and allocate resources.  If any of this stuff fails,
921        the allocations will time out on PG in short order, so we just raise
922        the service_error.  Return the slice and sliver credentials as well as
923        the manifest.
924        """
925        try:
926            param = {
927                    'credential': gcred, 
928                    'hrn': slicename,
929                    'type': 'Slice'
930                    }
931            slice_cred = segment_commands.pg_call(self.sa_url, 'Register',
932                    param, ctxt)
933            # Resolve the slice to get the URN that PG has assigned it.
934            param = {
935                    'credential': gcred,
936                    'type': 'Slice',
937                    'hrn': slicename
938                    }
939            data = segment_commands.pg_call(self.sa_url, 'Resolve', param,
940                    ctxt)
941            if 'urn' in data:
942                slice_urn = data['urn']
943            else:
944                raise service_error(service_error.federant, 
945                        "No URN returned for slice %s" % hrn)
946            # Populate the ssh keys (let PG format them)
947            param = {
948                    'credential': gcred, 
949                    }
950            keys =  segment_commands.pg_call(self.sa_url, 'GetKeys', param, 
951                    ctxt)
952            # Create a Sliver
953            param = {
954                    'credentials': [ slice_cred ],
955                    'rspec': rspec, 
956                    'keys': keys, 
957                    'slice_urn': slice_urn,
958                    }
959            sliver_cred, manifest = segment_commands.pg_call(self.cm_url, 
960                    'CreateSliver', param, ctxt)
961        except segment_commands.ProtoGENIError, e:
962            raise service_error(service_error.federant,
963                    "ProtoGENI: %s %s" % (e.code, e))
964
965        return (slice_urn, slice_cred, sliver_cred, manifest)
966
967    def wait_for_slice(self, segment_commands, sliver_cred, slice_urn, ctxt, 
968            timeout=None):
969        """
970        Wait for the given slice to finish its startup.  Return the final
971        status.
972        """
973        completed_states = ('failed', 'ready')
974        status = 'changing'
975        if timeout is not None:
976            end = time.time() + timeout
977        try:
978            while status not in completed_states:
979                param = { 
980                        'credentials': [ sliver_cred ],
981                        'slice_urn': slice_urn,
982                        }
983                r = segment_commands.pg_call(self.cm_url,
984                        'SliverStatus', param, ctxt)
985                status = r.get('status', 'changing')
986                if status not in completed_states:
987                    if timeout is not None and time.time() > end:
988                        return 'timeout'
989                    time.sleep(30)
990        except segment_commands.ProtoGENIError, e:
991            raise service_error(service_error.federant,
992                    "ProtoGENI: %s %s" % (e.code, e))
993
994        return status
995
996    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
997        """
998        Delete the slice resources.  An error from the service is ignores,
999        because the soft state will go away anyway.
1000        """
1001        try:
1002            param = { 
1003                    'credentials': [ slice_cred, ],
1004                    'slice_urn': slice_urn,
1005                    }
1006            segment_commands.pg_call(self.cm_url, 'DeleteSlice',
1007                    param, ctxt)
1008        except segment_commands.ProtoGENIError, e:
1009            self.log.warn("ProtoGENI: %s" % e)
1010
1011
1012
1013    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1014            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1015            export_certfile, topo, connInfo, services, timeout=0):
1016        """
1017        Start a sub-experiment on a federant.
1018
1019        Get the current state, modify or create as appropriate, ship data
1020        and configs and start the experiment.  There are small ordering
1021        differences based on the initial state of the sub-experiment.
1022        """
1023
1024        # Local software dir
1025        lsoftdir = "%s/software" % tmpdir
1026        host = self.staging_host
1027
1028        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1029                certfile, certpw)
1030
1031        if not ctxt: return False
1032
1033        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1034        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1035        self.log.info("Creating %s" % slicename)
1036        slice_urn, slice_cred, sliver_cred, manifest = self.allocate_slice(
1037                segment_commands, slicename, rspec, gcred, ctxt)
1038
1039        # With manifest in hand, we can export the portal node names.
1040        if self.create_debug: nodes = self.fake_manifest(topo)
1041        else: nodes = self.manifest_to_dict(manifest)
1042
1043        self.export_store_info(export_certfile, nodes, self.ssh_port,
1044                connInfo)
1045        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1046                ename, connInfo, services, nodes)
1047
1048        # Copy software to the staging machine (done after generation to copy
1049        # those, too)
1050        for d in (tmpdir, lsoftdir):
1051            if os.path.isdir(d):
1052                for f in os.listdir(d):
1053                    if not os.path.isdir("%s/%s" % (d, f)):
1054                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1055                                user, host, "%s/%s" % (stagingdir, f)):
1056                            self.log.error("Scp failed")
1057                            return False
1058
1059
1060        # Now we wait for the nodes to start on PG
1061        status = self.wait_for_slice(segment_commands, sliver_cred, slice_urn, 
1062                ctxt, timeout=300)
1063        if status == 'failed':
1064            self.log.error('Sliver failed to start on ProtoGENI')
1065            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1066            return False
1067        elif status == 'timeout':
1068            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1069            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1070            return False
1071        else:
1072            # All good: save ProtoGENI info in shared state
1073            self.state_lock.acquire()
1074            self.allocation[aid]['slice_urn'] = slice_urn
1075            self.allocation[aid]['slice_name'] = slicename
1076            self.allocation[aid]['slice_credential'] = slice_cred
1077            self.allocation[aid]['sliver_credential'] = sliver_cred
1078            self.allocation[aid]['manifest'] = manifest
1079            self.allocation[aid]['certfile'] = certfile
1080            self.allocation[aid]['certpw'] = certpw
1081            self.write_state()
1082            self.state_lock.release()
1083
1084        # Now we have configuration to do for ProtoGENI
1085        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1086                secretkey, stagingdir, tmpdir)
1087
1088        self.start_nodes(user, self.staging_host, 
1089                [ n.get('hostname', None) for n in nodes.values()],
1090                segment_commands)
1091
1092        # Everything has gone OK.
1093        return True, dict([(k, n.get('hostname', None)) \
1094                for k, n in nodes.items()])
1095
1096    def generate_rspec(self, topo, softdir, connInfo):
1097
1098        # Force a useful image.  Without picking this the nodes can get
1099        # different images and there is great pain.
1100        def image_filter(e):
1101            if isinstance(e, topdl.Computer):
1102                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1103                        'image+emulab-ops//FEDORA10-STD" />'
1104            else:
1105                return ""
1106        # Main line of generate
1107        t = topo.clone()
1108
1109        starts = { }
1110        # Weed out the things we aren't going to instantiate: Segments, portal
1111        # substrates, and portal interfaces.  (The copy in the for loop allows
1112        # us to delete from e.elements in side the for loop).  While we're
1113        # touching all the elements, we also adjust paths from the original
1114        # testbed to local testbed paths and put the federation commands and
1115        # startcommands into a dict so we can start them manually later.
1116        # ProtoGENI requires setup before the federation commands run, so we
1117        # run them by hand after we've seeded configurations.
1118        for e in [e for e in t.elements]:
1119            if isinstance(e, topdl.Segment):
1120                t.elements.remove(e)
1121            # Fix software paths
1122            for s in getattr(e, 'software', []):
1123                s.location = re.sub("^.*/", softdir, s.location)
1124            if isinstance(e, topdl.Computer):
1125                if e.get_attribute('portal') and self.portal_startcommand:
1126                    # Portals never have a user-specified start command
1127                    starts[e.name] = self.portal_startcommand
1128                elif self.node_startcommand:
1129                    if e.get_attribute('startup'):
1130                        starts[e.name] = "%s \\$USER '%s'" % \
1131                                (self.node_startcommand, 
1132                                        e.get_attribute('startup'))
1133                        e.remove_attribute('startup')
1134                    else:
1135                        starts[e.name] = self.node_startcommand
1136
1137                # Remove portal interfaces
1138                e.interface = [i for i in e.interface \
1139                        if not i.get_attribute('portal')]
1140
1141        t.substrates = [ s.clone() for s in t.substrates ]
1142        t.incorporate_elements()
1143
1144        # Customize the rspec output to use the image we like
1145        filters = [ image_filter ]
1146
1147        # Convert to rspec and return it
1148        exp_rspec = topdl.topology_to_rspec(t, filters)
1149
1150        return exp_rspec
1151
1152    def retrieve_software(self, topo, certfile, softdir):
1153        """
1154        Collect the software that nodes in the topology need loaded and stage
1155        it locally.  This implies retrieving it from the experiment_controller
1156        and placing it into softdir.  Certfile is used to prove that this node
1157        has access to that data (it's the allocation/segment fedid).  Finally
1158        local portal and federation software is also copied to the same staging
1159        directory for simplicity - all software needed for experiment creation
1160        is in softdir.
1161        """
1162        sw = set()
1163        for e in topo.elements:
1164            for s in getattr(e, 'software', []):
1165                sw.add(s.location)
1166        os.mkdir(softdir)
1167        for s in sw:
1168            self.log.debug("Retrieving %s" % s)
1169            try:
1170                get_url(s, certfile, softdir)
1171            except:
1172                t, v, st = sys.exc_info()
1173                raise service_error(service_error.internal,
1174                        "Error retrieving %s: %s" % (s, v))
1175
1176        # Copy local portal node software to the tempdir
1177        for s in (self.portal_software, self.federation_software):
1178            for l, f in s:
1179                base = os.path.basename(f)
1180                copy_file(f, "%s/%s" % (softdir, base))
1181
1182
1183    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1184        """
1185        Gather common configuration files, retrieve or create an experiment
1186        name and project name, and return the ssh_key filenames.  Create an
1187        allocation log bound to the state log variable as well.
1188        """
1189        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1190        ename = None
1191        pubkey_base = None
1192        secretkey_base = None
1193        alloc_log = None
1194
1195        for a in attrs:
1196            if a['attribute'] in configs:
1197                try:
1198                    self.log.debug("Retrieving %s" % a['value'])
1199                    get_url(a['value'], certfile, tmpdir)
1200                except:
1201                    t, v, st = sys.exc_info()
1202                    raise service_error(service_error.internal,
1203                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1204            if a['attribute'] == 'ssh_pubkey':
1205                pubkey_base = a['value'].rpartition('/')[2]
1206            if a['attribute'] == 'ssh_secretkey':
1207                secretkey_base = a['value'].rpartition('/')[2]
1208            if a['attribute'] == 'experiment_name':
1209                ename = a['value']
1210
1211        if not ename:
1212            ename = ""
1213            for i in range(0,5):
1214                ename += random.choice(string.ascii_letters)
1215            self.log.warn("No experiment name: picked one randomly: %s" \
1216                    % ename)
1217
1218        self.state_lock.acquire()
1219        if self.allocation.has_key(aid):
1220            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1221            self.allocation[aid]['experiment'] = ename
1222            self.allocation[aid]['log'] = [ ]
1223            # Create a logger that logs to the experiment's state object as
1224            # well as to the main log file.
1225            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1226            h = logging.StreamHandler(
1227                    list_log.list_log(self.allocation[aid]['log']))
1228            # XXX: there should be a global one of these rather than
1229            # repeating the code.
1230            h.setFormatter(logging.Formatter(
1231                "%(asctime)s %(name)s %(message)s",
1232                        '%d %b %y %H:%M:%S'))
1233            alloc_log.addHandler(h)
1234            self.write_state()
1235        else:
1236            self.log.error("No allocation for %s!?" % aid)
1237        self.state_lock.release()
1238
1239        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1240                cpw, alloc_log)
1241
1242    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1243        # Copy the assigned names into the return topology
1244        rvtopo = topo.clone()
1245        embedding = [ ]
1246        for k, n in nodes.items():
1247            embedding.append({ 
1248                'toponame': k,
1249                'physname': [n ],
1250                })
1251        # Grab the log (this is some anal locking, but better safe than
1252        # sorry)
1253        self.state_lock.acquire()
1254        logv = "".join(self.allocation[aid]['log'])
1255        # It's possible that the StartSegment call gets retried (!).
1256        # if the 'started' key is in the allocation, we'll return it rather
1257        # than redo the setup.
1258        self.allocation[aid]['started'] = { 
1259                'allocID': alloc_id,
1260                'allocationLog': logv,
1261                'segmentdescription': { 
1262                    'topdldescription': rvtopo.to_dict() },
1263                'embedding': embedding,
1264                }
1265        retval = copy.deepcopy(self.allocation[aid]['started'])
1266        self.write_state()
1267        self.state_lock.release()
1268
1269        return retval
1270
1271    def StartSegment(self, req, fid):
1272        err = None  # Any service_error generated after tmpdir is created
1273        rv = None   # Return value from segment creation
1274
1275        try:
1276            req = req['StartSegmentRequestBody']
1277            topref = req['segmentdescription']['topdldescription']
1278        except KeyError:
1279            raise service_error(service_error.req, "Badly formed request")
1280
1281        connInfo = req.get('connection', [])
1282        services = req.get('service', [])
1283        auth_attr = req['allocID']['fedid']
1284        aid = "%s" % auth_attr
1285        attrs = req.get('fedAttr', [])
1286        if not self.auth.check_attribute(fid, auth_attr):
1287            raise service_error(service_error.access, "Access denied")
1288        else:
1289            # See if this is a replay of an earlier succeeded StartSegment -
1290            # sometimes SSL kills 'em.  If so, replay the response rather than
1291            # redoing the allocation.
1292            self.state_lock.acquire()
1293            retval = self.allocation[aid].get('started', None)
1294            self.state_lock.release()
1295            if retval:
1296                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1297                        "replaying response")
1298                return retval
1299
1300        if topref:
1301            topo = topdl.Topology(**topref)
1302        else:
1303            raise service_error(service_error.req, 
1304                    "Request missing segmentdescription'")
1305
1306        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1307        try:
1308            tmpdir = tempfile.mkdtemp(prefix="access-")
1309            softdir = "%s/software" % tmpdir
1310        except EnvironmentError:
1311            raise service_error(service_error.internal, "Cannot create tmp dir")
1312
1313        # Try block alllows us to clean up temporary files.
1314        try:
1315            self.retrieve_software(topo, certfile, softdir)
1316            self.configure_userconf(services, tmpdir)
1317            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1318                cpw, alloc_log = self.initialize_experiment_info(attrs,
1319                        aid, certfile, tmpdir)
1320            self.import_store_info(certfile, connInfo)
1321            rspec = self.generate_rspec(topo, "%s/%s/" \
1322                    % (self.staging_dir, ename), connInfo)
1323
1324            segment_commands = protogeni_proxy(keyfile=ssh_key,
1325                    debug=self.create_debug, log=alloc_log,
1326                    ch_url = self.ch_url, sa_url=self.sa_url,
1327                    cm_url=self.cm_url)
1328            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1329                    pubkey_base, 
1330                    secretkey_base, ename,
1331                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1332                    certfile, topo, connInfo, services)
1333        except EnvironmentError, e:
1334            err = service_error(service_error.internal, "%s" % e)
1335        except service_error, e:
1336            err = e
1337        except:
1338            t, v, st = sys.exc_info()
1339            err = service_error(service_error.internal, "%s: %s" % \
1340                    (v, traceback.extract_tb(st)))
1341
1342        # Walk up tmpdir, deleting as we go
1343        if self.cleanup: self.remove_dirs(tmpdir)
1344        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1345
1346        if rv:
1347            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1348        elif err:
1349            raise service_error(service_error.federant,
1350                    "Swapin failed: %s" % err)
1351        else:
1352            raise service_error(service_error.federant, "Swapin failed")
1353
1354    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1355            slice_urn, certfile, certpw):
1356        """
1357        Stop a sub experiment by calling swapexp on the federant
1358        """
1359        host = self.staging_host
1360        rv = False
1361        try:
1362            # Clean out tar files: we've gone over quota in the past
1363            if stagingdir:
1364                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1365            if slice_cred:
1366                self.log.error('Removing Sliver on ProtoGENI')
1367                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1368                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1369            return True
1370        except self.ssh_cmd_timeout:
1371            rv = False
1372        return rv
1373
1374    def TerminateSegment(self, req, fid):
1375        try:
1376            req = req['TerminateSegmentRequestBody']
1377        except KeyError:
1378            raise service_error(service_error.req, "Badly formed request")
1379
1380        auth_attr = req['allocID']['fedid']
1381        aid = "%s" % auth_attr
1382        attrs = req.get('fedAttr', [])
1383        if not self.auth.check_attribute(fid, auth_attr):
1384            raise service_error(service_error.access, "Access denied")
1385
1386        self.state_lock.acquire()
1387        if self.allocation.has_key(aid):
1388            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1389            slice_cred = self.allocation[aid].get('slice_credential', None)
1390            slice_urn = self.allocation[aid].get('slice_urn', None)
1391            ename = self.allocation[aid].get('experiment', None)
1392        else:
1393            cf, user, ssh_key, cpw = (None, None, None, None)
1394            slice_cred = None
1395            ename = None
1396        self.state_lock.release()
1397
1398        if ename:
1399            staging = "%s/%s" % ( self.staging_dir, ename)
1400        else:
1401            self.log.warn("Can't find experiment name for %s" % aid)
1402            staging = None
1403
1404        segment_commands = protogeni_proxy(keyfile=ssh_key,
1405                debug=self.create_debug, ch_url = self.ch_url,
1406                sa_url=self.sa_url, cm_url=self.cm_url)
1407        self.stop_segment(segment_commands, user, staging, slice_cred,
1408                slice_urn, cf, cpw)
1409        return { 'allocID': req['allocID'] }
1410
1411    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1412            certfile, certpw):
1413        """
1414        Linear code through the segment renewal calls.
1415        """
1416        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1417        try:
1418            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1419                    time.gmtime(time.time() + interval))
1420            cred = segment_commands.pg_call(self.sa_url, 'GetCredential', 
1421                    {}, ctxt)
1422
1423            param = {
1424                    'credential': scred,
1425                    'expiration': expiration
1426                    }
1427            r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt)
1428            param = {
1429                    'credential': cred,
1430                    'hrn': name,
1431                    'type': 'Slice',
1432                    }
1433            slice = segment_commands.pg_call(self.sa_url, 'Resolve', 
1434                    param, ctxt)
1435            uuid = slice.get('uuid', None)
1436            if uuid == None:
1437                sys.exit('No uuid for %s' % slicename)
1438
1439            print 'Calling GetCredential (uuid)'
1440            param = {
1441                    'credential': cred,
1442                    'uuid': uuid,
1443                    'type': 'Slice',
1444                    }
1445            new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential',
1446                    param, ctxt)
1447
1448        except segment_commands.ProtoGENIError, e:
1449            self.log.error("Failed to extend slice %s: %s" % (name, e))
1450            return None
1451        try:
1452            print 'Calling RenewSlice (CM)'
1453            param = {
1454                    'credentials': [new_scred,],
1455                    'slice_urn': slice_urn,
1456                    }
1457            r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt)
1458        except segment_commands.ProtoGENIError, e:
1459            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1460
1461        return new_scred
1462   
1463
1464    def RenewSlices(self):
1465        self.log.info("Scanning for slices to renew")
1466        self.state_lock.acquire()
1467        aids = self.allocation.keys()
1468        self.state_lock.release()
1469
1470        for aid in aids:
1471            self.state_lock.acquire()
1472            if self.allocation.has_key(aid):
1473                name = self.allocation[aid].get('slice_name', None)
1474                scred = self.allocation[aid].get('slice_credential', None)
1475                slice_urn = self.allocation[aid].get('slice_urn', None)
1476                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1477            else:
1478                name = None
1479                scred = None
1480            self.state_lock.release()
1481
1482            if not os.access(cf, os.R_OK):
1483                self.log.error(
1484                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1485                continue
1486
1487            # There's a ProtoGENI slice associated with the segment; renew it.
1488            if name and scred and slice_urn:
1489                segment_commands = protogeni_proxy(log=self.log, 
1490                        debug=self.create_debug, keyfile=ssh_key,
1491                        cm_url = self.cm_url, sa_url = self.sa_url,
1492                        ch_url = self.ch_url)
1493                new_scred = self.renew_segment(segment_commands, name, scred, 
1494                        slice_urn, self.renewal_interval, cf, cpw)
1495                if new_scred:
1496                    self.log.info("Slice %s renewed until %s GMT" % \
1497                            (name, time.asctime(time.gmtime(\
1498                                time.time()+self.renewal_interval))))
1499                    self.state_lock.acquire()
1500                    if self.allocation.has_key(aid):
1501                        self.allocation[aid]['slice_credential'] = new_scred
1502                    self.state_lock.release()
1503                else:
1504                    self.log.info("Failed to renew slice %s " % name)
1505
1506        # Let's do this all again soon.  (4 tries before the slices time out)   
1507        t = Timer(self.renewal_interval/4, self.RenewSlices)
1508        t.start()
Note: See TracBrowser for help on using the repository browser.