source: fedd/federation/protogeni_access.py @ 5c0d244

axis_examplecompt_changesinfo-ops
Last change on this file since 5c0d244 was 208797c, checked in by Ted Faber <faber@…>, 14 years ago

Split out the protogeni and GANIAPI proxies into separate files. Support
GENIAPI

  • Property mode set to 100644
File size: 46.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from access_project import access_project
20from fedid import fedid, generate_fedid
21from authorizer import authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24
25import httplib
26import tempfile
27from urlparse import urlparse
28
29from access import access_base
30from protogeni_proxy import protogeni_proxy
31from geniapi_proxy import geniapi_proxy
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.domain = config.get("access", "domain")
60        self.userconfdir = config.get("access","userconfdir")
61        self.userconfcmd = config.get("access","userconfcmd")
62        self.userconfurl = config.get("access","userconfurl")
63        self.federation_software = config.get("access", "federation_software")
64        self.portal_software = config.get("access", "portal_software")
65        self.ssh_port = config.get("access","ssh_port") or "22"
66        self.sshd = config.get("access","sshd")
67        self.sshd_config = config.get("access", "sshd_config")
68        self.access_type = config.get("access", "type")
69        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
70        self.staging_host = config.get("access", "staging_host") \
71                or "ops.emulab.net"
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75   
76        self.dragon_endpoint = config.get("access", "dragon")
77        self.dragon_vlans = config.get("access", "dragon_vlans")
78        self.deter_internal = config.get("access", "deter_internal")
79
80        self.tunnel_config = config.getboolean("access", "tunnel_config")
81        self.portal_command = config.get("access", "portal_command")
82        self.portal_image = config.get("access", "portal_image")
83        self.portal_type = config.get("access", "portal_type") or "pc"
84        self.portal_startcommand = config.get("access", "portal_startcommand")
85        self.node_startcommand = config.get("access", "node_startcommand")
86
87        self.federation_software = self.software_list(self.federation_software)
88        self.portal_software = self.software_list(self.portal_software)
89        self.local_seer_software = self.software_list(self.local_seer_software)
90
91        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
92        self.renewal_interval = int(self.renewal_interval) * 60
93
94        self.ch_url = config.get("access", "ch_url")
95        self.sa_url = config.get("access", "sa_url")
96        self.cm_url = config.get("access", "cm_url")
97
98        self.restricted = [ ]
99
100        # read_state in the base_class
101        self.state_lock.acquire()
102        for a  in ('allocation', 'projects', 'keys', 'types'):
103            if a not in self.state:
104                self.state[a] = { }
105        self.allocation = self.state['allocation']
106        self.projects = self.state['projects']
107        self.keys = self.state['keys']
108        self.types = self.state['types']
109        # Add the ownership attributes to the authorizer.  Note that the
110        # indices of the allocation dict are strings, but the attributes are
111        # fedids, so there is a conversion.
112        for k in self.state.get('allocation', {}).keys():
113            for o in self.state['allocation'][k].get('owners', []):
114                self.auth.set_attribute(o, fedid(hexstr=k))
115            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
116
117        self.state_lock.release()
118
119
120        self.log = logging.getLogger("fedd.access")
121        set_log_level(config, "access", self.log)
122
123        self.access = { }
124        if config.has_option("access", "accessdb"):
125            self.read_access(config.get("access", "accessdb"), 
126                    access_obj=self.make_access_info)
127
128        self.lookup_access = self.lookup_access_base
129
130        api = config.get("access", "api") or "protogeni"
131        if api == "protogeni":
132            self.api_proxy = protogeni_proxy
133        elif api == "geniapi":
134            self.api_proxy = geniapi_proxy
135        else:
136            self.log.debug("Unknown interface, using protogeni")
137            self.api_proxy = protogeni_proxy
138
139        self.call_SetValue = service_caller('SetValue')
140        self.call_GetValue = service_caller('GetValue')
141        self.exports = {
142                'local_seer_control': self.export_local_seer,
143                'seer_master': self.export_seer_master,
144                'hide_hosts': self.export_hide_hosts,
145                }
146
147        if not self.local_seer_image or not self.local_seer_software or \
148                not self.local_seer_start:
149            if 'local_seer_control' in self.exports:
150                del self.exports['local_seer_control']
151
152        if not self.local_seer_image or not self.local_seer_software or \
153                not self.seer_master_start:
154            if 'seer_master' in self.exports:
155                del self.exports['seer_master']
156
157        self.RenewSlices()
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment", 
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176    @staticmethod
177    def make_access_info(s):
178        """
179        Split a string of the form (id, id, id, id) ito its constituent tuples
180        and return them as a tuple.  Use to import access info from the
181        access_db.
182        """
183
184        ss = s.strip()
185        if ss.startswith('(') and ss.endswith(')'):
186            l = [ s.strip() for s  in ss[1:-1].split(",")]
187            if len(l) == 4:
188                return tuple(l)
189            else:
190                raise self.parse_error(
191                        "Exactly 4 elements in access info required")
192        else:
193            raise self.parse_error("Expecting parenthezied values")
194
195
196    def get_handler(self, path, fid):
197        self.log.info("Get handler %s %s" % (path, fid))
198        if self.auth.check_attribute(fid, path) and self.userconfdir:
199            return ("%s/%s" % (self.userconfdir, path), "application/binary")
200        else:
201            return (None, None)
202
203    def build_access_response(self, alloc_id, services):
204        """
205        Create the SOAP response.
206
207        Build the dictionary description of the response and use
208        fedd_utils.pack_soap to create the soap message.  ap is the allocate
209        project message returned from a remote project allocation (even if that
210        allocation was done locally).
211        """
212        # Because alloc_id is already a fedd_services_types.IDType_Holder,
213        # there's no need to repack it
214        msg = { 
215                'allocID': alloc_id,
216                'fedAttr': [
217                    { 'attribute': 'domain', 'value': self.domain } , 
218                ]
219            }
220        if self.dragon_endpoint:
221            msg['fedAttr'].append({'attribute': 'dragon',
222                'value': self.dragon_endpoint})
223        if self.deter_internal:
224            msg['fedAttr'].append({'attribute': 'deter_internal',
225                'value': self.deter_internal})
226        #XXX: ??
227        if self.dragon_vlans:
228            msg['fedAttr'].append({'attribute': 'vlans',
229                'value': self.dragon_vlans})
230
231        if services:
232            msg['service'] = services
233        return msg
234
235    def RequestAccess(self, req, fid):
236        """
237        Handle the access request.
238        """
239
240        # The dance to get into the request body
241        if req.has_key('RequestAccessRequestBody'):
242            req = req['RequestAccessRequestBody']
243        else:
244            raise service_error(service_error.req, "No request!?")
245
246        if req.has_key('destinationTestbed'):
247            dt = unpack_id(req['destinationTestbed'])
248
249        # Request for this fedd
250        found, match = self.lookup_access(req, fid)
251        services, svc_state = self.export_services(req.get('service',[]),
252                None, None)
253        # keep track of what's been added
254        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
255        aid = unicode(allocID)
256
257        self.state_lock.acquire()
258        self.allocation[aid] = { }
259        # The protoGENI certificate
260        self.allocation[aid]['credentials'] = found
261        # The list of owner FIDs
262        self.allocation[aid]['owners'] = [ fid ]
263        self.write_state()
264        self.state_lock.release()
265        self.auth.set_attribute(fid, allocID)
266        self.auth.set_attribute(allocID, allocID)
267
268        try:
269            f = open("%s/%s.pem" % (self.certdir, aid), "w")
270            print >>f, alloc_cert
271            f.close()
272        except EnvironmentError, e:
273            raise service_error(service_error.internal, 
274                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
275        return self.build_access_response({ 'fedid': allocID }, None)
276
277
278    def ReleaseAccess(self, req, fid):
279        # The dance to get into the request body
280        if req.has_key('ReleaseAccessRequestBody'):
281            req = req['ReleaseAccessRequestBody']
282        else:
283            raise service_error(service_error.req, "No request!?")
284
285        # Local request
286        try:
287            if req['allocID'].has_key('localname'):
288                auth_attr = aid = req['allocID']['localname']
289            elif req['allocID'].has_key('fedid'):
290                aid = unicode(req['allocID']['fedid'])
291                auth_attr = req['allocID']['fedid']
292            else:
293                raise service_error(service_error.req,
294                        "Only localnames and fedids are understood")
295        except KeyError:
296            raise service_error(service_error.req, "Badly formed request")
297
298        self.log.debug("[access] deallocation requested for %s", aid)
299        if not self.auth.check_attribute(fid, auth_attr):
300            self.log.debug("[access] deallocation denied for %s", aid)
301            raise service_error(service_error.access, "Access Denied")
302
303        self.state_lock.acquire()
304        if self.allocation.has_key(aid):
305            self.log.debug("Found allocation for %s" %aid)
306            del self.allocation[aid]
307            self.write_state()
308            self.state_lock.release()
309            # And remove the access cert
310            cf = "%s/%s.pem" % (self.certdir, aid)
311            self.log.debug("Removing %s" % cf)
312            os.remove(cf)
313            return { 'allocID': req['allocID'] } 
314        else:
315            self.state_lock.release()
316            raise service_error(service_error.req, "No such allocation")
317
318    def manifest_to_dict(self, manifest, ignore_debug=False):
319        """
320        Turn the manifest into a dict were each virtual nodename (i.e. the
321        topdl name) has an entry with the allocated machine in hostname and the
322        interfaces in 'interfaces'.  I love having XML parser code lying
323        around.
324        """
325        if self.create_debug and not ignore_debug: 
326            self.log.debug("Returning null manifest dict")
327            return { }
328
329        # The class allows us to keep a little state - the dict under
330        # consteruction and the current entry in that dict for the interface
331        # element code.
332        class manifest_parser:
333            def __init__(self):
334                self.d = { }
335                self.current_key=None
336
337            # If the element is a node, create a dict entry for it.  If it's an
338            # interface inside a node, add an entry in the interfaces list with
339            # the virtual name and component id.
340            def start_element(self, name, attrs):
341                if name == 'node':
342                    self.current_key = attrs.get('virtual_id',"")
343                    if self.current_key:
344                        self.d[self.current_key] = {
345                                'hostname': attrs.get('hostname', None),
346                                'interfaces': { }
347                                }
348                elif name == 'interface' and self.current_key:
349                    self.d[self.current_key]['interfaces']\
350                            [attrs.get('virtual_id','')] = \
351                            attrs.get('component_id', None)
352            #  When a node is finished, clear current_key
353            def end_element(self, name):
354                if name == 'node': self.current_key = None
355
356        node = { }
357
358        mp = manifest_parser()
359        p = xml.parsers.expat.ParserCreate()
360        # These are bound to the class we just created
361        p.StartElementHandler = mp.start_element
362        p.EndElementHandler = mp.end_element
363
364        p.Parse(manifest)
365        # Make the node dict that the callers expect
366        for k in mp.d:
367            node[k] = mp.d.get('hostname', '')
368        return mp.d
369
370    def fake_manifest(self, topo):
371        """
372        Fake the output of manifest_to_dict with a bunch of generic node an
373        interface names, for debugging.
374        """
375        node = { }
376        for i, e in enumerate([ e for e in topo.elements \
377                if isinstance(e, topdl.Computer)]):
378            node[e.name] = { 
379                    'hostname': "node%03d" % i,
380                    'interfaces': { }
381                    }
382            for j, inf in enumerate(e.interface):
383                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
384
385        return node
386
387
388    def generate_portal_configs(self, topo, pubkey_base, 
389            secretkey_base, tmpdir, leid, connInfo, services, nodes):
390
391        def conninfo_to_dict(key, info):
392            """
393            Make a cpoy of the connection information about key, and flatten it
394            into a single dict by parsing out any feddAttrs.
395            """
396
397            rv = None
398            for i in info:
399                if key == i.get('portal', "") or \
400                        key in [e.get('element', "") \
401                        for e in i.get('member', [])]:
402                    rv = i.copy()
403                    break
404
405            else:
406                return rv
407
408            if 'fedAttr' in rv:
409                for a in rv['fedAttr']:
410                    attr = a.get('attribute', "")
411                    val = a.get('value', "")
412                    if attr and attr not in rv:
413                        rv[attr] = val
414                del rv['fedAttr']
415            return rv
416
417        # XXX: un hardcode this
418        def client_null(f, s):
419            print >>f, "Service: %s" % s['name']
420       
421        def client_seer_master(f, s):
422            print >>f, 'PortalAlias: seer-master'
423
424        def client_smb(f, s):
425            print >>f, "Service: %s" % s['name']
426            smbshare = None
427            smbuser = None
428            smbproj = None
429            for a in s.get('fedAttr', []):
430                if a.get('attribute', '') == 'SMBSHARE':
431                    smbshare = a.get('value', None)
432                elif a.get('attribute', '') == 'SMBUSER':
433                    smbuser = a.get('value', None)
434                elif a.get('attribute', '') == 'SMBPROJ':
435                    smbproj = a.get('value', None)
436
437            if all((smbshare, smbuser, smbproj)):
438                print >>f, "SMBshare: %s" % smbshare
439                print >>f, "ProjectUser: %s" % smbuser
440                print >>f, "ProjectName: %s" % smbproj
441
442        def client_hide_hosts(f, s):
443            for a in s.get('fedAttr', [ ]):
444                if a.get('attribute', "") == 'hosts':
445                    print >>f, 'Hide: %s' % a.get('value', "")
446
447        client_service_out = {
448                'SMB': client_smb,
449                'tmcd': client_null,
450                'seer': client_null,
451                'userconfig': client_null,
452                'project_export': client_null,
453                'seer_master': client_seer_master,
454                'hide_hosts': client_hide_hosts,
455            }
456
457        def client_seer_master_export(f, s):
458            print >>f, "AddedNode: seer-master"
459
460        def client_seer_local_export(f, s):
461            print >>f, "AddedNode: control"
462
463        client_export_service_out = {
464                'seer_master': client_seer_master_export,
465                'local_seer_control': client_seer_local_export,
466            }
467
468        def server_port(f, s):
469            p = urlparse(s.get('server', 'http://localhost'))
470            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
471
472        def server_null(f,s): pass
473
474        def server_seer(f, s):
475            print >>f, 'seer: true'
476
477        server_service_out = {
478                'SMB': server_port,
479                'tmcd': server_port,
480                'userconfig': server_null,
481                'project_export': server_null,
482                'seer': server_seer,
483                'seer_master': server_port,
484                'hide_hosts': server_null,
485            }
486        # XXX: end un hardcode this
487
488
489        seer_out = False
490        client_out = False
491        for e in [ e for e in topo.elements \
492                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
493            myname = e.name
494            type = e.get_attribute('portal_type')
495
496            info = conninfo_to_dict(myname, connInfo)
497
498            if not info:
499                raise service_error(service_error.req,
500                        "No connectivity info for %s" % myname)
501
502            # Translate to physical name (ProtoGENI doesn't have DNS)
503            physname = nodes.get(myname, { }).get('hostname', None)
504            peer = info.get('peer', "")
505            ldomain = self.domain
506            ssh_port = info.get('ssh_port', 22)
507
508            # Collect this for the client.conf file
509            if 'masterexperiment' in info:
510                mproj, meid = info['masterexperiment'].split("/", 1)
511
512            active = info.get('active', 'False')
513
514            if type in ('control', 'both'):
515                testbed = e.get_attribute('testbed')
516                control_gw = myname
517
518            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
519            tunnelconfig = self.tunnel_config
520            try:
521                f = open(cfn, "w")
522                if active == 'True':
523                    print >>f, "active: True"
524                    print >>f, "ssh_port: %s" % ssh_port
525                    if type in ('control', 'both'):
526                        for s in [s for s in services \
527                                if s.get('name', "") in self.imports]:
528                            server_service_out[s['name']](f, s)
529
530                if tunnelconfig:
531                    print >>f, "tunnelip: %s" % tunnelconfig
532                print >>f, "peer: %s" % peer.lower()
533                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
534                        pubkey_base
535                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
536                        secretkey_base
537                f.close()
538            except EnvironmentError, e:
539                raise service_error(service_error.internal,
540                        "Can't write protal config %s: %s" % (cfn, e))
541
542        # Done with portals, write the client config file.
543        try:
544            f = open("%s/client.conf" % tmpdir, "w")
545            if control_gw:
546                print >>f, "ControlGateway: %s" % physname.lower()
547            for s in services:
548                if s.get('name',"") in self.imports and \
549                        s.get('visibility','') == 'import':
550                    client_service_out[s['name']](f, s)
551                if s.get('name', '') in self.exports and \
552                        s.get('visibility', '') == 'export' and \
553                        s['name'] in client_export_service_out:
554                    client_export_service_out[s['name']](f, s)
555            # Seer uses this.
556            if mproj and meid:
557                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
558            f.close()
559        except EnvironmentError, e:
560            raise service_error(service_error.internal,
561                    "Cannot write client.conf: %s" %s)
562
563
564
565    def export_store_info(self, cf, nodes, ssh_port, connInfo):
566        """
567        For the export requests in the connection info, install the peer names
568        at the experiment controller via SetValue calls.
569        """
570
571        for c in connInfo:
572            for p in [ p for p in c.get('parameter', []) \
573                    if p.get('type', '') == 'output']:
574
575                if p.get('name', '') == 'peer':
576                    k = p.get('key', None)
577                    surl = p.get('store', None)
578                    if surl and k and k.index('/') != -1:
579                        if self.create_debug:
580                            req = { 'name': k, 'value': 'debug' }
581                            self.call_SetValue(surl, req, cf)
582                        else:
583                            n = nodes.get(k[k.index('/')+1:], { })
584                            value = n.get('hostname', None)
585                            if value:
586                                req = { 'name': k, 'value': value }
587                                self.call_SetValue(surl, req, cf)
588                            else:
589                                self.log.error("No hostname for %s" % \
590                                        k[k.index('/'):])
591                    else:
592                        self.log.error("Bad export request: %s" % p)
593                elif p.get('name', '') == 'ssh_port':
594                    k = p.get('key', None)
595                    surl = p.get('store', None)
596                    if surl and k:
597                        req = { 'name': k, 'value': ssh_port }
598                        self.call_SetValue(surl, req, cf)
599                    else:
600                        self.log.error("Bad export request: %s" % p)
601                else:
602
603                    self.log.error("Unknown export parameter: %s" % \
604                            p.get('name'))
605                    continue
606
607    def write_node_config_script(self, elem, node, user, pubkey,
608            secretkey, stagingdir, tmpdir):
609        """
610        Write out the configuration script that is to run on the node
611        represented by elem in the topology.  This is called
612        once per node to configure.
613        """
614        # These little functions/functors just make things more readable.  Each
615        # one encapsulates a small task of copying software files or installing
616        # them.
617        class stage_file_type:
618            """
619            Write code copying file sfrom the staging host to the host on which
620            this will run.
621            """
622            def __init__(self, user, host, stagingdir):
623                self.user = user
624                self.host = host
625                self.stagingdir = stagingdir
626                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
627                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
628
629            def __call__(self, script, file, dest="."):
630                # If the file is a full pathname, do not use stagingdir
631                if file.find('/') == -1:
632                    file = "%s/%s" % (self.stagingdir, file)
633                print >>script, "%s %s@%s:%s %s" % \
634                        (self.scp, self.user, self.host, file, dest)
635
636        def install_tar(script, loc, base):
637            """
638            Print code to script to install a tarfile in loc.
639            """
640            tar = "/bin/tar"
641            mkdir="/bin/mkdir"
642
643            print >>script, "%s -p %s" % (mkdir, loc)
644            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
645
646        def install_rpm(script, base):
647            """
648            Print code to script to install an rpm
649            """
650            rpm = "/bin/rpm"
651            print >>script, "%s --install %s" % (rpm, base)
652
653        ifconfig = "/sbin/ifconfig"
654        stage_file = stage_file_type(user, self.staging_host, stagingdir)
655        pname = node.get('hostname', None)
656        fed_dir = "/usr/local/federation"
657        fed_etc_dir = "%s/etc" % fed_dir
658        fed_bin_dir = "%s/bin" % fed_dir
659        fed_lib_dir = "%s/lib" % fed_dir
660
661        if pname:
662            sfile = "%s/%s.startup" % (tmpdir, pname)
663            script = open(sfile, "w")
664            # Reset the interfaces to the ones in the topo file
665            for i in [ i for i in elem.interface \
666                    if not i.get_attribute('portal')]:
667                pinf = node['interfaces'].get(i.name, None)
668                addr = i.get_attribute('ip4_address') 
669                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
670                if pinf and addr:
671                    print >>script, \
672                            "%s %s %s netmask %s"  % \
673                            (ifconfig, pinf, addr, netmask)
674                else:
675                    self.log.error("Missing interface or address for %s" \
676                            % i.name)
677               
678            for l, f in self.federation_software:
679                base = os.path.basename(f)
680                stage_file(script, base)
681                if l: install_tar(script, l, base)
682                else: install_rpm(script, base)
683
684            for s in elem.software:
685                s_base = s.location.rpartition('/')[2]
686                stage_file(script, s_base)
687                if s.install: install_tar(script, s.install, s_base)
688                else: install_rpm(script, s_base)
689
690            for f in ('hosts', pubkey, secretkey, 'client.conf', 
691                    'userconf'):
692                stage_file(script, f, fed_etc_dir)
693            if self.sshd:
694                stage_file(script, self.sshd, fed_bin_dir)
695            if self.sshd_config:
696                stage_file(script, self.sshd_config, fed_etc_dir)
697
698            # Look in tmpdir to get the names.  They've all been copied
699            # into the (remote) staging dir
700            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
701                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
702
703            # Done with staging, remove the identity used to stage
704            print >>script, "#/bin/rm .ssh/id_rsa"
705
706            # Start commands
707            if elem.get_attribute('portal') and self.portal_startcommand:
708                # Install portal software
709                for l, f in self.portal_software:
710                    base = os.path.basename(f)
711                    stage_file(script, base)
712                    if l: install_tar(script, l, base)
713                    else: install_rpm(script, base)
714
715                # Portals never have a user-specified start command
716                print >>script, self.portal_startcommand
717            elif self.node_startcommand:
718                # XXX: debug
719                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
720                # XXX: debug
721                if elem.get_attribute('startup'):
722                    print >>script, "%s \\$USER '%s'" % \
723                            (self.node_startcommand,
724                                    elem.get_attribute('startup'))
725                else:
726                    print >>script, self.node_startcommand
727            script.close()
728            return sfile, pname
729        else:
730            return None, None
731
732
733    def configure_nodes(self, segment_commands, topo, nodes, user, 
734            pubkey, secretkey, stagingdir, tmpdir):
735        """
736        For each node in the topology, generate a script file that copies
737        software onto it and installs it in the proper places and then runs the
738        startup command (including the federation commands.
739        """
740
741
742
743        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
744            vname = e.name
745            sfile, pname = self.write_node_config_script(e,
746                    nodes.get(vname, { }),
747                    user, pubkey, secretkey, stagingdir, tmpdir)
748            if sfile:
749                if not segment_commands.scp_file(sfile, user, pname):
750                    self.log.error("Could not copy script to %s" % pname)
751            else:
752                self.log.error("Unmapped node: %s" % vname)
753
754    def start_node(self, user, host, node, segment_commands):
755        """
756        Copy an identity to a node for the configuration script to be able to
757        import data and then run the startup script remotely.
758        """
759        # Place an identity on the node so that the copying can succeed
760        segment_commands.scp_file( segment_commands.ssh_privkey_file,
761                user, node, ".ssh/id_rsa")
762        segment_commands.ssh_cmd(user, node, 
763                "sudo /bin/sh ./%s.startup &" % node)
764
765    def start_nodes(self, user, host, nodes, segment_commands):
766        """
767        Start a thread to initialize each node and wait for them to complete.
768        Each thread runs start_node.
769        """
770        threads = [ ]
771        for n in nodes:
772            t = Thread(target=self.start_node, args=(user, host, n, 
773                segment_commands))
774            t.start()
775            threads.append(t)
776
777        done = [not t.isAlive() for t in threads]
778        while not all(done):
779            self.log.info("Waiting for threads %s" % done)
780            time.sleep(10)
781            done = [not t.isAlive() for t in threads]
782
783    def set_up_staging_filespace(self, segment_commands, user, host,
784            stagingdir):
785        """
786        Set up teh staging area on the staging machine.  To reduce the number
787        of ssh commands, we compose a script and execute it remotely.
788        """
789
790        self.log.info("[start_segment]: creating script file")
791        try:
792            sf, scriptname = tempfile.mkstemp()
793            scriptfile = os.fdopen(sf, 'w')
794        except EnvironmentError:
795            return False
796
797        scriptbase = os.path.basename(scriptname)
798
799        # Script the filesystem changes
800        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
801        print >>scriptfile, 'mkdir -p %s' % stagingdir
802        print >>scriptfile, "rm -f %s" % scriptbase
803        scriptfile.close()
804
805        # Move the script to the remote machine
806        # XXX: could collide tempfile names on the remote host
807        if segment_commands.scp_file(scriptname, user, host, scriptbase):
808            os.remove(scriptname)
809        else:
810            return False
811
812        # Execute the script (and the script's last line deletes it)
813        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
814            return False
815
816    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
817        """
818        Protogeni interactions take a context and a protogeni certificate.
819        This establishes both for later calls and returns them.
820        """
821        if os.access(certfile, os.R_OK):
822            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
823        else:
824            self.log.error("[start_segment]: Cannot read certfile: %s" % \
825                    certfile)
826            return None, None
827
828        try:
829            gcred = segment_commands.slice_authority_call('GetCredential', 
830                    {}, ctxt)
831        except segment_commands.ProtoGENIError, e:
832            raise service_error(service_error.federant,
833                    "ProtoGENI: %s" % e)
834
835        return ctxt, gcred
836
837    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
838        """
839        Find a usable slice name by trying random ones until there's no
840        collision.
841        """
842
843        def random_slicename(user):
844            """
845            Return a random slicename by appending 5 letters to the username.
846            """
847            slicename = user
848            for i in range(0,5):
849                slicename += random.choice(string.ascii_letters)
850            return slicename
851
852        while True:
853            slicename = random_slicename(user)
854            try:
855                param = {
856                        'credential': gcred, 
857                        'hrn': slicename,
858                        'type': 'Slice'
859                        }
860                segment_commands.slice_authority_call('Resolve', param, ctxt)
861            except segment_commands.ProtoGENIError, e:
862                print e
863                break
864
865        return slicename
866
867    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
868        """
869        Create the slice and allocate resources.  If any of this stuff fails,
870        the allocations will time out on PG in short order, so we just raise
871        the service_error.  Return the slice and sliver credentials as well as
872        the manifest.
873        """
874        try:
875            param = {
876                    'credential': gcred, 
877                    'hrn': slicename,
878                    'type': 'Slice'
879                    }
880            slice_cred = segment_commands.slice_authority_call('Register',
881                    param, ctxt)
882            # Resolve the slice to get the URN that PG has assigned it.
883            param = {
884                    'credential': gcred,
885                    'type': 'Slice',
886                    'hrn': slicename
887                    }
888            data = segment_commands.slice_authority_call('Resolve', param,
889                    ctxt)
890            if 'urn' in data:
891                slice_urn = data['urn']
892            else:
893                raise service_error(service_error.federant, 
894                        "No URN returned for slice %s" % hrn)
895
896            if 'creator_urn' in data:
897                creator_urn = data['creator_urn']
898            else:
899                raise service_error(service_error.federant, 
900                        "No creator URN returned for slice %s" % hrn)
901            # Populate the ssh keys (let PG format them)
902            param = {
903                    'credential': gcred, 
904                    }
905            keys =  segment_commands.slice_authority_call('GetKeys', param, 
906                    ctxt)
907            # Create a Sliver
908            param = {
909                    'credentials': [ slice_cred ],
910                    'rspec': rspec, 
911                    'users': [ {
912                        'urn': creator_urn,
913                        'keys': keys, 
914                        },
915                    ],
916                    'slice_urn': slice_urn,
917                    }
918            rv = segment_commands.component_manager_call(
919                    'CreateSliver', param, ctxt)
920
921            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
922            # API hands back a sliver credential.  This just finds the format
923            # of choice.
924            if isinstance(rv, tuple):
925                manifest = rv[1]
926            else:
927                manifest = rv
928        except segment_commands.ProtoGENIError, e:
929            raise service_error(service_error.federant,
930                    "ProtoGENI: %s %s" % (e.code, e))
931
932        return (slice_urn, slice_cred, manifest)
933
934    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
935            timeout=None):
936        """
937        Wait for the given slice to finish its startup.  Return the final
938        status.
939        """
940        completed_states = ('failed', 'ready')
941        status = 'changing'
942        if timeout is not None:
943            end = time.time() + timeout
944        try:
945            while status not in completed_states:
946                param = { 
947                        'credentials': [ slice_cred ],
948                        'slice_urn': slice_urn,
949                        }
950                r = segment_commands.component_manager_call(
951                        'SliverStatus', param, ctxt)
952                # GENIAPI uses geni_status as the key, so check for both
953                status = r.get('status', r.get('geni_status','changing'))
954                if status not in completed_states:
955                    if timeout is not None and time.time() > end:
956                        return 'timeout'
957                    time.sleep(30)
958        except segment_commands.ProtoGENIError, e:
959            raise service_error(service_error.federant,
960                    "ProtoGENI: %s %s" % (e.code, e))
961
962        return status
963
964    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
965        """
966        Delete the slice resources.  An error from the service is ignores,
967        because the soft state will go away anyway.
968        """
969        try:
970            param = { 
971                    'credentials': [ slice_cred, ],
972                    'slice_urn': slice_urn,
973                    }
974            segment_commands.component_manager_call('DeleteSlice',
975                    param, ctxt)
976        except segment_commands.ProtoGENIError, e:
977            self.log.warn("ProtoGENI: %s" % e)
978
979
980
981    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
982            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
983            export_certfile, topo, connInfo, services, timeout=0):
984        """
985        Start a sub-experiment on a federant.
986
987        Get the current state, modify or create as appropriate, ship data
988        and configs and start the experiment.  There are small ordering
989        differences based on the initial state of the sub-experiment.
990        """
991
992        # Local software dir
993        lsoftdir = "%s/software" % tmpdir
994        host = self.staging_host
995
996        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
997                certfile, certpw)
998
999        if not ctxt: return False
1000
1001        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1002        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1003        self.log.info("Creating %s" % slicename)
1004        slice_urn, slice_cred, manifest = self.allocate_slice(
1005                segment_commands, slicename, rspec, gcred, ctxt)
1006
1007        # With manifest in hand, we can export the portal node names.
1008        if self.create_debug: nodes = self.fake_manifest(topo)
1009        else: nodes = self.manifest_to_dict(manifest)
1010
1011        self.export_store_info(export_certfile, nodes, self.ssh_port,
1012                connInfo)
1013        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1014                ename, connInfo, services, nodes)
1015
1016        # Copy software to the staging machine (done after generation to copy
1017        # those, too)
1018        for d in (tmpdir, lsoftdir):
1019            if os.path.isdir(d):
1020                for f in os.listdir(d):
1021                    if not os.path.isdir("%s/%s" % (d, f)):
1022                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1023                                user, host, "%s/%s" % (stagingdir, f)):
1024                            self.log.error("Scp failed")
1025                            return False
1026
1027
1028        # Now we wait for the nodes to start on PG
1029        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1030                ctxt, timeout=300)
1031        if status == 'failed':
1032            self.log.error('Sliver failed to start on ProtoGENI')
1033            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1034            return False
1035        elif status == 'timeout':
1036            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1037            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1038            return False
1039        else:
1040            # All good: save ProtoGENI info in shared state
1041            self.state_lock.acquire()
1042            self.allocation[aid]['slice_urn'] = slice_urn
1043            self.allocation[aid]['slice_name'] = slicename
1044            self.allocation[aid]['slice_credential'] = slice_cred
1045            self.allocation[aid]['manifest'] = manifest
1046            self.allocation[aid]['certfile'] = certfile
1047            self.allocation[aid]['certpw'] = certpw
1048            self.write_state()
1049            self.state_lock.release()
1050
1051        # Now we have configuration to do for ProtoGENI
1052        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1053                secretkey, stagingdir, tmpdir)
1054
1055        self.start_nodes(user, self.staging_host, 
1056                [ n.get('hostname', None) for n in nodes.values()],
1057                segment_commands)
1058
1059        # Everything has gone OK.
1060        return True, dict([(k, n.get('hostname', None)) \
1061                for k, n in nodes.items()])
1062
1063    def generate_rspec(self, topo, softdir, connInfo):
1064
1065        # Force a useful image.  Without picking this the nodes can get
1066        # different images and there is great pain.
1067        def image_filter(e):
1068            if isinstance(e, topdl.Computer):
1069                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1070                        'image+emulab-ops//FEDORA10-STD" />'
1071            else:
1072                return ""
1073        # Main line of generate
1074        t = topo.clone()
1075
1076        starts = { }
1077        # Weed out the things we aren't going to instantiate: Segments, portal
1078        # substrates, and portal interfaces.  (The copy in the for loop allows
1079        # us to delete from e.elements in side the for loop).  While we're
1080        # touching all the elements, we also adjust paths from the original
1081        # testbed to local testbed paths and put the federation commands and
1082        # startcommands into a dict so we can start them manually later.
1083        # ProtoGENI requires setup before the federation commands run, so we
1084        # run them by hand after we've seeded configurations.
1085        for e in [e for e in t.elements]:
1086            if isinstance(e, topdl.Segment):
1087                t.elements.remove(e)
1088            # Fix software paths
1089            for s in getattr(e, 'software', []):
1090                s.location = re.sub("^.*/", softdir, s.location)
1091            if isinstance(e, topdl.Computer):
1092                if e.get_attribute('portal') and self.portal_startcommand:
1093                    # Portals never have a user-specified start command
1094                    starts[e.name] = self.portal_startcommand
1095                elif self.node_startcommand:
1096                    if e.get_attribute('startup'):
1097                        starts[e.name] = "%s \\$USER '%s'" % \
1098                                (self.node_startcommand, 
1099                                        e.get_attribute('startup'))
1100                        e.remove_attribute('startup')
1101                    else:
1102                        starts[e.name] = self.node_startcommand
1103
1104                # Remove portal interfaces
1105                e.interface = [i for i in e.interface \
1106                        if not i.get_attribute('portal')]
1107
1108        t.substrates = [ s.clone() for s in t.substrates ]
1109        t.incorporate_elements()
1110
1111        # Customize the rspec output to use the image we like
1112        filters = [ image_filter ]
1113
1114        # Convert to rspec and return it
1115        exp_rspec = topdl.topology_to_rspec(t, filters)
1116
1117        return exp_rspec
1118
1119    def retrieve_software(self, topo, certfile, softdir):
1120        """
1121        Collect the software that nodes in the topology need loaded and stage
1122        it locally.  This implies retrieving it from the experiment_controller
1123        and placing it into softdir.  Certfile is used to prove that this node
1124        has access to that data (it's the allocation/segment fedid).  Finally
1125        local portal and federation software is also copied to the same staging
1126        directory for simplicity - all software needed for experiment creation
1127        is in softdir.
1128        """
1129        sw = set()
1130        for e in topo.elements:
1131            for s in getattr(e, 'software', []):
1132                sw.add(s.location)
1133        os.mkdir(softdir)
1134        for s in sw:
1135            self.log.debug("Retrieving %s" % s)
1136            try:
1137                get_url(s, certfile, softdir)
1138            except:
1139                t, v, st = sys.exc_info()
1140                raise service_error(service_error.internal,
1141                        "Error retrieving %s: %s" % (s, v))
1142
1143        # Copy local portal node software to the tempdir
1144        for s in (self.portal_software, self.federation_software):
1145            for l, f in s:
1146                base = os.path.basename(f)
1147                copy_file(f, "%s/%s" % (softdir, base))
1148
1149
1150    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1151        """
1152        Gather common configuration files, retrieve or create an experiment
1153        name and project name, and return the ssh_key filenames.  Create an
1154        allocation log bound to the state log variable as well.
1155        """
1156        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1157        ename = None
1158        pubkey_base = None
1159        secretkey_base = None
1160        alloc_log = None
1161
1162        for a in attrs:
1163            if a['attribute'] in configs:
1164                try:
1165                    self.log.debug("Retrieving %s" % a['value'])
1166                    get_url(a['value'], certfile, tmpdir)
1167                except:
1168                    t, v, st = sys.exc_info()
1169                    raise service_error(service_error.internal,
1170                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1171            if a['attribute'] == 'ssh_pubkey':
1172                pubkey_base = a['value'].rpartition('/')[2]
1173            if a['attribute'] == 'ssh_secretkey':
1174                secretkey_base = a['value'].rpartition('/')[2]
1175            if a['attribute'] == 'experiment_name':
1176                ename = a['value']
1177
1178        if not ename:
1179            ename = ""
1180            for i in range(0,5):
1181                ename += random.choice(string.ascii_letters)
1182            self.log.warn("No experiment name: picked one randomly: %s" \
1183                    % ename)
1184
1185        self.state_lock.acquire()
1186        if self.allocation.has_key(aid):
1187            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1188            self.allocation[aid]['experiment'] = ename
1189            self.allocation[aid]['log'] = [ ]
1190            # Create a logger that logs to the experiment's state object as
1191            # well as to the main log file.
1192            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1193            h = logging.StreamHandler(
1194                    list_log.list_log(self.allocation[aid]['log']))
1195            # XXX: there should be a global one of these rather than
1196            # repeating the code.
1197            h.setFormatter(logging.Formatter(
1198                "%(asctime)s %(name)s %(message)s",
1199                        '%d %b %y %H:%M:%S'))
1200            alloc_log.addHandler(h)
1201            self.write_state()
1202        else:
1203            self.log.error("No allocation for %s!?" % aid)
1204        self.state_lock.release()
1205
1206        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1207                cpw, alloc_log)
1208
1209    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1210        # Copy the assigned names into the return topology
1211        rvtopo = topo.clone()
1212        embedding = [ ]
1213        for k, n in nodes.items():
1214            embedding.append({ 
1215                'toponame': k,
1216                'physname': [n ],
1217                })
1218        # Grab the log (this is some anal locking, but better safe than
1219        # sorry)
1220        self.state_lock.acquire()
1221        logv = "".join(self.allocation[aid]['log'])
1222        # It's possible that the StartSegment call gets retried (!).
1223        # if the 'started' key is in the allocation, we'll return it rather
1224        # than redo the setup.
1225        self.allocation[aid]['started'] = { 
1226                'allocID': alloc_id,
1227                'allocationLog': logv,
1228                'segmentdescription': { 
1229                    'topdldescription': rvtopo.to_dict() },
1230                'embedding': embedding,
1231                }
1232        retval = copy.deepcopy(self.allocation[aid]['started'])
1233        self.write_state()
1234        self.state_lock.release()
1235
1236        return retval
1237
1238    def StartSegment(self, req, fid):
1239        err = None  # Any service_error generated after tmpdir is created
1240        rv = None   # Return value from segment creation
1241
1242        try:
1243            req = req['StartSegmentRequestBody']
1244            topref = req['segmentdescription']['topdldescription']
1245        except KeyError:
1246            raise service_error(service_error.req, "Badly formed request")
1247
1248        connInfo = req.get('connection', [])
1249        services = req.get('service', [])
1250        auth_attr = req['allocID']['fedid']
1251        aid = "%s" % auth_attr
1252        attrs = req.get('fedAttr', [])
1253        if not self.auth.check_attribute(fid, auth_attr):
1254            raise service_error(service_error.access, "Access denied")
1255        else:
1256            # See if this is a replay of an earlier succeeded StartSegment -
1257            # sometimes SSL kills 'em.  If so, replay the response rather than
1258            # redoing the allocation.
1259            self.state_lock.acquire()
1260            retval = self.allocation[aid].get('started', None)
1261            self.state_lock.release()
1262            if retval:
1263                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1264                        "replaying response")
1265                return retval
1266
1267        if topref:
1268            topo = topdl.Topology(**topref)
1269        else:
1270            raise service_error(service_error.req, 
1271                    "Request missing segmentdescription'")
1272
1273        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1274        try:
1275            tmpdir = tempfile.mkdtemp(prefix="access-")
1276            softdir = "%s/software" % tmpdir
1277        except EnvironmentError:
1278            raise service_error(service_error.internal, "Cannot create tmp dir")
1279
1280        # Try block alllows us to clean up temporary files.
1281        try:
1282            self.retrieve_software(topo, certfile, softdir)
1283            self.configure_userconf(services, tmpdir)
1284            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1285                cpw, alloc_log = self.initialize_experiment_info(attrs,
1286                        aid, certfile, tmpdir)
1287            self.import_store_info(certfile, connInfo)
1288            rspec = self.generate_rspec(topo, "%s/%s/" \
1289                    % (self.staging_dir, ename), connInfo)
1290
1291            segment_commands = self.api_proxy(keyfile=ssh_key,
1292                    debug=self.create_debug, log=alloc_log,
1293                    ch_url = self.ch_url, sa_url=self.sa_url,
1294                    cm_url=self.cm_url)
1295            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1296                    pubkey_base, 
1297                    secretkey_base, ename,
1298                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1299                    certfile, topo, connInfo, services)
1300        except EnvironmentError, e:
1301            err = service_error(service_error.internal, "%s" % e)
1302        except service_error, e:
1303            err = e
1304        except:
1305            t, v, st = sys.exc_info()
1306            err = service_error(service_error.internal, "%s: %s" % \
1307                    (v, traceback.extract_tb(st)))
1308
1309        # Walk up tmpdir, deleting as we go
1310        if self.cleanup: self.remove_dirs(tmpdir)
1311        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1312
1313        if rv:
1314            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1315        elif err:
1316            raise service_error(service_error.federant,
1317                    "Swapin failed: %s" % err)
1318        else:
1319            raise service_error(service_error.federant, "Swapin failed")
1320
1321    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1322            slice_urn, certfile, certpw):
1323        """
1324        Stop a sub experiment by calling swapexp on the federant
1325        """
1326        host = self.staging_host
1327        rv = False
1328        try:
1329            # Clean out tar files: we've gone over quota in the past
1330            if stagingdir:
1331                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1332            if slice_cred:
1333                self.log.error('Removing Sliver on ProtoGENI')
1334                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1335                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1336            return True
1337        except self.ssh_cmd_timeout:
1338            rv = False
1339        return rv
1340
1341    def TerminateSegment(self, req, fid):
1342        try:
1343            req = req['TerminateSegmentRequestBody']
1344        except KeyError:
1345            raise service_error(service_error.req, "Badly formed request")
1346
1347        auth_attr = req['allocID']['fedid']
1348        aid = "%s" % auth_attr
1349        attrs = req.get('fedAttr', [])
1350        if not self.auth.check_attribute(fid, auth_attr):
1351            raise service_error(service_error.access, "Access denied")
1352
1353        self.state_lock.acquire()
1354        if self.allocation.has_key(aid):
1355            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1356            slice_cred = self.allocation[aid].get('slice_credential', None)
1357            slice_urn = self.allocation[aid].get('slice_urn', None)
1358            ename = self.allocation[aid].get('experiment', None)
1359        else:
1360            cf, user, ssh_key, cpw = (None, None, None, None)
1361            slice_cred = None
1362            slice_urn = None
1363            ename = None
1364        self.state_lock.release()
1365
1366        if ename:
1367            staging = "%s/%s" % ( self.staging_dir, ename)
1368        else:
1369            self.log.warn("Can't find experiment name for %s" % aid)
1370            staging = None
1371
1372        segment_commands = self.api_proxy(keyfile=ssh_key,
1373                debug=self.create_debug, ch_url = self.ch_url,
1374                sa_url=self.sa_url, cm_url=self.cm_url)
1375        self.stop_segment(segment_commands, user, staging, slice_cred,
1376                slice_urn, cf, cpw)
1377        return { 'allocID': req['allocID'] }
1378
1379    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1380            certfile, certpw):
1381        """
1382        Linear code through the segment renewal calls.
1383        """
1384        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1385        try:
1386            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1387                    time.gmtime(time.time() + interval))
1388            cred = segment_commands.slice_authority_call('GetCredential', 
1389                    {}, ctxt)
1390
1391            param = {
1392                    'credential': scred,
1393                    'expiration': expiration
1394                    }
1395            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1396            param = {
1397                    'credential': cred,
1398                    'hrn': name,
1399                    'type': 'Slice',
1400                    }
1401            slice = segment_commands.slice_authority_call('Resolve', 
1402                    param, ctxt)
1403            uuid = slice.get('uuid', None)
1404            if uuid == None:
1405                sys.exit('No uuid for %s' % slicename)
1406
1407            print 'Calling GetCredential (uuid)'
1408            param = {
1409                    'credential': cred,
1410                    'uuid': uuid,
1411                    'type': 'Slice',
1412                    }
1413            new_scred = segment_commands.slice_authority_call('GetCredential',
1414                    param, ctxt)
1415
1416        except segment_commands.ProtoGENIError, e:
1417            self.log.error("Failed to extend slice %s: %s" % (name, e))
1418            return None
1419        try:
1420            print 'Calling RenewSlice (CM)'
1421            param = {
1422                    'credentials': [new_scred,],
1423                    'slice_urn': slice_urn,
1424                    }
1425            r = segment_commands.component_manager_call('RenewSlice', param, 
1426                    ctxt)
1427        except segment_commands.ProtoGENIError, e:
1428            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1429
1430        return new_scred
1431   
1432
1433    def RenewSlices(self):
1434        self.log.info("Scanning for slices to renew")
1435        self.state_lock.acquire()
1436        aids = self.allocation.keys()
1437        self.state_lock.release()
1438
1439        for aid in aids:
1440            self.state_lock.acquire()
1441            if self.allocation.has_key(aid):
1442                name = self.allocation[aid].get('slice_name', None)
1443                scred = self.allocation[aid].get('slice_credential', None)
1444                slice_urn = self.allocation[aid].get('slice_urn', None)
1445                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1446            else:
1447                name = None
1448                scred = None
1449            self.state_lock.release()
1450
1451            if not os.access(cf, os.R_OK):
1452                self.log.error(
1453                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1454                continue
1455
1456            # There's a ProtoGENI slice associated with the segment; renew it.
1457            if name and scred and slice_urn:
1458                segment_commands = self.api_proxy(log=self.log, 
1459                        debug=self.create_debug, keyfile=ssh_key,
1460                        cm_url = self.cm_url, sa_url = self.sa_url,
1461                        ch_url = self.ch_url)
1462                new_scred = self.renew_segment(segment_commands, name, scred, 
1463                        slice_urn, self.renewal_interval, cf, cpw)
1464                if new_scred:
1465                    self.log.info("Slice %s renewed until %s GMT" % \
1466                            (name, time.asctime(time.gmtime(\
1467                                time.time()+self.renewal_interval))))
1468                    self.state_lock.acquire()
1469                    if self.allocation.has_key(aid):
1470                        self.allocation[aid]['slice_credential'] = new_scred
1471                        self.write_state()
1472                    self.state_lock.release()
1473                else:
1474                    self.log.info("Failed to renew slice %s " % name)
1475
1476        # Let's do this all again soon.  (4 tries before the slices time out)   
1477        t = Timer(self.renewal_interval/4, self.RenewSlices)
1478        t.start()
Note: See TracBrowser for help on using the repository browser.