source: fedd/federation/protogeni_access.py @ ec36918

version-3.01version-3.02
Last change on this file since ec36918 was ec36918, checked in by Ted Faber <faber@…>, 13 years ago

Changes to deal with ProtoGENI's changes and occasional flakiness in mapping interface names. We use MAC addres to differentiate interfaces rather than name.

We aslo record the rspec used to request resources as well as the manifest to improve debugging.

Some changes were also made to simplify slice renewal.

  • Property mode set to 100644
File size: 47.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from access_project import access_project
20from fedid import fedid, generate_fedid
21from authorizer import authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24
25import httplib
26import tempfile
27from urlparse import urlparse
28
29from access import access_base
30from protogeni_proxy import protogeni_proxy
31from geniapi_proxy import geniapi_proxy
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.domain = config.get("access", "domain")
60        self.userconfdir = config.get("access","userconfdir")
61        self.userconfcmd = config.get("access","userconfcmd")
62        self.userconfurl = config.get("access","userconfurl")
63        self.federation_software = config.get("access", "federation_software")
64        self.portal_software = config.get("access", "portal_software")
65        self.ssh_port = config.get("access","ssh_port") or "22"
66        self.sshd = config.get("access","sshd")
67        self.sshd_config = config.get("access", "sshd_config")
68        self.access_type = config.get("access", "type")
69        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
70        self.staging_host = config.get("access", "staging_host") \
71                or "ops.emulab.net"
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75   
76        self.dragon_endpoint = config.get("access", "dragon")
77        self.dragon_vlans = config.get("access", "dragon_vlans")
78        self.deter_internal = config.get("access", "deter_internal")
79
80        self.tunnel_config = config.getboolean("access", "tunnel_config")
81        self.portal_command = config.get("access", "portal_command")
82        self.portal_image = config.get("access", "portal_image")
83        self.portal_type = config.get("access", "portal_type") or "pc"
84        self.portal_startcommand = config.get("access", "portal_startcommand")
85        self.node_startcommand = config.get("access", "node_startcommand")
86
87        self.federation_software = self.software_list(self.federation_software)
88        self.portal_software = self.software_list(self.portal_software)
89        self.local_seer_software = self.software_list(self.local_seer_software)
90
91        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
92        self.renewal_interval = int(self.renewal_interval) * 60
93
94        self.ch_url = config.get("access", "ch_url")
95        self.sa_url = config.get("access", "sa_url")
96        self.cm_url = config.get("access", "cm_url")
97
98        self.restricted = [ ]
99
100        # read_state in the base_class
101        self.state_lock.acquire()
102        for a  in ('allocation', 'projects', 'keys', 'types'):
103            if a not in self.state:
104                self.state[a] = { }
105        self.allocation = self.state['allocation']
106        self.projects = self.state['projects']
107        self.keys = self.state['keys']
108        self.types = self.state['types']
109        # Add the ownership attributes to the authorizer.  Note that the
110        # indices of the allocation dict are strings, but the attributes are
111        # fedids, so there is a conversion.
112        for k in self.state.get('allocation', {}).keys():
113            for o in self.state['allocation'][k].get('owners', []):
114                self.auth.set_attribute(o, fedid(hexstr=k))
115            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
116
117        self.state_lock.release()
118
119
120        self.log = logging.getLogger("fedd.access")
121        set_log_level(config, "access", self.log)
122
123        self.access = { }
124        if config.has_option("access", "accessdb"):
125            self.read_access(config.get("access", "accessdb"), 
126                    access_obj=self.make_access_info)
127
128        self.lookup_access = self.lookup_access_base
129
130        api = config.get("access", "api") or "protogeni"
131        if api == "protogeni":
132            self.api_proxy = protogeni_proxy
133        elif api == "geniapi":
134            self.api_proxy = geniapi_proxy
135        else:
136            self.log.debug("Unknown interface, using protogeni")
137            self.api_proxy = protogeni_proxy
138
139        self.call_SetValue = service_caller('SetValue')
140        self.call_GetValue = service_caller('GetValue')
141        self.exports = {
142                'local_seer_control': self.export_local_seer,
143                'seer_master': self.export_seer_master,
144                'hide_hosts': self.export_hide_hosts,
145                }
146
147        if not self.local_seer_image or not self.local_seer_software or \
148                not self.local_seer_start:
149            if 'local_seer_control' in self.exports:
150                del self.exports['local_seer_control']
151
152        if not self.local_seer_image or not self.local_seer_software or \
153                not self.seer_master_start:
154            if 'seer_master' in self.exports:
155                del self.exports['seer_master']
156
157        self.RenewSlices()
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment", 
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176    @staticmethod
177    def make_access_info(s):
178        """
179        Split a string of the form (id, id, id, id) ito its constituent tuples
180        and return them as a tuple.  Use to import access info from the
181        access_db.
182        """
183
184        ss = s.strip()
185        if ss.startswith('(') and ss.endswith(')'):
186            l = [ s.strip() for s  in ss[1:-1].split(",")]
187            if len(l) == 4:
188                return tuple(l)
189            else:
190                raise self.parse_error(
191                        "Exactly 4 elements in access info required")
192        else:
193            raise self.parse_error("Expecting parenthezied values")
194
195
196    def get_handler(self, path, fid):
197        self.log.info("Get handler %s %s" % (path, fid))
198        if self.auth.check_attribute(fid, path) and self.userconfdir:
199            return ("%s/%s" % (self.userconfdir, path), "application/binary")
200        else:
201            return (None, None)
202
203    def build_access_response(self, alloc_id, services):
204        """
205        Create the SOAP response.
206
207        Build the dictionary description of the response and use
208        fedd_utils.pack_soap to create the soap message.  ap is the allocate
209        project message returned from a remote project allocation (even if that
210        allocation was done locally).
211        """
212        # Because alloc_id is already a fedd_services_types.IDType_Holder,
213        # there's no need to repack it
214        msg = { 
215                'allocID': alloc_id,
216                'fedAttr': [
217                    { 'attribute': 'domain', 'value': self.domain } , 
218                ]
219            }
220        if self.dragon_endpoint:
221            msg['fedAttr'].append({'attribute': 'dragon',
222                'value': self.dragon_endpoint})
223        if self.deter_internal:
224            msg['fedAttr'].append({'attribute': 'deter_internal',
225                'value': self.deter_internal})
226        #XXX: ??
227        if self.dragon_vlans:
228            msg['fedAttr'].append({'attribute': 'vlans',
229                'value': self.dragon_vlans})
230
231        if services:
232            msg['service'] = services
233        return msg
234
235    def RequestAccess(self, req, fid):
236        """
237        Handle the access request.
238        """
239
240        # The dance to get into the request body
241        if req.has_key('RequestAccessRequestBody'):
242            req = req['RequestAccessRequestBody']
243        else:
244            raise service_error(service_error.req, "No request!?")
245
246        if req.has_key('destinationTestbed'):
247            dt = unpack_id(req['destinationTestbed'])
248
249        # Request for this fedd
250        found, match = self.lookup_access(req, fid)
251        services, svc_state = self.export_services(req.get('service',[]),
252                None, None)
253        # keep track of what's been added
254        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
255        aid = unicode(allocID)
256
257        self.state_lock.acquire()
258        self.allocation[aid] = { }
259        # The protoGENI certificate
260        self.allocation[aid]['credentials'] = found
261        # The list of owner FIDs
262        self.allocation[aid]['owners'] = [ fid ]
263        self.write_state()
264        self.state_lock.release()
265        self.auth.set_attribute(fid, allocID)
266        self.auth.set_attribute(allocID, allocID)
267
268        try:
269            f = open("%s/%s.pem" % (self.certdir, aid), "w")
270            print >>f, alloc_cert
271            f.close()
272        except EnvironmentError, e:
273            raise service_error(service_error.internal, 
274                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
275        return self.build_access_response({ 'fedid': allocID }, None)
276
277
278    def ReleaseAccess(self, req, fid):
279        # The dance to get into the request body
280        if req.has_key('ReleaseAccessRequestBody'):
281            req = req['ReleaseAccessRequestBody']
282        else:
283            raise service_error(service_error.req, "No request!?")
284
285        # Local request
286        try:
287            if req['allocID'].has_key('localname'):
288                auth_attr = aid = req['allocID']['localname']
289            elif req['allocID'].has_key('fedid'):
290                aid = unicode(req['allocID']['fedid'])
291                auth_attr = req['allocID']['fedid']
292            else:
293                raise service_error(service_error.req,
294                        "Only localnames and fedids are understood")
295        except KeyError:
296            raise service_error(service_error.req, "Badly formed request")
297
298        self.log.debug("[access] deallocation requested for %s", aid)
299        if not self.auth.check_attribute(fid, auth_attr):
300            self.log.debug("[access] deallocation denied for %s", aid)
301            raise service_error(service_error.access, "Access Denied")
302
303        self.state_lock.acquire()
304        if self.allocation.has_key(aid):
305            self.log.debug("Found allocation for %s" %aid)
306            del self.allocation[aid]
307            self.write_state()
308            self.state_lock.release()
309            # And remove the access cert
310            cf = "%s/%s.pem" % (self.certdir, aid)
311            self.log.debug("Removing %s" % cf)
312            os.remove(cf)
313            return { 'allocID': req['allocID'] } 
314        else:
315            self.state_lock.release()
316            raise service_error(service_error.req, "No such allocation")
317
318    def manifest_to_dict(self, manifest, ignore_debug=False):
319        """
320        Turn the manifest into a dict were each virtual nodename (i.e. the
321        topdl name) has an entry with the allocated machine in hostname and the
322        interfaces in 'interfaces'.  I love having XML parser code lying
323        around.
324        """
325        if self.create_debug and not ignore_debug: 
326            self.log.debug("Returning null manifest dict")
327            return { }
328
329        # The class allows us to keep a little state - the dict under
330        # consteruction and the current entry in that dict for the interface
331        # element code.
332        class manifest_parser:
333            def __init__(self):
334                self.d = { }
335                self.current_key=None
336
337            # If the element is a node, create a dict entry for it.  If it's an
338            # interface inside a node, add an entry in the interfaces list with
339            # the virtual name and component id.
340            def start_element(self, name, attrs):
341                if name == 'node':
342                    self.current_key = attrs.get('virtual_id',"")
343                    if self.current_key:
344                        self.d[self.current_key] = {
345                                'hostname': attrs.get('hostname', None),
346                                'interfaces': { },
347                                'mac': { }
348                                }
349                elif name == 'interface' and self.current_key:
350                    self.d[self.current_key]['interfaces']\
351                            [attrs.get('virtual_id','')] = \
352                            attrs.get('component_id', None)
353                elif name == 'interface_ref':
354                    # Collect mac address information from an interface_ref.
355                    # These appear after the node info has been parsed.
356                    nid = attrs.get('virtual_node_id', None)
357                    ifid = attrs.get('virtual_interface_id', None)
358                    mac = attrs.get('MAC', None)
359                    self.d[nid]['mac'][ifid] = mac
360            #  When a node is finished, clear current_key
361            def end_element(self, name):
362                if name == 'node': self.current_key = None
363
364        node = { }
365
366        mp = manifest_parser()
367        p = xml.parsers.expat.ParserCreate()
368        # These are bound to the class we just created
369        p.StartElementHandler = mp.start_element
370        p.EndElementHandler = mp.end_element
371
372        p.Parse(manifest)
373        # Make the node dict that the callers expect
374        for k in mp.d:
375            node[k] = mp.d.get('hostname', '')
376        return mp.d
377
378    def fake_manifest(self, topo):
379        """
380        Fake the output of manifest_to_dict with a bunch of generic node an
381        interface names, for debugging.
382        """
383        node = { }
384        for i, e in enumerate([ e for e in topo.elements \
385                if isinstance(e, topdl.Computer)]):
386            node[e.name] = { 
387                    'hostname': "node%03d" % i,
388                    'interfaces': { }
389                    }
390            for j, inf in enumerate(e.interface):
391                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
392
393        return node
394
395
396    def generate_portal_configs(self, topo, pubkey_base, 
397            secretkey_base, tmpdir, leid, connInfo, services, nodes):
398
399        def conninfo_to_dict(key, info):
400            """
401            Make a cpoy of the connection information about key, and flatten it
402            into a single dict by parsing out any feddAttrs.
403            """
404
405            rv = None
406            for i in info:
407                if key == i.get('portal', "") or \
408                        key in [e.get('element', "") \
409                        for e in i.get('member', [])]:
410                    rv = i.copy()
411                    break
412
413            else:
414                return rv
415
416            if 'fedAttr' in rv:
417                for a in rv['fedAttr']:
418                    attr = a.get('attribute', "")
419                    val = a.get('value', "")
420                    if attr and attr not in rv:
421                        rv[attr] = val
422                del rv['fedAttr']
423            return rv
424
425        # XXX: un hardcode this
426        def client_null(f, s):
427            print >>f, "Service: %s" % s['name']
428       
429        def client_seer_master(f, s):
430            print >>f, 'PortalAlias: seer-master'
431
432        def client_smb(f, s):
433            print >>f, "Service: %s" % s['name']
434            smbshare = None
435            smbuser = None
436            smbproj = None
437            for a in s.get('fedAttr', []):
438                if a.get('attribute', '') == 'SMBSHARE':
439                    smbshare = a.get('value', None)
440                elif a.get('attribute', '') == 'SMBUSER':
441                    smbuser = a.get('value', None)
442                elif a.get('attribute', '') == 'SMBPROJ':
443                    smbproj = a.get('value', None)
444
445            if all((smbshare, smbuser, smbproj)):
446                print >>f, "SMBshare: %s" % smbshare
447                print >>f, "ProjectUser: %s" % smbuser
448                print >>f, "ProjectName: %s" % smbproj
449
450        def client_hide_hosts(f, s):
451            for a in s.get('fedAttr', [ ]):
452                if a.get('attribute', "") == 'hosts':
453                    print >>f, 'Hide: %s' % a.get('value', "")
454
455        client_service_out = {
456                'SMB': client_smb,
457                'tmcd': client_null,
458                'seer': client_null,
459                'userconfig': client_null,
460                'project_export': client_null,
461                'seer_master': client_seer_master,
462                'hide_hosts': client_hide_hosts,
463            }
464
465        def client_seer_master_export(f, s):
466            print >>f, "AddedNode: seer-master"
467
468        def client_seer_local_export(f, s):
469            print >>f, "AddedNode: control"
470
471        client_export_service_out = {
472                'seer_master': client_seer_master_export,
473                'local_seer_control': client_seer_local_export,
474            }
475
476        def server_port(f, s):
477            p = urlparse(s.get('server', 'http://localhost'))
478            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
479
480        def server_null(f,s): pass
481
482        def server_seer(f, s):
483            print >>f, 'seer: true'
484
485        server_service_out = {
486                'SMB': server_port,
487                'tmcd': server_port,
488                'userconfig': server_null,
489                'project_export': server_null,
490                'seer': server_seer,
491                'seer_master': server_port,
492                'hide_hosts': server_null,
493            }
494        # XXX: end un hardcode this
495
496
497        seer_out = False
498        client_out = False
499        for e in [ e for e in topo.elements \
500                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
501            myname = e.name
502            type = e.get_attribute('portal_type')
503
504            info = conninfo_to_dict(myname, connInfo)
505
506            if not info:
507                raise service_error(service_error.req,
508                        "No connectivity info for %s" % myname)
509
510            # Translate to physical name (ProtoGENI doesn't have DNS)
511            physname = nodes.get(myname, { }).get('hostname', None)
512            peer = info.get('peer', "")
513            ldomain = self.domain
514            ssh_port = info.get('ssh_port', 22)
515
516            # Collect this for the client.conf file
517            if 'masterexperiment' in info:
518                mproj, meid = info['masterexperiment'].split("/", 1)
519
520            active = info.get('active', 'False')
521
522            if type in ('control', 'both'):
523                testbed = e.get_attribute('testbed')
524                control_gw = myname
525
526            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
527            tunnelconfig = self.tunnel_config
528            try:
529                f = open(cfn, "w")
530                if active == 'True':
531                    print >>f, "active: True"
532                    print >>f, "ssh_port: %s" % ssh_port
533                    if type in ('control', 'both'):
534                        for s in [s for s in services \
535                                if s.get('name', "") in self.imports]:
536                            server_service_out[s['name']](f, s)
537
538                if tunnelconfig:
539                    print >>f, "tunnelip: %s" % tunnelconfig
540                print >>f, "peer: %s" % peer.lower()
541                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
542                        pubkey_base
543                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
544                        secretkey_base
545                f.close()
546            except EnvironmentError, e:
547                raise service_error(service_error.internal,
548                        "Can't write protal config %s: %s" % (cfn, e))
549
550        # Done with portals, write the client config file.
551        try:
552            f = open("%s/client.conf" % tmpdir, "w")
553            if control_gw:
554                print >>f, "ControlGateway: %s" % physname.lower()
555            for s in services:
556                if s.get('name',"") in self.imports and \
557                        s.get('visibility','') == 'import':
558                    client_service_out[s['name']](f, s)
559                if s.get('name', '') in self.exports and \
560                        s.get('visibility', '') == 'export' and \
561                        s['name'] in client_export_service_out:
562                    client_export_service_out[s['name']](f, s)
563            # Seer uses this.
564            if mproj and meid:
565                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
566            f.close()
567        except EnvironmentError, e:
568            raise service_error(service_error.internal,
569                    "Cannot write client.conf: %s" %s)
570
571
572
573    def export_store_info(self, cf, nodes, ssh_port, connInfo):
574        """
575        For the export requests in the connection info, install the peer names
576        at the experiment controller via SetValue calls.
577        """
578
579        for c in connInfo:
580            for p in [ p for p in c.get('parameter', []) \
581                    if p.get('type', '') == 'output']:
582
583                if p.get('name', '') == 'peer':
584                    k = p.get('key', None)
585                    surl = p.get('store', None)
586                    if surl and k and k.index('/') != -1:
587                        if self.create_debug:
588                            req = { 'name': k, 'value': 'debug' }
589                            self.call_SetValue(surl, req, cf)
590                        else:
591                            n = nodes.get(k[k.index('/')+1:], { })
592                            value = n.get('hostname', None)
593                            if value:
594                                req = { 'name': k, 'value': value }
595                                self.call_SetValue(surl, req, cf)
596                            else:
597                                self.log.error("No hostname for %s" % \
598                                        k[k.index('/'):])
599                    else:
600                        self.log.error("Bad export request: %s" % p)
601                elif p.get('name', '') == 'ssh_port':
602                    k = p.get('key', None)
603                    surl = p.get('store', None)
604                    if surl and k:
605                        req = { 'name': k, 'value': ssh_port }
606                        self.call_SetValue(surl, req, cf)
607                    else:
608                        self.log.error("Bad export request: %s" % p)
609                else:
610
611                    self.log.error("Unknown export parameter: %s" % \
612                            p.get('name'))
613                    continue
614
615    def write_node_config_script(self, elem, node, user, pubkey,
616            secretkey, stagingdir, tmpdir):
617        """
618        Write out the configuration script that is to run on the node
619        represented by elem in the topology.  This is called
620        once per node to configure.
621        """
622        # These little functions/functors just make things more readable.  Each
623        # one encapsulates a small task of copying software files or installing
624        # them.
625        class stage_file_type:
626            """
627            Write code copying file sfrom the staging host to the host on which
628            this will run.
629            """
630            def __init__(self, user, host, stagingdir):
631                self.user = user
632                self.host = host
633                self.stagingdir = stagingdir
634                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
635                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
636
637            def __call__(self, script, file, dest="."):
638                # If the file is a full pathname, do not use stagingdir
639                if file.find('/') == -1:
640                    file = "%s/%s" % (self.stagingdir, file)
641                print >>script, "%s %s@%s:%s %s" % \
642                        (self.scp, self.user, self.host, file, dest)
643
644        def install_tar(script, loc, base):
645            """
646            Print code to script to install a tarfile in loc.
647            """
648            tar = "/bin/tar"
649            mkdir="/bin/mkdir"
650
651            print >>script, "%s -p %s" % (mkdir, loc)
652            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
653
654        def install_rpm(script, base):
655            """
656            Print code to script to install an rpm
657            """
658            rpm = "/bin/rpm"
659            print >>script, "%s --install %s" % (rpm, base)
660
661        ifconfig = "/sbin/ifconfig"
662        findif = '/usr/local/etc/emulab/findif'
663        stage_file = stage_file_type(user, self.staging_host, stagingdir)
664        pname = node.get('hostname', None)
665        fed_dir = "/usr/local/federation"
666        fed_etc_dir = "%s/etc" % fed_dir
667        fed_bin_dir = "%s/bin" % fed_dir
668        fed_lib_dir = "%s/lib" % fed_dir
669
670        if pname:
671            sfile = "%s/%s.startup" % (tmpdir, pname)
672            script = open(sfile, "w")
673            # Reset the interfaces to the ones in the topo file
674            for i in [ i for i in elem.interface \
675                    if not i.get_attribute('portal')]:
676                pinf = node['interfaces'].get(i.name, None)
677                pmac = node['mac'].get(i.name, None)
678                addr = i.get_attribute('ip4_address') 
679                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
680                # The interface names in manifests are not to be trusted, so we
681                # find the interface to configure using the local node's script
682                # to match mac address to interface name.
683                if pinf and addr and pmac:
684                    print >>script, '# %s' % pinf
685                    print >>script, \
686                            "%s `%s %s` %s netmask %s"  % \
687                            (ifconfig, findif, pmac, addr, netmask)
688                else:
689                    self.log.error("Missing interface or address for %s" \
690                            % i.name)
691               
692            for l, f in self.federation_software:
693                base = os.path.basename(f)
694                stage_file(script, base)
695                if l: install_tar(script, l, base)
696                else: install_rpm(script, base)
697
698            for s in elem.software:
699                s_base = s.location.rpartition('/')[2]
700                stage_file(script, s_base)
701                if s.install: install_tar(script, s.install, s_base)
702                else: install_rpm(script, s_base)
703
704            for f in ('hosts', pubkey, secretkey, 'client.conf', 
705                    'userconf'):
706                stage_file(script, f, fed_etc_dir)
707            if self.sshd:
708                stage_file(script, self.sshd, fed_bin_dir)
709            if self.sshd_config:
710                stage_file(script, self.sshd_config, fed_etc_dir)
711
712            # Look in tmpdir to get the names.  They've all been copied
713            # into the (remote) staging dir
714            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
715                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
716
717            # Done with staging, remove the identity used to stage
718            print >>script, "#/bin/rm .ssh/id_rsa"
719
720            # Start commands
721            if elem.get_attribute('portal') and self.portal_startcommand:
722                # Install portal software
723                for l, f in self.portal_software:
724                    base = os.path.basename(f)
725                    stage_file(script, base)
726                    if l: install_tar(script, l, base)
727                    else: install_rpm(script, base)
728
729                # Portals never have a user-specified start command
730                print >>script, self.portal_startcommand
731            elif self.node_startcommand:
732                # XXX: debug
733                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
734                # XXX: debug
735                if elem.get_attribute('startup'):
736                    print >>script, "%s \\$USER '%s'" % \
737                            (self.node_startcommand,
738                                    elem.get_attribute('startup'))
739                else:
740                    print >>script, self.node_startcommand
741            script.close()
742            return sfile, pname
743        else:
744            return None, None
745
746
747    def configure_nodes(self, segment_commands, topo, nodes, user, 
748            pubkey, secretkey, stagingdir, tmpdir):
749        """
750        For each node in the topology, generate a script file that copies
751        software onto it and installs it in the proper places and then runs the
752        startup command (including the federation commands.
753        """
754
755
756
757        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
758            vname = e.name
759            sfile, pname = self.write_node_config_script(e,
760                    nodes.get(vname, { }),
761                    user, pubkey, secretkey, stagingdir, tmpdir)
762            if sfile:
763                if not segment_commands.scp_file(sfile, user, pname):
764                    self.log.error("Could not copy script to %s" % pname)
765            else:
766                self.log.error("Unmapped node: %s" % vname)
767
768    def start_node(self, user, host, node, segment_commands):
769        """
770        Copy an identity to a node for the configuration script to be able to
771        import data and then run the startup script remotely.
772        """
773        # Place an identity on the node so that the copying can succeed
774        segment_commands.scp_file( segment_commands.ssh_privkey_file,
775                user, node, ".ssh/id_rsa")
776        segment_commands.ssh_cmd(user, node, 
777                "sudo /bin/sh ./%s.startup &" % node)
778
779    def start_nodes(self, user, host, nodes, segment_commands):
780        """
781        Start a thread to initialize each node and wait for them to complete.
782        Each thread runs start_node.
783        """
784        threads = [ ]
785        for n in nodes:
786            t = Thread(target=self.start_node, args=(user, host, n, 
787                segment_commands))
788            t.start()
789            threads.append(t)
790
791        done = [not t.isAlive() for t in threads]
792        while not all(done):
793            self.log.info("Waiting for threads %s" % done)
794            time.sleep(10)
795            done = [not t.isAlive() for t in threads]
796
797    def set_up_staging_filespace(self, segment_commands, user, host,
798            stagingdir):
799        """
800        Set up teh staging area on the staging machine.  To reduce the number
801        of ssh commands, we compose a script and execute it remotely.
802        """
803
804        self.log.info("[start_segment]: creating script file")
805        try:
806            sf, scriptname = tempfile.mkstemp()
807            scriptfile = os.fdopen(sf, 'w')
808        except EnvironmentError:
809            return False
810
811        scriptbase = os.path.basename(scriptname)
812
813        # Script the filesystem changes
814        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
815        print >>scriptfile, 'mkdir -p %s' % stagingdir
816        print >>scriptfile, "rm -f %s" % scriptbase
817        scriptfile.close()
818
819        # Move the script to the remote machine
820        # XXX: could collide tempfile names on the remote host
821        if segment_commands.scp_file(scriptname, user, host, scriptbase):
822            os.remove(scriptname)
823        else:
824            return False
825
826        # Execute the script (and the script's last line deletes it)
827        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
828            return False
829
830    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
831        """
832        Protogeni interactions take a context and a protogeni certificate.
833        This establishes both for later calls and returns them.
834        """
835        if os.access(certfile, os.R_OK):
836            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
837        else:
838            self.log.error("[start_segment]: Cannot read certfile: %s" % \
839                    certfile)
840            return None, None
841
842        try:
843            gcred = segment_commands.slice_authority_call('GetCredential', 
844                    {}, ctxt)
845        except segment_commands.ProtoGENIError, e:
846            raise service_error(service_error.federant,
847                    "ProtoGENI: %s" % e)
848
849        return ctxt, gcred
850
851    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
852        """
853        Find a usable slice name by trying random ones until there's no
854        collision.
855        """
856
857        def random_slicename(user):
858            """
859            Return a random slicename by appending 5 letters to the username.
860            """
861            slicename = user
862            for i in range(0,5):
863                slicename += random.choice(string.ascii_letters)
864            return slicename
865
866        while True:
867            slicename = random_slicename(user)
868            try:
869                param = {
870                        'credential': gcred, 
871                        'hrn': slicename,
872                        'type': 'Slice'
873                        }
874                segment_commands.slice_authority_call('Resolve', param, ctxt)
875            except segment_commands.ProtoGENIError, e:
876                print e
877                break
878
879        return slicename
880
881    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
882        """
883        Create the slice and allocate resources.  If any of this stuff fails,
884        the allocations will time out on PG in short order, so we just raise
885        the service_error.  Return the slice and sliver credentials as well as
886        the manifest.
887        """
888        try:
889            param = {
890                    'credential': gcred, 
891                    'hrn': slicename,
892                    'type': 'Slice'
893                    }
894            slice_cred = segment_commands.slice_authority_call('Register',
895                    param, ctxt)
896            # Resolve the slice to get the URN that PG has assigned it.
897            param = {
898                    'credential': gcred,
899                    'type': 'Slice',
900                    'hrn': slicename
901                    }
902            data = segment_commands.slice_authority_call('Resolve', param,
903                    ctxt)
904            if 'urn' in data:
905                slice_urn = data['urn']
906            else:
907                raise service_error(service_error.federant, 
908                        "No URN returned for slice %s" % hrn)
909
910            if 'creator_urn' in data:
911                creator_urn = data['creator_urn']
912            else:
913                raise service_error(service_error.federant, 
914                        "No creator URN returned for slice %s" % hrn)
915            # Populate the ssh keys (let PG format them)
916            param = {
917                    'credential': gcred, 
918                    }
919            keys =  segment_commands.slice_authority_call('GetKeys', param, 
920                    ctxt)
921            # Create a Sliver
922            param = {
923                    'credentials': [ slice_cred ],
924                    'rspec': rspec, 
925                    'users': [ {
926                        'urn': creator_urn,
927                        'keys': keys, 
928                        },
929                    ],
930                    'slice_urn': slice_urn,
931                    }
932            rv = segment_commands.component_manager_call(
933                    'CreateSliver', param, ctxt)
934
935            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
936            # API hands back a sliver credential.  This just finds the format
937            # of choice.
938            if isinstance(rv, tuple):
939                manifest = rv[1]
940            else:
941                manifest = rv
942        except segment_commands.ProtoGENIError, e:
943            raise service_error(service_error.federant,
944                    "ProtoGENI: %s %s" % (e.code, e))
945
946        return (slice_urn, slice_cred, manifest, rspec)
947
948    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
949            timeout=None):
950        """
951        Wait for the given slice to finish its startup.  Return the final
952        status.
953        """
954        completed_states = ('failed', 'ready')
955        status = 'changing'
956        if timeout is not None:
957            end = time.time() + timeout
958        try:
959            while status not in completed_states:
960                param = { 
961                        'credentials': [ slice_cred ],
962                        'slice_urn': slice_urn,
963                        }
964                r = segment_commands.component_manager_call(
965                        'SliverStatus', param, ctxt)
966                # GENIAPI uses geni_status as the key, so check for both
967                status = r.get('status', r.get('geni_status','changing'))
968                if status not in completed_states:
969                    if timeout is not None and time.time() > end:
970                        return 'timeout'
971                    time.sleep(30)
972        except segment_commands.ProtoGENIError, e:
973            raise service_error(service_error.federant,
974                    "ProtoGENI: %s %s" % (e.code, e))
975
976        return status
977
978    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
979        """
980        Delete the slice resources.  An error from the service is ignores,
981        because the soft state will go away anyway.
982        """
983        try:
984            param = { 
985                    'credentials': [ slice_cred, ],
986                    'slice_urn': slice_urn,
987                    }
988            segment_commands.component_manager_call('DeleteSlice',
989                    param, ctxt)
990        except segment_commands.ProtoGENIError, e:
991            self.log.warn("ProtoGENI: %s" % e)
992
993
994
995    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
996            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
997            export_certfile, topo, connInfo, services, timeout=0):
998        """
999        Start a sub-experiment on a federant.
1000
1001        Get the current state, modify or create as appropriate, ship data
1002        and configs and start the experiment.  There are small ordering
1003        differences based on the initial state of the sub-experiment.
1004        """
1005
1006        # Local software dir
1007        lsoftdir = "%s/software" % tmpdir
1008        host = self.staging_host
1009
1010        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1011                certfile, certpw)
1012
1013        if not ctxt: return False, {}
1014
1015        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1016        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1017        self.log.info("Creating %s" % slicename)
1018        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1019                segment_commands, slicename, rspec, gcred, ctxt)
1020
1021        # With manifest in hand, we can export the portal node names.
1022        if self.create_debug: nodes = self.fake_manifest(topo)
1023        else: nodes = self.manifest_to_dict(manifest)
1024
1025        self.export_store_info(export_certfile, nodes, self.ssh_port,
1026                connInfo)
1027        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1028                ename, connInfo, services, nodes)
1029
1030        # Copy software to the staging machine (done after generation to copy
1031        # those, too)
1032        for d in (tmpdir, lsoftdir):
1033            if os.path.isdir(d):
1034                for f in os.listdir(d):
1035                    if not os.path.isdir("%s/%s" % (d, f)):
1036                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1037                                user, host, "%s/%s" % (stagingdir, f)):
1038                            self.log.error("Scp failed")
1039                            return False, {}
1040
1041
1042        # Now we wait for the nodes to start on PG
1043        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1044                ctxt, timeout=300)
1045        if status == 'failed':
1046            self.log.error('Sliver failed to start on ProtoGENI')
1047            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1048            return False, {}
1049        elif status == 'timeout':
1050            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1051            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1052            return False, {}
1053        else:
1054            # All good: save ProtoGENI info in shared state
1055            self.state_lock.acquire()
1056            self.allocation[aid]['slice_urn'] = slice_urn
1057            self.allocation[aid]['slice_name'] = slicename
1058            self.allocation[aid]['slice_credential'] = slice_cred
1059            self.allocation[aid]['manifest'] = manifest
1060            self.allocation[aid]['rspec'] = rspec
1061            self.allocation[aid]['certfile'] = certfile
1062            self.allocation[aid]['certpw'] = certpw
1063            self.write_state()
1064            self.state_lock.release()
1065
1066        # Now we have configuration to do for ProtoGENI
1067        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1068                secretkey, stagingdir, tmpdir)
1069
1070        self.start_nodes(user, self.staging_host, 
1071                [ n.get('hostname', None) for n in nodes.values()],
1072                segment_commands)
1073
1074        # Everything has gone OK.
1075        return True, dict([(k, n.get('hostname', None)) \
1076                for k, n in nodes.items()])
1077
1078    def generate_rspec(self, topo, softdir, connInfo):
1079
1080        # Force a useful image.  Without picking this the nodes can get
1081        # different images and there is great pain.
1082        def image_filter(e):
1083            if isinstance(e, topdl.Computer):
1084                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1085                        'image+emulab-ops//FEDORA10-STD" />'
1086            else:
1087                return ""
1088        # Main line of generate
1089        t = topo.clone()
1090
1091        starts = { }
1092        # Weed out the things we aren't going to instantiate: Segments, portal
1093        # substrates, and portal interfaces.  (The copy in the for loop allows
1094        # us to delete from e.elements in side the for loop).  While we're
1095        # touching all the elements, we also adjust paths from the original
1096        # testbed to local testbed paths and put the federation commands and
1097        # startcommands into a dict so we can start them manually later.
1098        # ProtoGENI requires setup before the federation commands run, so we
1099        # run them by hand after we've seeded configurations.
1100        for e in [e for e in t.elements]:
1101            if isinstance(e, topdl.Segment):
1102                t.elements.remove(e)
1103            # Fix software paths
1104            for s in getattr(e, 'software', []):
1105                s.location = re.sub("^.*/", softdir, s.location)
1106            if isinstance(e, topdl.Computer):
1107                if e.get_attribute('portal') and self.portal_startcommand:
1108                    # Portals never have a user-specified start command
1109                    starts[e.name] = self.portal_startcommand
1110                elif self.node_startcommand:
1111                    if e.get_attribute('startup'):
1112                        starts[e.name] = "%s \\$USER '%s'" % \
1113                                (self.node_startcommand, 
1114                                        e.get_attribute('startup'))
1115                        e.remove_attribute('startup')
1116                    else:
1117                        starts[e.name] = self.node_startcommand
1118
1119                # Remove portal interfaces
1120                e.interface = [i for i in e.interface \
1121                        if not i.get_attribute('portal')]
1122
1123        t.substrates = [ s.clone() for s in t.substrates ]
1124        t.incorporate_elements()
1125
1126        # Customize the rspec output to use the image we like
1127        filters = [ image_filter ]
1128
1129        # Convert to rspec and return it
1130        exp_rspec = topdl.topology_to_rspec(t, filters)
1131
1132        return exp_rspec
1133
1134    def retrieve_software(self, topo, certfile, softdir):
1135        """
1136        Collect the software that nodes in the topology need loaded and stage
1137        it locally.  This implies retrieving it from the experiment_controller
1138        and placing it into softdir.  Certfile is used to prove that this node
1139        has access to that data (it's the allocation/segment fedid).  Finally
1140        local portal and federation software is also copied to the same staging
1141        directory for simplicity - all software needed for experiment creation
1142        is in softdir.
1143        """
1144        sw = set()
1145        for e in topo.elements:
1146            for s in getattr(e, 'software', []):
1147                sw.add(s.location)
1148        os.mkdir(softdir)
1149        for s in sw:
1150            self.log.debug("Retrieving %s" % s)
1151            try:
1152                get_url(s, certfile, softdir)
1153            except:
1154                t, v, st = sys.exc_info()
1155                raise service_error(service_error.internal,
1156                        "Error retrieving %s: %s" % (s, v))
1157
1158        # Copy local portal node software to the tempdir
1159        for s in (self.portal_software, self.federation_software):
1160            for l, f in s:
1161                base = os.path.basename(f)
1162                copy_file(f, "%s/%s" % (softdir, base))
1163
1164
1165    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1166        """
1167        Gather common configuration files, retrieve or create an experiment
1168        name and project name, and return the ssh_key filenames.  Create an
1169        allocation log bound to the state log variable as well.
1170        """
1171        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1172        ename = None
1173        pubkey_base = None
1174        secretkey_base = None
1175        alloc_log = None
1176
1177        for a in attrs:
1178            if a['attribute'] in configs:
1179                try:
1180                    self.log.debug("Retrieving %s" % a['value'])
1181                    get_url(a['value'], certfile, tmpdir)
1182                except:
1183                    t, v, st = sys.exc_info()
1184                    raise service_error(service_error.internal,
1185                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1186            if a['attribute'] == 'ssh_pubkey':
1187                pubkey_base = a['value'].rpartition('/')[2]
1188            if a['attribute'] == 'ssh_secretkey':
1189                secretkey_base = a['value'].rpartition('/')[2]
1190            if a['attribute'] == 'experiment_name':
1191                ename = a['value']
1192
1193        if not ename:
1194            ename = ""
1195            for i in range(0,5):
1196                ename += random.choice(string.ascii_letters)
1197            self.log.warn("No experiment name: picked one randomly: %s" \
1198                    % ename)
1199
1200        self.state_lock.acquire()
1201        if self.allocation.has_key(aid):
1202            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1203            self.allocation[aid]['experiment'] = ename
1204            self.allocation[aid]['log'] = [ ]
1205            # Create a logger that logs to the experiment's state object as
1206            # well as to the main log file.
1207            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1208            h = logging.StreamHandler(
1209                    list_log.list_log(self.allocation[aid]['log']))
1210            # XXX: there should be a global one of these rather than
1211            # repeating the code.
1212            h.setFormatter(logging.Formatter(
1213                "%(asctime)s %(name)s %(message)s",
1214                        '%d %b %y %H:%M:%S'))
1215            alloc_log.addHandler(h)
1216            self.write_state()
1217        else:
1218            self.log.error("No allocation for %s!?" % aid)
1219        self.state_lock.release()
1220
1221        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1222                cpw, alloc_log)
1223
1224    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1225        # Copy the assigned names into the return topology
1226        rvtopo = topo.clone()
1227        embedding = [ ]
1228        for k, n in nodes.items():
1229            embedding.append({ 
1230                'toponame': k,
1231                'physname': [n ],
1232                })
1233        # Grab the log (this is some anal locking, but better safe than
1234        # sorry)
1235        self.state_lock.acquire()
1236        logv = "".join(self.allocation[aid]['log'])
1237        # It's possible that the StartSegment call gets retried (!).
1238        # if the 'started' key is in the allocation, we'll return it rather
1239        # than redo the setup.
1240        self.allocation[aid]['started'] = { 
1241                'allocID': alloc_id,
1242                'allocationLog': logv,
1243                'segmentdescription': { 
1244                    'topdldescription': rvtopo.to_dict() },
1245                'embedding': embedding,
1246                }
1247        retval = copy.deepcopy(self.allocation[aid]['started'])
1248        self.write_state()
1249        self.state_lock.release()
1250
1251        return retval
1252
1253    def StartSegment(self, req, fid):
1254        err = None  # Any service_error generated after tmpdir is created
1255        rv = None   # Return value from segment creation
1256
1257        try:
1258            req = req['StartSegmentRequestBody']
1259            topref = req['segmentdescription']['topdldescription']
1260        except KeyError:
1261            raise service_error(service_error.req, "Badly formed request")
1262
1263        connInfo = req.get('connection', [])
1264        services = req.get('service', [])
1265        auth_attr = req['allocID']['fedid']
1266        aid = "%s" % auth_attr
1267        attrs = req.get('fedAttr', [])
1268        if not self.auth.check_attribute(fid, auth_attr):
1269            raise service_error(service_error.access, "Access denied")
1270        else:
1271            # See if this is a replay of an earlier succeeded StartSegment -
1272            # sometimes SSL kills 'em.  If so, replay the response rather than
1273            # redoing the allocation.
1274            self.state_lock.acquire()
1275            retval = self.allocation[aid].get('started', None)
1276            self.state_lock.release()
1277            if retval:
1278                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1279                        "replaying response")
1280                return retval
1281
1282        if topref:
1283            topo = topdl.Topology(**topref)
1284        else:
1285            raise service_error(service_error.req, 
1286                    "Request missing segmentdescription'")
1287
1288        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1289        try:
1290            tmpdir = tempfile.mkdtemp(prefix="access-")
1291            softdir = "%s/software" % tmpdir
1292        except EnvironmentError:
1293            raise service_error(service_error.internal, "Cannot create tmp dir")
1294
1295        # Try block alllows us to clean up temporary files.
1296        try:
1297            self.retrieve_software(topo, certfile, softdir)
1298            self.configure_userconf(services, tmpdir)
1299            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1300                cpw, alloc_log = self.initialize_experiment_info(attrs,
1301                        aid, certfile, tmpdir)
1302            self.import_store_info(certfile, connInfo)
1303            rspec = self.generate_rspec(topo, "%s/%s/" \
1304                    % (self.staging_dir, ename), connInfo)
1305
1306            segment_commands = self.api_proxy(keyfile=ssh_key,
1307                    debug=self.create_debug, log=alloc_log,
1308                    ch_url = self.ch_url, sa_url=self.sa_url,
1309                    cm_url=self.cm_url)
1310            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1311                    pubkey_base, 
1312                    secretkey_base, ename,
1313                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1314                    certfile, topo, connInfo, services)
1315        except EnvironmentError, e:
1316            err = service_error(service_error.internal, "%s" % e)
1317        except service_error, e:
1318            err = e
1319        except:
1320            t, v, st = sys.exc_info()
1321            err = service_error(service_error.internal, "%s: %s" % \
1322                    (v, traceback.extract_tb(st)))
1323
1324        # Walk up tmpdir, deleting as we go
1325        if self.cleanup: self.remove_dirs(tmpdir)
1326        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1327
1328        if rv:
1329            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1330        elif err:
1331            raise service_error(service_error.federant,
1332                    "Swapin failed: %s" % err)
1333        else:
1334            raise service_error(service_error.federant, "Swapin failed")
1335
1336    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1337            slice_urn, certfile, certpw):
1338        """
1339        Stop a sub experiment by calling swapexp on the federant
1340        """
1341        host = self.staging_host
1342        rv = False
1343        try:
1344            # Clean out tar files: we've gone over quota in the past
1345            if stagingdir:
1346                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1347            if slice_cred:
1348                self.log.error('Removing Sliver on ProtoGENI')
1349                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1350                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1351            return True
1352        except self.ssh_cmd_timeout:
1353            rv = False
1354        return rv
1355
1356    def TerminateSegment(self, req, fid):
1357        try:
1358            req = req['TerminateSegmentRequestBody']
1359        except KeyError:
1360            raise service_error(service_error.req, "Badly formed request")
1361
1362        auth_attr = req['allocID']['fedid']
1363        aid = "%s" % auth_attr
1364        attrs = req.get('fedAttr', [])
1365        if not self.auth.check_attribute(fid, auth_attr):
1366            raise service_error(service_error.access, "Access denied")
1367
1368        self.state_lock.acquire()
1369        if self.allocation.has_key(aid):
1370            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1371            slice_cred = self.allocation[aid].get('slice_credential', None)
1372            slice_urn = self.allocation[aid].get('slice_urn', None)
1373            ename = self.allocation[aid].get('experiment', None)
1374        else:
1375            cf, user, ssh_key, cpw = (None, None, None, None)
1376            slice_cred = None
1377            slice_urn = None
1378            ename = None
1379        self.state_lock.release()
1380
1381        if ename:
1382            staging = "%s/%s" % ( self.staging_dir, ename)
1383        else:
1384            self.log.warn("Can't find experiment name for %s" % aid)
1385            staging = None
1386
1387        segment_commands = self.api_proxy(keyfile=ssh_key,
1388                debug=self.create_debug, ch_url = self.ch_url,
1389                sa_url=self.sa_url, cm_url=self.cm_url)
1390        self.stop_segment(segment_commands, user, staging, slice_cred,
1391                slice_urn, cf, cpw)
1392        return { 'allocID': req['allocID'] }
1393
1394    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1395            certfile, certpw):
1396        """
1397        Linear code through the segment renewal calls.
1398        """
1399        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1400        try:
1401            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1402                    time.gmtime(time.time() + interval))
1403            cred = segment_commands.slice_authority_call('GetCredential', 
1404                    {}, ctxt)
1405
1406            param = {
1407                    'credential': scred,
1408                    'expiration': expiration
1409                    }
1410            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1411            param = {
1412                    'credential': cred,
1413                    'urn': slice_urn,
1414                    'type': 'Slice',
1415                    }
1416            new_scred = segment_commands.slice_authority_call('GetCredential',
1417                    param, ctxt)
1418
1419        except segment_commands.ProtoGENIError, e:
1420            self.log.error("Failed to extend slice %s: %s" % (name, e))
1421            return None
1422        try:
1423            param = {
1424                    'credentials': [new_scred,],
1425                    'slice_urn': slice_urn,
1426                    }
1427            r = segment_commands.component_manager_call('RenewSlice', param, 
1428                    ctxt)
1429        except segment_commands.ProtoGENIError, e:
1430            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1431
1432        return new_scred
1433   
1434
1435    def RenewSlices(self):
1436        self.log.info("Scanning for slices to renew")
1437        self.state_lock.acquire()
1438        aids = self.allocation.keys()
1439        self.state_lock.release()
1440
1441        for aid in aids:
1442            self.state_lock.acquire()
1443            if self.allocation.has_key(aid):
1444                name = self.allocation[aid].get('slice_name', None)
1445                scred = self.allocation[aid].get('slice_credential', None)
1446                slice_urn = self.allocation[aid].get('slice_urn', None)
1447                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1448            else:
1449                name = None
1450                scred = None
1451            self.state_lock.release()
1452
1453            if not os.access(cf, os.R_OK):
1454                self.log.error(
1455                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1456                continue
1457
1458            # There's a ProtoGENI slice associated with the segment; renew it.
1459            if name and scred and slice_urn:
1460                segment_commands = self.api_proxy(log=self.log, 
1461                        debug=self.create_debug, keyfile=ssh_key,
1462                        cm_url = self.cm_url, sa_url = self.sa_url,
1463                        ch_url = self.ch_url)
1464                new_scred = self.renew_segment(segment_commands, name, scred, 
1465                        slice_urn, self.renewal_interval, cf, cpw)
1466                if new_scred:
1467                    self.log.info("Slice %s renewed until %s GMT" % \
1468                            (name, time.asctime(time.gmtime(\
1469                                time.time()+self.renewal_interval))))
1470                    self.state_lock.acquire()
1471                    if self.allocation.has_key(aid):
1472                        self.allocation[aid]['slice_credential'] = new_scred
1473                        self.write_state()
1474                    self.state_lock.release()
1475                else:
1476                    self.log.info("Failed to renew slice %s " % name)
1477
1478        # Let's do this all again soon.  (4 tries before the slices time out)   
1479        t = Timer(self.renewal_interval/4, self.RenewSlices)
1480        t.start()
Note: See TracBrowser for help on using the repository browser.