source: fedd/federation/protogeni_access.py @ 027b87b

axis_examplecompt_changesinfo-ops
Last change on this file since 027b87b was 027b87b, checked in by Ted Faber <faber@…>, 13 years ago

This little class added a useless complexity. While I'm in here I removed it.

  • Property mode set to 100644
File size: 46.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from fedid import fedid, generate_fedid
20from authorizer import authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28from access import access_base
29from protogeni_proxy import protogeni_proxy
30from geniapi_proxy import geniapi_proxy
31
32import topdl
33import list_log
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    def __init__(self, config=None, auth=None):
52        """
53        Initializer.  Pulls parameters out of the ConfigParser's access section.
54        """
55
56        access_base.__init__(self, config, auth)
57
58        self.domain = config.get("access", "domain")
59        self.userconfdir = config.get("access","userconfdir")
60        self.userconfcmd = config.get("access","userconfcmd")
61        self.userconfurl = config.get("access","userconfurl")
62        self.federation_software = config.get("access", "federation_software")
63        self.portal_software = config.get("access", "portal_software")
64        self.ssh_port = config.get("access","ssh_port") or "22"
65        self.sshd = config.get("access","sshd")
66        self.sshd_config = config.get("access", "sshd_config")
67        self.access_type = config.get("access", "type")
68        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
69        self.staging_host = config.get("access", "staging_host") \
70                or "ops.emulab.net"
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74   
75        self.dragon_endpoint = config.get("access", "dragon")
76        self.dragon_vlans = config.get("access", "dragon_vlans")
77        self.deter_internal = config.get("access", "deter_internal")
78
79        self.tunnel_config = config.getboolean("access", "tunnel_config")
80        self.portal_command = config.get("access", "portal_command")
81        self.portal_image = config.get("access", "portal_image")
82        self.portal_type = config.get("access", "portal_type") or "pc"
83        self.portal_startcommand = config.get("access", "portal_startcommand")
84        self.node_startcommand = config.get("access", "node_startcommand")
85
86        self.federation_software = self.software_list(self.federation_software)
87        self.portal_software = self.software_list(self.portal_software)
88        self.local_seer_software = self.software_list(self.local_seer_software)
89
90        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
91        self.renewal_interval = int(self.renewal_interval) * 60
92
93        self.ch_url = config.get("access", "ch_url")
94        self.sa_url = config.get("access", "sa_url")
95        self.cm_url = config.get("access", "cm_url")
96
97        self.restricted = [ ]
98
99        # read_state in the base_class
100        self.state_lock.acquire()
101        for a  in ('allocation', 'projects', 'keys', 'types'):
102            if a not in self.state:
103                self.state[a] = { }
104        self.allocation = self.state['allocation']
105        self.projects = self.state['projects']
106        self.keys = self.state['keys']
107        self.types = self.state['types']
108        # Add the ownership attributes to the authorizer.  Note that the
109        # indices of the allocation dict are strings, but the attributes are
110        # fedids, so there is a conversion.
111        for k in self.state.get('allocation', {}).keys():
112            for o in self.state['allocation'][k].get('owners', []):
113                self.auth.set_attribute(o, fedid(hexstr=k))
114            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
115
116        self.state_lock.release()
117
118
119        self.log = logging.getLogger("fedd.access")
120        set_log_level(config, "access", self.log)
121
122        self.access = { }
123        if config.has_option("access", "accessdb"):
124            self.read_access(config.get("access", "accessdb"), 
125                    access_obj=self.make_access_info)
126
127        self.lookup_access = self.lookup_access_base
128
129        api = config.get("access", "api") or "protogeni"
130        if api == "protogeni":
131            self.api_proxy = protogeni_proxy
132        elif api == "geniapi":
133            self.api_proxy = geniapi_proxy
134        else:
135            self.log.debug("Unknown interface, using protogeni")
136            self.api_proxy = protogeni_proxy
137
138        self.call_SetValue = service_caller('SetValue')
139        self.call_GetValue = service_caller('GetValue')
140        self.exports = {
141                'local_seer_control': self.export_local_seer,
142                'seer_master': self.export_seer_master,
143                'hide_hosts': self.export_hide_hosts,
144                }
145
146        if not self.local_seer_image or not self.local_seer_software or \
147                not self.local_seer_start:
148            if 'local_seer_control' in self.exports:
149                del self.exports['local_seer_control']
150
151        if not self.local_seer_image or not self.local_seer_software or \
152                not self.seer_master_start:
153            if 'seer_master' in self.exports:
154                del self.exports['seer_master']
155
156        self.RenewSlices()
157
158        self.soap_services = {\
159            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
160            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
161            'StartSegment': soap_handler("StartSegment", self.StartSegment),
162            'TerminateSegment': soap_handler("TerminateSegment", 
163                self.TerminateSegment),
164            }
165        self.xmlrpc_services =  {\
166            'RequestAccess': xmlrpc_handler('RequestAccess',
167                self.RequestAccess),
168            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
169                self.ReleaseAccess),
170            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
171            'TerminateSegment': xmlrpc_handler('TerminateSegment',
172                self.TerminateSegment),
173            }
174
175    @staticmethod
176    def make_access_info(s):
177        """
178        Split a string of the form (id, id, id, id) ito its constituent tuples
179        and return them as a tuple.  Use to import access info from the
180        access_db.
181        """
182
183        ss = s.strip()
184        if ss.startswith('(') and ss.endswith(')'):
185            l = [ s.strip() for s  in ss[1:-1].split(",")]
186            if len(l) == 4:
187                return tuple(l)
188            else:
189                raise self.parse_error(
190                        "Exactly 4 elements in access info required")
191        else:
192            raise self.parse_error("Expecting parenthezied values")
193
194
195    def get_handler(self, path, fid):
196        self.log.info("Get handler %s %s" % (path, fid))
197        if self.auth.check_attribute(fid, path) and self.userconfdir:
198            return ("%s/%s" % (self.userconfdir, path), "application/binary")
199        else:
200            return (None, None)
201
202    def build_access_response(self, alloc_id, services):
203        """
204        Create the SOAP response.
205
206        Build the dictionary description of the response and use
207        fedd_utils.pack_soap to create the soap message.  ap is the allocate
208        project message returned from a remote project allocation (even if that
209        allocation was done locally).
210        """
211        # Because alloc_id is already a fedd_services_types.IDType_Holder,
212        # there's no need to repack it
213        msg = { 
214                'allocID': alloc_id,
215                'fedAttr': [
216                    { 'attribute': 'domain', 'value': self.domain } , 
217                ]
218            }
219        if self.dragon_endpoint:
220            msg['fedAttr'].append({'attribute': 'dragon',
221                'value': self.dragon_endpoint})
222        if self.deter_internal:
223            msg['fedAttr'].append({'attribute': 'deter_internal',
224                'value': self.deter_internal})
225        #XXX: ??
226        if self.dragon_vlans:
227            msg['fedAttr'].append({'attribute': 'vlans',
228                'value': self.dragon_vlans})
229
230        if services:
231            msg['service'] = services
232        return msg
233
234    def RequestAccess(self, req, fid):
235        """
236        Handle the access request.
237        """
238
239        # The dance to get into the request body
240        if req.has_key('RequestAccessRequestBody'):
241            req = req['RequestAccessRequestBody']
242        else:
243            raise service_error(service_error.req, "No request!?")
244
245        if req.has_key('destinationTestbed'):
246            dt = unpack_id(req['destinationTestbed'])
247
248        # Request for this fedd
249        found, match = self.lookup_access(req, fid)
250        services, svc_state = self.export_services(req.get('service',[]),
251                None, None)
252        # keep track of what's been added
253        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
254        aid = unicode(allocID)
255
256        self.state_lock.acquire()
257        self.allocation[aid] = { }
258        # The protoGENI certificate
259        self.allocation[aid]['credentials'] = found
260        # The list of owner FIDs
261        self.allocation[aid]['owners'] = [ fid ]
262        self.write_state()
263        self.state_lock.release()
264        self.auth.set_attribute(fid, allocID)
265        self.auth.set_attribute(allocID, allocID)
266
267        try:
268            f = open("%s/%s.pem" % (self.certdir, aid), "w")
269            print >>f, alloc_cert
270            f.close()
271        except EnvironmentError, e:
272            raise service_error(service_error.internal, 
273                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
274        return self.build_access_response({ 'fedid': allocID }, None)
275
276
277    def ReleaseAccess(self, req, fid):
278        # The dance to get into the request body
279        if req.has_key('ReleaseAccessRequestBody'):
280            req = req['ReleaseAccessRequestBody']
281        else:
282            raise service_error(service_error.req, "No request!?")
283
284        # Local request
285        try:
286            if req['allocID'].has_key('localname'):
287                auth_attr = aid = req['allocID']['localname']
288            elif req['allocID'].has_key('fedid'):
289                aid = unicode(req['allocID']['fedid'])
290                auth_attr = req['allocID']['fedid']
291            else:
292                raise service_error(service_error.req,
293                        "Only localnames and fedids are understood")
294        except KeyError:
295            raise service_error(service_error.req, "Badly formed request")
296
297        self.log.debug("[access] deallocation requested for %s", aid)
298        if not self.auth.check_attribute(fid, auth_attr):
299            self.log.debug("[access] deallocation denied for %s", aid)
300            raise service_error(service_error.access, "Access Denied")
301
302        self.state_lock.acquire()
303        if self.allocation.has_key(aid):
304            self.log.debug("Found allocation for %s" %aid)
305            del self.allocation[aid]
306            self.write_state()
307            self.state_lock.release()
308            # And remove the access cert
309            cf = "%s/%s.pem" % (self.certdir, aid)
310            self.log.debug("Removing %s" % cf)
311            os.remove(cf)
312            return { 'allocID': req['allocID'] } 
313        else:
314            self.state_lock.release()
315            raise service_error(service_error.req, "No such allocation")
316
317    def manifest_to_dict(self, manifest, ignore_debug=False):
318        """
319        Turn the manifest into a dict were each virtual nodename (i.e. the
320        topdl name) has an entry with the allocated machine in hostname and the
321        interfaces in 'interfaces'.  I love having XML parser code lying
322        around.
323        """
324        if self.create_debug and not ignore_debug: 
325            self.log.debug("Returning null manifest dict")
326            return { }
327
328        # The class allows us to keep a little state - the dict under
329        # consteruction and the current entry in that dict for the interface
330        # element code.
331        class manifest_parser:
332            def __init__(self):
333                self.d = { }
334                self.current_key=None
335
336            # If the element is a node, create a dict entry for it.  If it's an
337            # interface inside a node, add an entry in the interfaces list with
338            # the virtual name and component id.
339            def start_element(self, name, attrs):
340                if name == 'node':
341                    self.current_key = attrs.get('virtual_id',"")
342                    if self.current_key:
343                        self.d[self.current_key] = {
344                                'hostname': attrs.get('hostname', None),
345                                'interfaces': { },
346                                'mac': { }
347                                }
348                elif name == 'interface' and self.current_key:
349                    self.d[self.current_key]['interfaces']\
350                            [attrs.get('virtual_id','')] = \
351                            attrs.get('component_id', None)
352                elif name == 'interface_ref':
353                    # Collect mac address information from an interface_ref.
354                    # These appear after the node info has been parsed.
355                    nid = attrs.get('virtual_node_id', None)
356                    ifid = attrs.get('virtual_interface_id', None)
357                    mac = attrs.get('MAC', None)
358                    self.d[nid]['mac'][ifid] = mac
359            #  When a node is finished, clear current_key
360            def end_element(self, name):
361                if name == 'node': self.current_key = None
362
363        node = { }
364
365        mp = manifest_parser()
366        p = xml.parsers.expat.ParserCreate()
367        # These are bound to the class we just created
368        p.StartElementHandler = mp.start_element
369        p.EndElementHandler = mp.end_element
370
371        p.Parse(manifest)
372        # Make the node dict that the callers expect
373        for k in mp.d:
374            node[k] = mp.d.get('hostname', '')
375        return mp.d
376
377    def fake_manifest(self, topo):
378        """
379        Fake the output of manifest_to_dict with a bunch of generic node an
380        interface names, for debugging.
381        """
382        node = { }
383        for i, e in enumerate([ e for e in topo.elements \
384                if isinstance(e, topdl.Computer)]):
385            node[e.name] = { 
386                    'hostname': "node%03d" % i,
387                    'interfaces': { }
388                    }
389            for j, inf in enumerate(e.interface):
390                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
391
392        return node
393
394
395    def generate_portal_configs(self, topo, pubkey_base, 
396            secretkey_base, tmpdir, leid, connInfo, services, nodes):
397
398        def conninfo_to_dict(key, info):
399            """
400            Make a cpoy of the connection information about key, and flatten it
401            into a single dict by parsing out any feddAttrs.
402            """
403
404            rv = None
405            for i in info:
406                if key == i.get('portal', "") or \
407                        key in [e.get('element', "") \
408                        for e in i.get('member', [])]:
409                    rv = i.copy()
410                    break
411
412            else:
413                return rv
414
415            if 'fedAttr' in rv:
416                for a in rv['fedAttr']:
417                    attr = a.get('attribute', "")
418                    val = a.get('value', "")
419                    if attr and attr not in rv:
420                        rv[attr] = val
421                del rv['fedAttr']
422            return rv
423
424        # XXX: un hardcode this
425        def client_null(f, s):
426            print >>f, "Service: %s" % s['name']
427       
428        def client_seer_master(f, s):
429            print >>f, 'PortalAlias: seer-master'
430
431        def client_smb(f, s):
432            print >>f, "Service: %s" % s['name']
433            smbshare = None
434            smbuser = None
435            smbproj = None
436            for a in s.get('fedAttr', []):
437                if a.get('attribute', '') == 'SMBSHARE':
438                    smbshare = a.get('value', None)
439                elif a.get('attribute', '') == 'SMBUSER':
440                    smbuser = a.get('value', None)
441                elif a.get('attribute', '') == 'SMBPROJ':
442                    smbproj = a.get('value', None)
443
444            if all((smbshare, smbuser, smbproj)):
445                print >>f, "SMBshare: %s" % smbshare
446                print >>f, "ProjectUser: %s" % smbuser
447                print >>f, "ProjectName: %s" % smbproj
448
449        def client_hide_hosts(f, s):
450            for a in s.get('fedAttr', [ ]):
451                if a.get('attribute', "") == 'hosts':
452                    print >>f, 'Hide: %s' % a.get('value', "")
453
454        client_service_out = {
455                'SMB': client_smb,
456                'tmcd': client_null,
457                'seer': client_null,
458                'userconfig': client_null,
459                'project_export': client_null,
460                'seer_master': client_seer_master,
461                'hide_hosts': client_hide_hosts,
462            }
463
464        def client_seer_master_export(f, s):
465            print >>f, "AddedNode: seer-master"
466
467        def client_seer_local_export(f, s):
468            print >>f, "AddedNode: control"
469
470        client_export_service_out = {
471                'seer_master': client_seer_master_export,
472                'local_seer_control': client_seer_local_export,
473            }
474
475        def server_port(f, s):
476            p = urlparse(s.get('server', 'http://localhost'))
477            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
478
479        def server_null(f,s): pass
480
481        def server_seer(f, s):
482            print >>f, 'seer: true'
483
484        server_service_out = {
485                'SMB': server_port,
486                'tmcd': server_port,
487                'userconfig': server_null,
488                'project_export': server_null,
489                'seer': server_seer,
490                'seer_master': server_port,
491                'hide_hosts': server_null,
492            }
493        # XXX: end un hardcode this
494
495
496        seer_out = False
497        client_out = False
498        for e in [ e for e in topo.elements \
499                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
500            myname = e.name
501            type = e.get_attribute('portal_type')
502
503            info = conninfo_to_dict(myname, connInfo)
504
505            if not info:
506                raise service_error(service_error.req,
507                        "No connectivity info for %s" % myname)
508
509            # Translate to physical name (ProtoGENI doesn't have DNS)
510            physname = nodes.get(myname, { }).get('hostname', None)
511            peer = info.get('peer', "")
512            ldomain = self.domain
513            ssh_port = info.get('ssh_port', 22)
514
515            # Collect this for the client.conf file
516            if 'masterexperiment' in info:
517                mproj, meid = info['masterexperiment'].split("/", 1)
518
519            active = info.get('active', 'False')
520
521            if type in ('control', 'both'):
522                testbed = e.get_attribute('testbed')
523                control_gw = myname
524
525            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
526            tunnelconfig = self.tunnel_config
527            try:
528                f = open(cfn, "w")
529                if active == 'True':
530                    print >>f, "active: True"
531                    print >>f, "ssh_port: %s" % ssh_port
532                    if type in ('control', 'both'):
533                        for s in [s for s in services \
534                                if s.get('name', "") in self.imports]:
535                            server_service_out[s['name']](f, s)
536
537                if tunnelconfig:
538                    print >>f, "tunnelip: %s" % tunnelconfig
539                print >>f, "peer: %s" % peer.lower()
540                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
541                        pubkey_base
542                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
543                        secretkey_base
544                f.close()
545            except EnvironmentError, e:
546                raise service_error(service_error.internal,
547                        "Can't write protal config %s: %s" % (cfn, e))
548
549        # Done with portals, write the client config file.
550        try:
551            f = open("%s/client.conf" % tmpdir, "w")
552            if control_gw:
553                print >>f, "ControlGateway: %s" % physname.lower()
554            for s in services:
555                if s.get('name',"") in self.imports and \
556                        s.get('visibility','') == 'import':
557                    client_service_out[s['name']](f, s)
558                if s.get('name', '') in self.exports and \
559                        s.get('visibility', '') == 'export' and \
560                        s['name'] in client_export_service_out:
561                    client_export_service_out[s['name']](f, s)
562            # Seer uses this.
563            if mproj and meid:
564                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
565            f.close()
566        except EnvironmentError, e:
567            raise service_error(service_error.internal,
568                    "Cannot write client.conf: %s" %s)
569
570
571
572    def export_store_info(self, cf, nodes, ssh_port, connInfo):
573        """
574        For the export requests in the connection info, install the peer names
575        at the experiment controller via SetValue calls.
576        """
577
578        for c in connInfo:
579            for p in [ p for p in c.get('parameter', []) \
580                    if p.get('type', '') == 'output']:
581
582                if p.get('name', '') == 'peer':
583                    k = p.get('key', None)
584                    surl = p.get('store', None)
585                    if surl and k and k.index('/') != -1:
586                        if self.create_debug:
587                            req = { 'name': k, 'value': 'debug' }
588                            self.call_SetValue(surl, req, cf)
589                        else:
590                            n = nodes.get(k[k.index('/')+1:], { })
591                            value = n.get('hostname', None)
592                            if value:
593                                req = { 'name': k, 'value': value }
594                                self.call_SetValue(surl, req, cf)
595                            else:
596                                self.log.error("No hostname for %s" % \
597                                        k[k.index('/'):])
598                    else:
599                        self.log.error("Bad export request: %s" % p)
600                elif p.get('name', '') == 'ssh_port':
601                    k = p.get('key', None)
602                    surl = p.get('store', None)
603                    if surl and k:
604                        req = { 'name': k, 'value': ssh_port }
605                        self.call_SetValue(surl, req, cf)
606                    else:
607                        self.log.error("Bad export request: %s" % p)
608                else:
609
610                    self.log.error("Unknown export parameter: %s" % \
611                            p.get('name'))
612                    continue
613
614    def write_node_config_script(self, elem, node, user, pubkey,
615            secretkey, stagingdir, tmpdir):
616        """
617        Write out the configuration script that is to run on the node
618        represented by elem in the topology.  This is called
619        once per node to configure.
620        """
621        # These little functions/functors just make things more readable.  Each
622        # one encapsulates a small task of copying software files or installing
623        # them.
624        class stage_file_type:
625            """
626            Write code copying file sfrom the staging host to the host on which
627            this will run.
628            """
629            def __init__(self, user, host, stagingdir):
630                self.user = user
631                self.host = host
632                self.stagingdir = stagingdir
633                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
634                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
635
636            def __call__(self, script, file, dest="."):
637                # If the file is a full pathname, do not use stagingdir
638                if file.find('/') == -1:
639                    file = "%s/%s" % (self.stagingdir, file)
640                print >>script, "%s %s@%s:%s %s" % \
641                        (self.scp, self.user, self.host, file, dest)
642
643        def install_tar(script, loc, base):
644            """
645            Print code to script to install a tarfile in loc.
646            """
647            tar = "/bin/tar"
648            mkdir="/bin/mkdir"
649
650            print >>script, "%s -p %s" % (mkdir, loc)
651            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
652
653        def install_rpm(script, base):
654            """
655            Print code to script to install an rpm
656            """
657            rpm = "/bin/rpm"
658            print >>script, "%s --install %s" % (rpm, base)
659
660        ifconfig = "/sbin/ifconfig"
661        findif = '/usr/local/etc/emulab/findif'
662        stage_file = stage_file_type(user, self.staging_host, stagingdir)
663        pname = node.get('hostname', None)
664        fed_dir = "/usr/local/federation"
665        fed_etc_dir = "%s/etc" % fed_dir
666        fed_bin_dir = "%s/bin" % fed_dir
667        fed_lib_dir = "%s/lib" % fed_dir
668
669        if pname:
670            sfile = "%s/%s.startup" % (tmpdir, pname)
671            script = open(sfile, "w")
672            # Reset the interfaces to the ones in the topo file
673            for i in [ i for i in elem.interface \
674                    if not i.get_attribute('portal')]:
675                pinf = node['interfaces'].get(i.name, None)
676                pmac = node['mac'].get(i.name, None)
677                addr = i.get_attribute('ip4_address') 
678                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
679                # The interface names in manifests are not to be trusted, so we
680                # find the interface to configure using the local node's script
681                # to match mac address to interface name.
682                if pinf and addr and pmac:
683                    print >>script, '# %s' % pinf
684                    print >>script, \
685                            "%s `%s %s` %s netmask %s"  % \
686                            (ifconfig, findif, pmac, addr, netmask)
687                else:
688                    self.log.error("Missing interface or address for %s" \
689                            % i.name)
690               
691            for l, f in self.federation_software:
692                base = os.path.basename(f)
693                stage_file(script, base)
694                if l: install_tar(script, l, base)
695                else: install_rpm(script, base)
696
697            for s in elem.software:
698                s_base = s.location.rpartition('/')[2]
699                stage_file(script, s_base)
700                if s.install: install_tar(script, s.install, s_base)
701                else: install_rpm(script, s_base)
702
703            for f in ('hosts', pubkey, secretkey, 'client.conf', 
704                    'userconf'):
705                stage_file(script, f, fed_etc_dir)
706            if self.sshd:
707                stage_file(script, self.sshd, fed_bin_dir)
708            if self.sshd_config:
709                stage_file(script, self.sshd_config, fed_etc_dir)
710
711            # Look in tmpdir to get the names.  They've all been copied
712            # into the (remote) staging dir
713            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
714                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
715
716            # Done with staging, remove the identity used to stage
717            print >>script, "#/bin/rm .ssh/id_rsa"
718
719            # Start commands
720            if elem.get_attribute('portal') and self.portal_startcommand:
721                # Install portal software
722                for l, f in self.portal_software:
723                    base = os.path.basename(f)
724                    stage_file(script, base)
725                    if l: install_tar(script, l, base)
726                    else: install_rpm(script, base)
727
728                # Portals never have a user-specified start command
729                print >>script, self.portal_startcommand
730            elif self.node_startcommand:
731                # XXX: debug
732                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
733                # XXX: debug
734                if elem.get_attribute('startup'):
735                    print >>script, "%s \\$USER '%s'" % \
736                            (self.node_startcommand,
737                                    elem.get_attribute('startup'))
738                else:
739                    print >>script, self.node_startcommand
740            script.close()
741            return sfile, pname
742        else:
743            return None, None
744
745
746    def configure_nodes(self, segment_commands, topo, nodes, user, 
747            pubkey, secretkey, stagingdir, tmpdir):
748        """
749        For each node in the topology, generate a script file that copies
750        software onto it and installs it in the proper places and then runs the
751        startup command (including the federation commands.
752        """
753
754
755
756        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
757            vname = e.name
758            sfile, pname = self.write_node_config_script(e,
759                    nodes.get(vname, { }),
760                    user, pubkey, secretkey, stagingdir, tmpdir)
761            if sfile:
762                if not segment_commands.scp_file(sfile, user, pname):
763                    self.log.error("Could not copy script to %s" % pname)
764            else:
765                self.log.error("Unmapped node: %s" % vname)
766
767    def start_node(self, user, host, node, segment_commands):
768        """
769        Copy an identity to a node for the configuration script to be able to
770        import data and then run the startup script remotely.
771        """
772        # Place an identity on the node so that the copying can succeed
773        segment_commands.scp_file( segment_commands.ssh_privkey_file,
774                user, node, ".ssh/id_rsa")
775        segment_commands.ssh_cmd(user, node, 
776                "sudo /bin/sh ./%s.startup &" % node)
777
778    def start_nodes(self, user, host, nodes, segment_commands):
779        """
780        Start a thread to initialize each node and wait for them to complete.
781        Each thread runs start_node.
782        """
783        threads = [ ]
784        for n in nodes:
785            t = Thread(target=self.start_node, args=(user, host, n, 
786                segment_commands))
787            t.start()
788            threads.append(t)
789
790        done = [not t.isAlive() for t in threads]
791        while not all(done):
792            self.log.info("Waiting for threads %s" % done)
793            time.sleep(10)
794            done = [not t.isAlive() for t in threads]
795
796    def set_up_staging_filespace(self, segment_commands, user, host,
797            stagingdir):
798        """
799        Set up teh staging area on the staging machine.  To reduce the number
800        of ssh commands, we compose a script and execute it remotely.
801        """
802
803        self.log.info("[start_segment]: creating script file")
804        try:
805            sf, scriptname = tempfile.mkstemp()
806            scriptfile = os.fdopen(sf, 'w')
807        except EnvironmentError:
808            return False
809
810        scriptbase = os.path.basename(scriptname)
811
812        # Script the filesystem changes
813        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
814        print >>scriptfile, 'mkdir -p %s' % stagingdir
815        print >>scriptfile, "rm -f %s" % scriptbase
816        scriptfile.close()
817
818        # Move the script to the remote machine
819        # XXX: could collide tempfile names on the remote host
820        if segment_commands.scp_file(scriptname, user, host, scriptbase):
821            os.remove(scriptname)
822        else:
823            return False
824
825        # Execute the script (and the script's last line deletes it)
826        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
827            return False
828
829    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
830        """
831        Protogeni interactions take a context and a protogeni certificate.
832        This establishes both for later calls and returns them.
833        """
834        if os.access(certfile, os.R_OK):
835            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
836        else:
837            self.log.error("[start_segment]: Cannot read certfile: %s" % \
838                    certfile)
839            return None, None
840
841        try:
842            gcred = segment_commands.slice_authority_call('GetCredential', 
843                    {}, ctxt)
844        except segment_commands.ProtoGENIError, e:
845            raise service_error(service_error.federant,
846                    "ProtoGENI: %s" % e)
847
848        return ctxt, gcred
849
850    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
851        """
852        Find a usable slice name by trying random ones until there's no
853        collision.
854        """
855
856        def random_slicename(user):
857            """
858            Return a random slicename by appending 5 letters to the username.
859            """
860            slicename = user
861            for i in range(0,5):
862                slicename += random.choice(string.ascii_letters)
863            return slicename
864
865        while True:
866            slicename = random_slicename(user)
867            try:
868                param = {
869                        'credential': gcred, 
870                        'hrn': slicename,
871                        'type': 'Slice'
872                        }
873                segment_commands.slice_authority_call('Resolve', param, ctxt)
874            except segment_commands.ProtoGENIError, e:
875                print e
876                break
877
878        return slicename
879
880    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
881        """
882        Create the slice and allocate resources.  If any of this stuff fails,
883        the allocations will time out on PG in short order, so we just raise
884        the service_error.  Return the slice and sliver credentials as well as
885        the manifest.
886        """
887        try:
888            param = {
889                    'credential': gcred, 
890                    'hrn': slicename,
891                    'type': 'Slice'
892                    }
893            slice_cred = segment_commands.slice_authority_call('Register',
894                    param, ctxt)
895            # Resolve the slice to get the URN that PG has assigned it.
896            param = {
897                    'credential': gcred,
898                    'type': 'Slice',
899                    'hrn': slicename
900                    }
901            data = segment_commands.slice_authority_call('Resolve', param,
902                    ctxt)
903            if 'urn' in data:
904                slice_urn = data['urn']
905            else:
906                raise service_error(service_error.federant, 
907                        "No URN returned for slice %s" % hrn)
908
909            if 'creator_urn' in data:
910                creator_urn = data['creator_urn']
911            else:
912                raise service_error(service_error.federant, 
913                        "No creator URN returned for slice %s" % hrn)
914            # Populate the ssh keys (let PG format them)
915            param = {
916                    'credential': gcred, 
917                    }
918            keys =  segment_commands.slice_authority_call('GetKeys', param, 
919                    ctxt)
920            # Create a Sliver
921            param = {
922                    'credentials': [ slice_cred ],
923                    'rspec': rspec, 
924                    'users': [ {
925                        'urn': creator_urn,
926                        'keys': keys, 
927                        },
928                    ],
929                    'slice_urn': slice_urn,
930                    }
931            rv = segment_commands.component_manager_call(
932                    'CreateSliver', param, ctxt)
933
934            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
935            # API hands back a sliver credential.  This just finds the format
936            # of choice.
937            if isinstance(rv, tuple):
938                manifest = rv[1]
939            else:
940                manifest = rv
941        except segment_commands.ProtoGENIError, e:
942            raise service_error(service_error.federant,
943                    "ProtoGENI: %s %s" % (e.code, e))
944
945        return (slice_urn, slice_cred, manifest, rspec)
946
947    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
948            timeout=None):
949        """
950        Wait for the given slice to finish its startup.  Return the final
951        status.
952        """
953        completed_states = ('failed', 'ready')
954        status = 'changing'
955        if timeout is not None:
956            end = time.time() + timeout
957        try:
958            while status not in completed_states:
959                param = { 
960                        'credentials': [ slice_cred ],
961                        'slice_urn': slice_urn,
962                        }
963                r = segment_commands.component_manager_call(
964                        'SliverStatus', param, ctxt)
965                # GENIAPI uses geni_status as the key, so check for both
966                status = r.get('status', r.get('geni_status','changing'))
967                if status not in completed_states:
968                    if timeout is not None and time.time() > end:
969                        return 'timeout'
970                    time.sleep(30)
971        except segment_commands.ProtoGENIError, e:
972            raise service_error(service_error.federant,
973                    "ProtoGENI: %s %s" % (e.code, e))
974
975        return status
976
977    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
978        """
979        Delete the slice resources.  An error from the service is ignores,
980        because the soft state will go away anyway.
981        """
982        try:
983            param = { 
984                    'credentials': [ slice_cred, ],
985                    'slice_urn': slice_urn,
986                    }
987            segment_commands.component_manager_call('DeleteSlice',
988                    param, ctxt)
989        except segment_commands.ProtoGENIError, e:
990            self.log.warn("ProtoGENI: %s" % e)
991
992
993
994    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
995            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
996            export_certfile, topo, connInfo, services, timeout=0):
997        """
998        Start a sub-experiment on a federant.
999
1000        Get the current state, modify or create as appropriate, ship data
1001        and configs and start the experiment.  There are small ordering
1002        differences based on the initial state of the sub-experiment.
1003        """
1004
1005        # Local software dir
1006        lsoftdir = "%s/software" % tmpdir
1007        host = self.staging_host
1008
1009        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1010                certfile, certpw)
1011
1012        if not ctxt: return False, {}
1013
1014        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1015        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1016        self.log.info("Creating %s" % slicename)
1017        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1018                segment_commands, slicename, rspec, gcred, ctxt)
1019
1020        # With manifest in hand, we can export the portal node names.
1021        if self.create_debug: nodes = self.fake_manifest(topo)
1022        else: nodes = self.manifest_to_dict(manifest)
1023
1024        self.export_store_info(export_certfile, nodes, self.ssh_port,
1025                connInfo)
1026        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1027                ename, connInfo, services, nodes)
1028
1029        # Copy software to the staging machine (done after generation to copy
1030        # those, too)
1031        for d in (tmpdir, lsoftdir):
1032            if os.path.isdir(d):
1033                for f in os.listdir(d):
1034                    if not os.path.isdir("%s/%s" % (d, f)):
1035                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1036                                user, host, "%s/%s" % (stagingdir, f)):
1037                            self.log.error("Scp failed")
1038                            return False, {}
1039
1040
1041        # Now we wait for the nodes to start on PG
1042        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1043                ctxt, timeout=300)
1044        if status == 'failed':
1045            self.log.error('Sliver failed to start on ProtoGENI')
1046            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1047            return False, {}
1048        elif status == 'timeout':
1049            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1050            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1051            return False, {}
1052        else:
1053            # All good: save ProtoGENI info in shared state
1054            self.state_lock.acquire()
1055            self.allocation[aid]['slice_urn'] = slice_urn
1056            self.allocation[aid]['slice_name'] = slicename
1057            self.allocation[aid]['slice_credential'] = slice_cred
1058            self.allocation[aid]['manifest'] = manifest
1059            self.allocation[aid]['rspec'] = rspec
1060            self.allocation[aid]['certfile'] = certfile
1061            self.allocation[aid]['certpw'] = certpw
1062            self.write_state()
1063            self.state_lock.release()
1064
1065        # Now we have configuration to do for ProtoGENI
1066        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1067                secretkey, stagingdir, tmpdir)
1068
1069        self.start_nodes(user, self.staging_host, 
1070                [ n.get('hostname', None) for n in nodes.values()],
1071                segment_commands)
1072
1073        # Everything has gone OK.
1074        return True, dict([(k, n.get('hostname', None)) \
1075                for k, n in nodes.items()])
1076
1077    def generate_rspec(self, topo, softdir, connInfo):
1078
1079        # Force a useful image.  Without picking this the nodes can get
1080        # different images and there is great pain.
1081        def image_filter(e):
1082            if isinstance(e, topdl.Computer):
1083                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1084                        'image+emulab-ops//FEDORA10-STD" />'
1085            else:
1086                return ""
1087        # Main line of generate
1088        t = topo.clone()
1089
1090        starts = { }
1091        # Weed out the things we aren't going to instantiate: Segments, portal
1092        # substrates, and portal interfaces.  (The copy in the for loop allows
1093        # us to delete from e.elements in side the for loop).  While we're
1094        # touching all the elements, we also adjust paths from the original
1095        # testbed to local testbed paths and put the federation commands and
1096        # startcommands into a dict so we can start them manually later.
1097        # ProtoGENI requires setup before the federation commands run, so we
1098        # run them by hand after we've seeded configurations.
1099        for e in [e for e in t.elements]:
1100            if isinstance(e, topdl.Segment):
1101                t.elements.remove(e)
1102            # Fix software paths
1103            for s in getattr(e, 'software', []):
1104                s.location = re.sub("^.*/", softdir, s.location)
1105            if isinstance(e, topdl.Computer):
1106                if e.get_attribute('portal') and self.portal_startcommand:
1107                    # Portals never have a user-specified start command
1108                    starts[e.name] = self.portal_startcommand
1109                elif self.node_startcommand:
1110                    if e.get_attribute('startup'):
1111                        starts[e.name] = "%s \\$USER '%s'" % \
1112                                (self.node_startcommand, 
1113                                        e.get_attribute('startup'))
1114                        e.remove_attribute('startup')
1115                    else:
1116                        starts[e.name] = self.node_startcommand
1117
1118                # Remove portal interfaces
1119                e.interface = [i for i in e.interface \
1120                        if not i.get_attribute('portal')]
1121
1122        t.substrates = [ s.clone() for s in t.substrates ]
1123        t.incorporate_elements()
1124
1125        # Customize the rspec output to use the image we like
1126        filters = [ image_filter ]
1127
1128        # Convert to rspec and return it
1129        exp_rspec = topdl.topology_to_rspec(t, filters)
1130
1131        return exp_rspec
1132
1133    def retrieve_software(self, topo, certfile, softdir):
1134        """
1135        Collect the software that nodes in the topology need loaded and stage
1136        it locally.  This implies retrieving it from the experiment_controller
1137        and placing it into softdir.  Certfile is used to prove that this node
1138        has access to that data (it's the allocation/segment fedid).  Finally
1139        local portal and federation software is also copied to the same staging
1140        directory for simplicity - all software needed for experiment creation
1141        is in softdir.
1142        """
1143        sw = set()
1144        for e in topo.elements:
1145            for s in getattr(e, 'software', []):
1146                sw.add(s.location)
1147        os.mkdir(softdir)
1148        for s in sw:
1149            self.log.debug("Retrieving %s" % s)
1150            try:
1151                get_url(s, certfile, softdir)
1152            except:
1153                t, v, st = sys.exc_info()
1154                raise service_error(service_error.internal,
1155                        "Error retrieving %s: %s" % (s, v))
1156
1157        # Copy local portal node software to the tempdir
1158        for s in (self.portal_software, self.federation_software):
1159            for l, f in s:
1160                base = os.path.basename(f)
1161                copy_file(f, "%s/%s" % (softdir, base))
1162
1163
1164    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1165        """
1166        Gather common configuration files, retrieve or create an experiment
1167        name and project name, and return the ssh_key filenames.  Create an
1168        allocation log bound to the state log variable as well.
1169        """
1170        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1171        ename = None
1172        pubkey_base = None
1173        secretkey_base = None
1174        alloc_log = None
1175
1176        for a in attrs:
1177            if a['attribute'] in configs:
1178                try:
1179                    self.log.debug("Retrieving %s" % a['value'])
1180                    get_url(a['value'], certfile, tmpdir)
1181                except:
1182                    t, v, st = sys.exc_info()
1183                    raise service_error(service_error.internal,
1184                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1185            if a['attribute'] == 'ssh_pubkey':
1186                pubkey_base = a['value'].rpartition('/')[2]
1187            if a['attribute'] == 'ssh_secretkey':
1188                secretkey_base = a['value'].rpartition('/')[2]
1189            if a['attribute'] == 'experiment_name':
1190                ename = a['value']
1191
1192        if not ename:
1193            ename = ""
1194            for i in range(0,5):
1195                ename += random.choice(string.ascii_letters)
1196            self.log.warn("No experiment name: picked one randomly: %s" \
1197                    % ename)
1198
1199        self.state_lock.acquire()
1200        if self.allocation.has_key(aid):
1201            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1202            self.allocation[aid]['experiment'] = ename
1203            self.allocation[aid]['log'] = [ ]
1204            # Create a logger that logs to the experiment's state object as
1205            # well as to the main log file.
1206            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1207            h = logging.StreamHandler(
1208                    list_log.list_log(self.allocation[aid]['log']))
1209            # XXX: there should be a global one of these rather than
1210            # repeating the code.
1211            h.setFormatter(logging.Formatter(
1212                "%(asctime)s %(name)s %(message)s",
1213                        '%d %b %y %H:%M:%S'))
1214            alloc_log.addHandler(h)
1215            self.write_state()
1216        else:
1217            self.log.error("No allocation for %s!?" % aid)
1218        self.state_lock.release()
1219
1220        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1221                cpw, alloc_log)
1222
1223    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1224        # Copy the assigned names into the return topology
1225        rvtopo = topo.clone()
1226        embedding = [ ]
1227        for k, n in nodes.items():
1228            embedding.append({ 
1229                'toponame': k,
1230                'physname': [n ],
1231                })
1232        # Grab the log (this is some anal locking, but better safe than
1233        # sorry)
1234        self.state_lock.acquire()
1235        logv = "".join(self.allocation[aid]['log'])
1236        # It's possible that the StartSegment call gets retried (!).
1237        # if the 'started' key is in the allocation, we'll return it rather
1238        # than redo the setup.
1239        self.allocation[aid]['started'] = { 
1240                'allocID': alloc_id,
1241                'allocationLog': logv,
1242                'segmentdescription': { 
1243                    'topdldescription': rvtopo.to_dict() },
1244                'embedding': embedding,
1245                }
1246        retval = copy.deepcopy(self.allocation[aid]['started'])
1247        self.write_state()
1248        self.state_lock.release()
1249
1250        return retval
1251
1252    def StartSegment(self, req, fid):
1253        err = None  # Any service_error generated after tmpdir is created
1254        rv = None   # Return value from segment creation
1255
1256        try:
1257            req = req['StartSegmentRequestBody']
1258            topref = req['segmentdescription']['topdldescription']
1259        except KeyError:
1260            raise service_error(service_error.req, "Badly formed request")
1261
1262        connInfo = req.get('connection', [])
1263        services = req.get('service', [])
1264        auth_attr = req['allocID']['fedid']
1265        aid = "%s" % auth_attr
1266        attrs = req.get('fedAttr', [])
1267        if not self.auth.check_attribute(fid, auth_attr):
1268            raise service_error(service_error.access, "Access denied")
1269        else:
1270            # See if this is a replay of an earlier succeeded StartSegment -
1271            # sometimes SSL kills 'em.  If so, replay the response rather than
1272            # redoing the allocation.
1273            self.state_lock.acquire()
1274            retval = self.allocation[aid].get('started', None)
1275            self.state_lock.release()
1276            if retval:
1277                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1278                        "replaying response")
1279                return retval
1280
1281        if topref:
1282            topo = topdl.Topology(**topref)
1283        else:
1284            raise service_error(service_error.req, 
1285                    "Request missing segmentdescription'")
1286
1287        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1288        try:
1289            tmpdir = tempfile.mkdtemp(prefix="access-")
1290            softdir = "%s/software" % tmpdir
1291        except EnvironmentError:
1292            raise service_error(service_error.internal, "Cannot create tmp dir")
1293
1294        # Try block alllows us to clean up temporary files.
1295        try:
1296            self.retrieve_software(topo, certfile, softdir)
1297            self.configure_userconf(services, tmpdir)
1298            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1299                cpw, alloc_log = self.initialize_experiment_info(attrs,
1300                        aid, certfile, tmpdir)
1301            self.import_store_info(certfile, connInfo)
1302            rspec = self.generate_rspec(topo, "%s/%s/" \
1303                    % (self.staging_dir, ename), connInfo)
1304
1305            segment_commands = self.api_proxy(keyfile=ssh_key,
1306                    debug=self.create_debug, log=alloc_log,
1307                    ch_url = self.ch_url, sa_url=self.sa_url,
1308                    cm_url=self.cm_url)
1309            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1310                    pubkey_base, 
1311                    secretkey_base, ename,
1312                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1313                    certfile, topo, connInfo, services)
1314        except EnvironmentError, e:
1315            err = service_error(service_error.internal, "%s" % e)
1316        except service_error, e:
1317            err = e
1318        except:
1319            t, v, st = sys.exc_info()
1320            err = service_error(service_error.internal, "%s: %s" % \
1321                    (v, traceback.extract_tb(st)))
1322
1323        # Walk up tmpdir, deleting as we go
1324        if self.cleanup: self.remove_dirs(tmpdir)
1325        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1326
1327        if rv:
1328            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1329        elif err:
1330            raise service_error(service_error.federant,
1331                    "Swapin failed: %s" % err)
1332        else:
1333            raise service_error(service_error.federant, "Swapin failed")
1334
1335    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1336            slice_urn, certfile, certpw):
1337        """
1338        Stop a sub experiment by calling swapexp on the federant
1339        """
1340        host = self.staging_host
1341        rv = False
1342        try:
1343            # Clean out tar files: we've gone over quota in the past
1344            if stagingdir:
1345                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1346            if slice_cred:
1347                self.log.error('Removing Sliver on ProtoGENI')
1348                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1349                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1350            return True
1351        except self.ssh_cmd_timeout:
1352            rv = False
1353        return rv
1354
1355    def TerminateSegment(self, req, fid):
1356        try:
1357            req = req['TerminateSegmentRequestBody']
1358        except KeyError:
1359            raise service_error(service_error.req, "Badly formed request")
1360
1361        auth_attr = req['allocID']['fedid']
1362        aid = "%s" % auth_attr
1363        attrs = req.get('fedAttr', [])
1364        if not self.auth.check_attribute(fid, auth_attr):
1365            raise service_error(service_error.access, "Access denied")
1366
1367        self.state_lock.acquire()
1368        if self.allocation.has_key(aid):
1369            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1370            slice_cred = self.allocation[aid].get('slice_credential', None)
1371            slice_urn = self.allocation[aid].get('slice_urn', None)
1372            ename = self.allocation[aid].get('experiment', None)
1373        else:
1374            cf, user, ssh_key, cpw = (None, None, None, None)
1375            slice_cred = None
1376            slice_urn = None
1377            ename = None
1378        self.state_lock.release()
1379
1380        if ename:
1381            staging = "%s/%s" % ( self.staging_dir, ename)
1382        else:
1383            self.log.warn("Can't find experiment name for %s" % aid)
1384            staging = None
1385
1386        segment_commands = self.api_proxy(keyfile=ssh_key,
1387                debug=self.create_debug, ch_url = self.ch_url,
1388                sa_url=self.sa_url, cm_url=self.cm_url)
1389        self.stop_segment(segment_commands, user, staging, slice_cred,
1390                slice_urn, cf, cpw)
1391        return { 'allocID': req['allocID'] }
1392
1393    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1394            certfile, certpw):
1395        """
1396        Linear code through the segment renewal calls.
1397        """
1398        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1399        try:
1400            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1401                    time.gmtime(time.time() + interval))
1402            cred = segment_commands.slice_authority_call('GetCredential', 
1403                    {}, ctxt)
1404
1405            param = {
1406                    'credential': scred,
1407                    'expiration': expiration
1408                    }
1409            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1410            param = {
1411                    'credential': cred,
1412                    'urn': slice_urn,
1413                    'type': 'Slice',
1414                    }
1415            new_scred = segment_commands.slice_authority_call('GetCredential',
1416                    param, ctxt)
1417
1418        except segment_commands.ProtoGENIError, e:
1419            self.log.error("Failed to extend slice %s: %s" % (name, e))
1420            return None
1421        try:
1422            param = {
1423                    'credentials': [new_scred,],
1424                    'slice_urn': slice_urn,
1425                    }
1426            r = segment_commands.component_manager_call('RenewSlice', param, 
1427                    ctxt)
1428        except segment_commands.ProtoGENIError, e:
1429            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1430
1431        return new_scred
1432   
1433
1434    def RenewSlices(self):
1435        self.log.info("Scanning for slices to renew")
1436        self.state_lock.acquire()
1437        aids = self.allocation.keys()
1438        self.state_lock.release()
1439
1440        for aid in aids:
1441            self.state_lock.acquire()
1442            if self.allocation.has_key(aid):
1443                name = self.allocation[aid].get('slice_name', None)
1444                scred = self.allocation[aid].get('slice_credential', None)
1445                slice_urn = self.allocation[aid].get('slice_urn', None)
1446                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1447            else:
1448                name = None
1449                scred = None
1450            self.state_lock.release()
1451
1452            if not os.access(cf, os.R_OK):
1453                self.log.error(
1454                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1455                continue
1456
1457            # There's a ProtoGENI slice associated with the segment; renew it.
1458            if name and scred and slice_urn:
1459                segment_commands = self.api_proxy(log=self.log, 
1460                        debug=self.create_debug, keyfile=ssh_key,
1461                        cm_url = self.cm_url, sa_url = self.sa_url,
1462                        ch_url = self.ch_url)
1463                new_scred = self.renew_segment(segment_commands, name, scred, 
1464                        slice_urn, self.renewal_interval, cf, cpw)
1465                if new_scred:
1466                    self.log.info("Slice %s renewed until %s GMT" % \
1467                            (name, time.asctime(time.gmtime(\
1468                                time.time()+self.renewal_interval))))
1469                    self.state_lock.acquire()
1470                    if self.allocation.has_key(aid):
1471                        self.allocation[aid]['slice_credential'] = new_scred
1472                        self.write_state()
1473                    self.state_lock.release()
1474                else:
1475                    self.log.info("Failed to renew slice %s " % name)
1476
1477        # Let's do this all again soon.  (4 tries before the slices time out)   
1478        t = Timer(self.renewal_interval/4, self.RenewSlices)
1479        t.start()
Note: See TracBrowser for help on using the repository browser.