source: fedd/federation/protogeni_access.py @ add53ea

axis_examplecompt_changesinfo-ops
Last change on this file since add53ea was add53ea, checked in by Ted Faber <faber@…>, 14 years ago

capture rspec for debugging

  • Property mode set to 100644
File size: 46.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from access_project import access_project
20from fedid import fedid, generate_fedid
21from authorizer import authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24
25import httplib
26import tempfile
27from urlparse import urlparse
28
29from access import access_base
30from protogeni_proxy import protogeni_proxy
31from geniapi_proxy import geniapi_proxy
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.domain = config.get("access", "domain")
60        self.userconfdir = config.get("access","userconfdir")
61        self.userconfcmd = config.get("access","userconfcmd")
62        self.userconfurl = config.get("access","userconfurl")
63        self.federation_software = config.get("access", "federation_software")
64        self.portal_software = config.get("access", "portal_software")
65        self.ssh_port = config.get("access","ssh_port") or "22"
66        self.sshd = config.get("access","sshd")
67        self.sshd_config = config.get("access", "sshd_config")
68        self.access_type = config.get("access", "type")
69        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
70        self.staging_host = config.get("access", "staging_host") \
71                or "ops.emulab.net"
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75   
76        self.dragon_endpoint = config.get("access", "dragon")
77        self.dragon_vlans = config.get("access", "dragon_vlans")
78        self.deter_internal = config.get("access", "deter_internal")
79
80        self.tunnel_config = config.getboolean("access", "tunnel_config")
81        self.portal_command = config.get("access", "portal_command")
82        self.portal_image = config.get("access", "portal_image")
83        self.portal_type = config.get("access", "portal_type") or "pc"
84        self.portal_startcommand = config.get("access", "portal_startcommand")
85        self.node_startcommand = config.get("access", "node_startcommand")
86
87        self.federation_software = self.software_list(self.federation_software)
88        self.portal_software = self.software_list(self.portal_software)
89        self.local_seer_software = self.software_list(self.local_seer_software)
90
91        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
92        self.renewal_interval = int(self.renewal_interval) * 60
93
94        self.ch_url = config.get("access", "ch_url")
95        self.sa_url = config.get("access", "sa_url")
96        self.cm_url = config.get("access", "cm_url")
97
98        self.restricted = [ ]
99
100        # read_state in the base_class
101        self.state_lock.acquire()
102        for a  in ('allocation', 'projects', 'keys', 'types'):
103            if a not in self.state:
104                self.state[a] = { }
105        self.allocation = self.state['allocation']
106        self.projects = self.state['projects']
107        self.keys = self.state['keys']
108        self.types = self.state['types']
109        # Add the ownership attributes to the authorizer.  Note that the
110        # indices of the allocation dict are strings, but the attributes are
111        # fedids, so there is a conversion.
112        for k in self.state.get('allocation', {}).keys():
113            for o in self.state['allocation'][k].get('owners', []):
114                self.auth.set_attribute(o, fedid(hexstr=k))
115            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
116
117        self.state_lock.release()
118
119
120        self.log = logging.getLogger("fedd.access")
121        set_log_level(config, "access", self.log)
122
123        self.access = { }
124        if config.has_option("access", "accessdb"):
125            self.read_access(config.get("access", "accessdb"), 
126                    access_obj=self.make_access_info)
127
128        self.lookup_access = self.lookup_access_base
129
130        api = config.get("access", "api") or "protogeni"
131        if api == "protogeni":
132            self.api_proxy = protogeni_proxy
133        elif api == "geniapi":
134            self.api_proxy = geniapi_proxy
135        else:
136            self.log.debug("Unknown interface, using protogeni")
137            self.api_proxy = protogeni_proxy
138
139        self.call_SetValue = service_caller('SetValue')
140        self.call_GetValue = service_caller('GetValue')
141        self.exports = {
142                'local_seer_control': self.export_local_seer,
143                'seer_master': self.export_seer_master,
144                'hide_hosts': self.export_hide_hosts,
145                }
146
147        if not self.local_seer_image or not self.local_seer_software or \
148                not self.local_seer_start:
149            if 'local_seer_control' in self.exports:
150                del self.exports['local_seer_control']
151
152        if not self.local_seer_image or not self.local_seer_software or \
153                not self.seer_master_start:
154            if 'seer_master' in self.exports:
155                del self.exports['seer_master']
156
157        self.RenewSlices()
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment", 
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176    @staticmethod
177    def make_access_info(s):
178        """
179        Split a string of the form (id, id, id, id) ito its constituent tuples
180        and return them as a tuple.  Use to import access info from the
181        access_db.
182        """
183
184        ss = s.strip()
185        if ss.startswith('(') and ss.endswith(')'):
186            l = [ s.strip() for s  in ss[1:-1].split(",")]
187            if len(l) == 4:
188                return tuple(l)
189            else:
190                raise self.parse_error(
191                        "Exactly 4 elements in access info required")
192        else:
193            raise self.parse_error("Expecting parenthezied values")
194
195
196    def get_handler(self, path, fid):
197        self.log.info("Get handler %s %s" % (path, fid))
198        if self.auth.check_attribute(fid, path) and self.userconfdir:
199            return ("%s/%s" % (self.userconfdir, path), "application/binary")
200        else:
201            return (None, None)
202
203    def build_access_response(self, alloc_id, services):
204        """
205        Create the SOAP response.
206
207        Build the dictionary description of the response and use
208        fedd_utils.pack_soap to create the soap message.  ap is the allocate
209        project message returned from a remote project allocation (even if that
210        allocation was done locally).
211        """
212        # Because alloc_id is already a fedd_services_types.IDType_Holder,
213        # there's no need to repack it
214        msg = { 
215                'allocID': alloc_id,
216                'fedAttr': [
217                    { 'attribute': 'domain', 'value': self.domain } , 
218                ]
219            }
220        if self.dragon_endpoint:
221            msg['fedAttr'].append({'attribute': 'dragon',
222                'value': self.dragon_endpoint})
223        if self.deter_internal:
224            msg['fedAttr'].append({'attribute': 'deter_internal',
225                'value': self.deter_internal})
226        #XXX: ??
227        if self.dragon_vlans:
228            msg['fedAttr'].append({'attribute': 'vlans',
229                'value': self.dragon_vlans})
230
231        if services:
232            msg['service'] = services
233        return msg
234
235    def RequestAccess(self, req, fid):
236        """
237        Handle the access request.
238        """
239
240        # The dance to get into the request body
241        if req.has_key('RequestAccessRequestBody'):
242            req = req['RequestAccessRequestBody']
243        else:
244            raise service_error(service_error.req, "No request!?")
245
246        if req.has_key('destinationTestbed'):
247            dt = unpack_id(req['destinationTestbed'])
248
249        # Request for this fedd
250        found, match = self.lookup_access(req, fid)
251        services, svc_state = self.export_services(req.get('service',[]),
252                None, None)
253        # keep track of what's been added
254        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
255        aid = unicode(allocID)
256
257        self.state_lock.acquire()
258        self.allocation[aid] = { }
259        # The protoGENI certificate
260        self.allocation[aid]['credentials'] = found
261        # The list of owner FIDs
262        self.allocation[aid]['owners'] = [ fid ]
263        self.write_state()
264        self.state_lock.release()
265        self.auth.set_attribute(fid, allocID)
266        self.auth.set_attribute(allocID, allocID)
267
268        try:
269            f = open("%s/%s.pem" % (self.certdir, aid), "w")
270            print >>f, alloc_cert
271            f.close()
272        except EnvironmentError, e:
273            raise service_error(service_error.internal, 
274                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
275        return self.build_access_response({ 'fedid': allocID }, None)
276
277
278    def ReleaseAccess(self, req, fid):
279        # The dance to get into the request body
280        if req.has_key('ReleaseAccessRequestBody'):
281            req = req['ReleaseAccessRequestBody']
282        else:
283            raise service_error(service_error.req, "No request!?")
284
285        # Local request
286        try:
287            if req['allocID'].has_key('localname'):
288                auth_attr = aid = req['allocID']['localname']
289            elif req['allocID'].has_key('fedid'):
290                aid = unicode(req['allocID']['fedid'])
291                auth_attr = req['allocID']['fedid']
292            else:
293                raise service_error(service_error.req,
294                        "Only localnames and fedids are understood")
295        except KeyError:
296            raise service_error(service_error.req, "Badly formed request")
297
298        self.log.debug("[access] deallocation requested for %s", aid)
299        if not self.auth.check_attribute(fid, auth_attr):
300            self.log.debug("[access] deallocation denied for %s", aid)
301            raise service_error(service_error.access, "Access Denied")
302
303        self.state_lock.acquire()
304        if self.allocation.has_key(aid):
305            self.log.debug("Found allocation for %s" %aid)
306            del self.allocation[aid]
307            self.write_state()
308            self.state_lock.release()
309            # And remove the access cert
310            cf = "%s/%s.pem" % (self.certdir, aid)
311            self.log.debug("Removing %s" % cf)
312            os.remove(cf)
313            return { 'allocID': req['allocID'] } 
314        else:
315            self.state_lock.release()
316            raise service_error(service_error.req, "No such allocation")
317
318    def manifest_to_dict(self, manifest, ignore_debug=False):
319        """
320        Turn the manifest into a dict were each virtual nodename (i.e. the
321        topdl name) has an entry with the allocated machine in hostname and the
322        interfaces in 'interfaces'.  I love having XML parser code lying
323        around.
324        """
325        if self.create_debug and not ignore_debug: 
326            self.log.debug("Returning null manifest dict")
327            return { }
328
329        # The class allows us to keep a little state - the dict under
330        # consteruction and the current entry in that dict for the interface
331        # element code.
332        class manifest_parser:
333            def __init__(self):
334                self.d = { }
335                self.current_key=None
336
337            # If the element is a node, create a dict entry for it.  If it's an
338            # interface inside a node, add an entry in the interfaces list with
339            # the virtual name and component id.
340            def start_element(self, name, attrs):
341                if name == 'node':
342                    self.current_key = attrs.get('virtual_id',"")
343                    if self.current_key:
344                        self.d[self.current_key] = {
345                                'hostname': attrs.get('hostname', None),
346                                'interfaces': { }
347                                }
348                elif name == 'interface' and self.current_key:
349                    self.d[self.current_key]['interfaces']\
350                            [attrs.get('virtual_id','')] = \
351                            attrs.get('component_id', None)
352            #  When a node is finished, clear current_key
353            def end_element(self, name):
354                if name == 'node': self.current_key = None
355
356        node = { }
357
358        mp = manifest_parser()
359        p = xml.parsers.expat.ParserCreate()
360        # These are bound to the class we just created
361        p.StartElementHandler = mp.start_element
362        p.EndElementHandler = mp.end_element
363
364        p.Parse(manifest)
365        # Make the node dict that the callers expect
366        for k in mp.d:
367            node[k] = mp.d.get('hostname', '')
368        return mp.d
369
370    def fake_manifest(self, topo):
371        """
372        Fake the output of manifest_to_dict with a bunch of generic node an
373        interface names, for debugging.
374        """
375        node = { }
376        for i, e in enumerate([ e for e in topo.elements \
377                if isinstance(e, topdl.Computer)]):
378            node[e.name] = { 
379                    'hostname': "node%03d" % i,
380                    'interfaces': { }
381                    }
382            for j, inf in enumerate(e.interface):
383                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
384
385        return node
386
387
388    def generate_portal_configs(self, topo, pubkey_base, 
389            secretkey_base, tmpdir, leid, connInfo, services, nodes):
390
391        def conninfo_to_dict(key, info):
392            """
393            Make a cpoy of the connection information about key, and flatten it
394            into a single dict by parsing out any feddAttrs.
395            """
396
397            rv = None
398            for i in info:
399                if key == i.get('portal', "") or \
400                        key in [e.get('element', "") \
401                        for e in i.get('member', [])]:
402                    rv = i.copy()
403                    break
404
405            else:
406                return rv
407
408            if 'fedAttr' in rv:
409                for a in rv['fedAttr']:
410                    attr = a.get('attribute', "")
411                    val = a.get('value', "")
412                    if attr and attr not in rv:
413                        rv[attr] = val
414                del rv['fedAttr']
415            return rv
416
417        # XXX: un hardcode this
418        def client_null(f, s):
419            print >>f, "Service: %s" % s['name']
420       
421        def client_seer_master(f, s):
422            print >>f, 'PortalAlias: seer-master'
423
424        def client_smb(f, s):
425            print >>f, "Service: %s" % s['name']
426            smbshare = None
427            smbuser = None
428            smbproj = None
429            for a in s.get('fedAttr', []):
430                if a.get('attribute', '') == 'SMBSHARE':
431                    smbshare = a.get('value', None)
432                elif a.get('attribute', '') == 'SMBUSER':
433                    smbuser = a.get('value', None)
434                elif a.get('attribute', '') == 'SMBPROJ':
435                    smbproj = a.get('value', None)
436
437            if all((smbshare, smbuser, smbproj)):
438                print >>f, "SMBshare: %s" % smbshare
439                print >>f, "ProjectUser: %s" % smbuser
440                print >>f, "ProjectName: %s" % smbproj
441
442        def client_hide_hosts(f, s):
443            for a in s.get('fedAttr', [ ]):
444                if a.get('attribute', "") == 'hosts':
445                    print >>f, 'Hide: %s' % a.get('value', "")
446
447        client_service_out = {
448                'SMB': client_smb,
449                'tmcd': client_null,
450                'seer': client_null,
451                'userconfig': client_null,
452                'project_export': client_null,
453                'seer_master': client_seer_master,
454                'hide_hosts': client_hide_hosts,
455            }
456
457        def client_seer_master_export(f, s):
458            print >>f, "AddedNode: seer-master"
459
460        def client_seer_local_export(f, s):
461            print >>f, "AddedNode: control"
462
463        client_export_service_out = {
464                'seer_master': client_seer_master_export,
465                'local_seer_control': client_seer_local_export,
466            }
467
468        def server_port(f, s):
469            p = urlparse(s.get('server', 'http://localhost'))
470            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
471
472        def server_null(f,s): pass
473
474        def server_seer(f, s):
475            print >>f, 'seer: true'
476
477        server_service_out = {
478                'SMB': server_port,
479                'tmcd': server_port,
480                'userconfig': server_null,
481                'project_export': server_null,
482                'seer': server_seer,
483                'seer_master': server_port,
484                'hide_hosts': server_null,
485            }
486        # XXX: end un hardcode this
487
488
489        seer_out = False
490        client_out = False
491        for e in [ e for e in topo.elements \
492                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
493            myname = e.name
494            type = e.get_attribute('portal_type')
495
496            info = conninfo_to_dict(myname, connInfo)
497
498            if not info:
499                raise service_error(service_error.req,
500                        "No connectivity info for %s" % myname)
501
502            # Translate to physical name (ProtoGENI doesn't have DNS)
503            physname = nodes.get(myname, { }).get('hostname', None)
504            peer = info.get('peer', "")
505            ldomain = self.domain
506            ssh_port = info.get('ssh_port', 22)
507
508            # Collect this for the client.conf file
509            if 'masterexperiment' in info:
510                mproj, meid = info['masterexperiment'].split("/", 1)
511
512            active = info.get('active', 'False')
513
514            if type in ('control', 'both'):
515                testbed = e.get_attribute('testbed')
516                control_gw = myname
517
518            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
519            tunnelconfig = self.tunnel_config
520            try:
521                f = open(cfn, "w")
522                if active == 'True':
523                    print >>f, "active: True"
524                    print >>f, "ssh_port: %s" % ssh_port
525                    if type in ('control', 'both'):
526                        for s in [s for s in services \
527                                if s.get('name', "") in self.imports]:
528                            server_service_out[s['name']](f, s)
529
530                if tunnelconfig:
531                    print >>f, "tunnelip: %s" % tunnelconfig
532                print >>f, "peer: %s" % peer.lower()
533                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
534                        pubkey_base
535                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
536                        secretkey_base
537                f.close()
538            except EnvironmentError, e:
539                raise service_error(service_error.internal,
540                        "Can't write protal config %s: %s" % (cfn, e))
541
542        # Done with portals, write the client config file.
543        try:
544            f = open("%s/client.conf" % tmpdir, "w")
545            if control_gw:
546                print >>f, "ControlGateway: %s" % physname.lower()
547            for s in services:
548                if s.get('name',"") in self.imports and \
549                        s.get('visibility','') == 'import':
550                    client_service_out[s['name']](f, s)
551                if s.get('name', '') in self.exports and \
552                        s.get('visibility', '') == 'export' and \
553                        s['name'] in client_export_service_out:
554                    client_export_service_out[s['name']](f, s)
555            # Seer uses this.
556            if mproj and meid:
557                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
558            f.close()
559        except EnvironmentError, e:
560            raise service_error(service_error.internal,
561                    "Cannot write client.conf: %s" %s)
562
563
564
565    def export_store_info(self, cf, nodes, ssh_port, connInfo):
566        """
567        For the export requests in the connection info, install the peer names
568        at the experiment controller via SetValue calls.
569        """
570
571        for c in connInfo:
572            for p in [ p for p in c.get('parameter', []) \
573                    if p.get('type', '') == 'output']:
574
575                if p.get('name', '') == 'peer':
576                    k = p.get('key', None)
577                    surl = p.get('store', None)
578                    if surl and k and k.index('/') != -1:
579                        if self.create_debug:
580                            req = { 'name': k, 'value': 'debug' }
581                            self.call_SetValue(surl, req, cf)
582                        else:
583                            n = nodes.get(k[k.index('/')+1:], { })
584                            value = n.get('hostname', None)
585                            if value:
586                                req = { 'name': k, 'value': value }
587                                self.call_SetValue(surl, req, cf)
588                            else:
589                                self.log.error("No hostname for %s" % \
590                                        k[k.index('/'):])
591                    else:
592                        self.log.error("Bad export request: %s" % p)
593                elif p.get('name', '') == 'ssh_port':
594                    k = p.get('key', None)
595                    surl = p.get('store', None)
596                    if surl and k:
597                        req = { 'name': k, 'value': ssh_port }
598                        self.call_SetValue(surl, req, cf)
599                    else:
600                        self.log.error("Bad export request: %s" % p)
601                else:
602
603                    self.log.error("Unknown export parameter: %s" % \
604                            p.get('name'))
605                    continue
606
607    def write_node_config_script(self, elem, node, user, pubkey,
608            secretkey, stagingdir, tmpdir):
609        """
610        Write out the configuration script that is to run on the node
611        represented by elem in the topology.  This is called
612        once per node to configure.
613        """
614        # These little functions/functors just make things more readable.  Each
615        # one encapsulates a small task of copying software files or installing
616        # them.
617        class stage_file_type:
618            """
619            Write code copying file sfrom the staging host to the host on which
620            this will run.
621            """
622            def __init__(self, user, host, stagingdir):
623                self.user = user
624                self.host = host
625                self.stagingdir = stagingdir
626                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
627                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
628
629            def __call__(self, script, file, dest="."):
630                # If the file is a full pathname, do not use stagingdir
631                if file.find('/') == -1:
632                    file = "%s/%s" % (self.stagingdir, file)
633                print >>script, "%s %s@%s:%s %s" % \
634                        (self.scp, self.user, self.host, file, dest)
635
636        def install_tar(script, loc, base):
637            """
638            Print code to script to install a tarfile in loc.
639            """
640            tar = "/bin/tar"
641            mkdir="/bin/mkdir"
642
643            print >>script, "%s -p %s" % (mkdir, loc)
644            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
645
646        def install_rpm(script, base):
647            """
648            Print code to script to install an rpm
649            """
650            rpm = "/bin/rpm"
651            print >>script, "%s --install %s" % (rpm, base)
652
653        ifconfig = "/sbin/ifconfig"
654        stage_file = stage_file_type(user, self.staging_host, stagingdir)
655        pname = node.get('hostname', None)
656        fed_dir = "/usr/local/federation"
657        fed_etc_dir = "%s/etc" % fed_dir
658        fed_bin_dir = "%s/bin" % fed_dir
659        fed_lib_dir = "%s/lib" % fed_dir
660
661        if pname:
662            sfile = "%s/%s.startup" % (tmpdir, pname)
663            script = open(sfile, "w")
664            # Reset the interfaces to the ones in the topo file
665            for i in [ i for i in elem.interface \
666                    if not i.get_attribute('portal')]:
667                pinf = node['interfaces'].get(i.name, None)
668                addr = i.get_attribute('ip4_address') 
669                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
670                if pinf and addr:
671                    print >>script, \
672                            "%s %s %s netmask %s"  % \
673                            (ifconfig, pinf, addr, netmask)
674                else:
675                    self.log.error("Missing interface or address for %s" \
676                            % i.name)
677               
678            for l, f in self.federation_software:
679                base = os.path.basename(f)
680                stage_file(script, base)
681                if l: install_tar(script, l, base)
682                else: install_rpm(script, base)
683
684            for s in elem.software:
685                s_base = s.location.rpartition('/')[2]
686                stage_file(script, s_base)
687                if s.install: install_tar(script, s.install, s_base)
688                else: install_rpm(script, s_base)
689
690            for f in ('hosts', pubkey, secretkey, 'client.conf', 
691                    'userconf'):
692                stage_file(script, f, fed_etc_dir)
693            if self.sshd:
694                stage_file(script, self.sshd, fed_bin_dir)
695            if self.sshd_config:
696                stage_file(script, self.sshd_config, fed_etc_dir)
697
698            # Look in tmpdir to get the names.  They've all been copied
699            # into the (remote) staging dir
700            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
701                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
702
703            # Done with staging, remove the identity used to stage
704            print >>script, "#/bin/rm .ssh/id_rsa"
705
706            # Start commands
707            if elem.get_attribute('portal') and self.portal_startcommand:
708                # Install portal software
709                for l, f in self.portal_software:
710                    base = os.path.basename(f)
711                    stage_file(script, base)
712                    if l: install_tar(script, l, base)
713                    else: install_rpm(script, base)
714
715                # Portals never have a user-specified start command
716                print >>script, self.portal_startcommand
717            elif self.node_startcommand:
718                # XXX: debug
719                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
720                # XXX: debug
721                if elem.get_attribute('startup'):
722                    print >>script, "%s \\$USER '%s'" % \
723                            (self.node_startcommand,
724                                    elem.get_attribute('startup'))
725                else:
726                    print >>script, self.node_startcommand
727            script.close()
728            return sfile, pname
729        else:
730            return None, None
731
732
733    def configure_nodes(self, segment_commands, topo, nodes, user, 
734            pubkey, secretkey, stagingdir, tmpdir):
735        """
736        For each node in the topology, generate a script file that copies
737        software onto it and installs it in the proper places and then runs the
738        startup command (including the federation commands.
739        """
740
741
742
743        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
744            vname = e.name
745            sfile, pname = self.write_node_config_script(e,
746                    nodes.get(vname, { }),
747                    user, pubkey, secretkey, stagingdir, tmpdir)
748            if sfile:
749                if not segment_commands.scp_file(sfile, user, pname):
750                    self.log.error("Could not copy script to %s" % pname)
751            else:
752                self.log.error("Unmapped node: %s" % vname)
753
754    def start_node(self, user, host, node, segment_commands):
755        """
756        Copy an identity to a node for the configuration script to be able to
757        import data and then run the startup script remotely.
758        """
759        # Place an identity on the node so that the copying can succeed
760        segment_commands.scp_file( segment_commands.ssh_privkey_file,
761                user, node, ".ssh/id_rsa")
762        segment_commands.ssh_cmd(user, node, 
763                "sudo /bin/sh ./%s.startup &" % node)
764
765    def start_nodes(self, user, host, nodes, segment_commands):
766        """
767        Start a thread to initialize each node and wait for them to complete.
768        Each thread runs start_node.
769        """
770        threads = [ ]
771        for n in nodes:
772            t = Thread(target=self.start_node, args=(user, host, n, 
773                segment_commands))
774            t.start()
775            threads.append(t)
776
777        done = [not t.isAlive() for t in threads]
778        while not all(done):
779            self.log.info("Waiting for threads %s" % done)
780            time.sleep(10)
781            done = [not t.isAlive() for t in threads]
782
783    def set_up_staging_filespace(self, segment_commands, user, host,
784            stagingdir):
785        """
786        Set up teh staging area on the staging machine.  To reduce the number
787        of ssh commands, we compose a script and execute it remotely.
788        """
789
790        self.log.info("[start_segment]: creating script file")
791        try:
792            sf, scriptname = tempfile.mkstemp()
793            scriptfile = os.fdopen(sf, 'w')
794        except EnvironmentError:
795            return False
796
797        scriptbase = os.path.basename(scriptname)
798
799        # Script the filesystem changes
800        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
801        print >>scriptfile, 'mkdir -p %s' % stagingdir
802        print >>scriptfile, "rm -f %s" % scriptbase
803        scriptfile.close()
804
805        # Move the script to the remote machine
806        # XXX: could collide tempfile names on the remote host
807        if segment_commands.scp_file(scriptname, user, host, scriptbase):
808            os.remove(scriptname)
809        else:
810            return False
811
812        # Execute the script (and the script's last line deletes it)
813        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
814            return False
815
816    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
817        """
818        Protogeni interactions take a context and a protogeni certificate.
819        This establishes both for later calls and returns them.
820        """
821        if os.access(certfile, os.R_OK):
822            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
823        else:
824            self.log.error("[start_segment]: Cannot read certfile: %s" % \
825                    certfile)
826            return None, None
827
828        try:
829            gcred = segment_commands.slice_authority_call('GetCredential', 
830                    {}, ctxt)
831        except segment_commands.ProtoGENIError, e:
832            raise service_error(service_error.federant,
833                    "ProtoGENI: %s" % e)
834
835        return ctxt, gcred
836
837    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
838        """
839        Find a usable slice name by trying random ones until there's no
840        collision.
841        """
842
843        def random_slicename(user):
844            """
845            Return a random slicename by appending 5 letters to the username.
846            """
847            slicename = user
848            for i in range(0,5):
849                slicename += random.choice(string.ascii_letters)
850            return slicename
851
852        while True:
853            slicename = random_slicename(user)
854            try:
855                param = {
856                        'credential': gcred, 
857                        'hrn': slicename,
858                        'type': 'Slice'
859                        }
860                segment_commands.slice_authority_call('Resolve', param, ctxt)
861            except segment_commands.ProtoGENIError, e:
862                print e
863                break
864
865        return slicename
866
867    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
868        """
869        Create the slice and allocate resources.  If any of this stuff fails,
870        the allocations will time out on PG in short order, so we just raise
871        the service_error.  Return the slice and sliver credentials as well as
872        the manifest.
873        """
874        try:
875            param = {
876                    'credential': gcred, 
877                    'hrn': slicename,
878                    'type': 'Slice'
879                    }
880            slice_cred = segment_commands.slice_authority_call('Register',
881                    param, ctxt)
882            # Resolve the slice to get the URN that PG has assigned it.
883            param = {
884                    'credential': gcred,
885                    'type': 'Slice',
886                    'hrn': slicename
887                    }
888            data = segment_commands.slice_authority_call('Resolve', param,
889                    ctxt)
890            if 'urn' in data:
891                slice_urn = data['urn']
892            else:
893                raise service_error(service_error.federant, 
894                        "No URN returned for slice %s" % hrn)
895
896            if 'creator_urn' in data:
897                creator_urn = data['creator_urn']
898            else:
899                raise service_error(service_error.federant, 
900                        "No creator URN returned for slice %s" % hrn)
901            # Populate the ssh keys (let PG format them)
902            param = {
903                    'credential': gcred, 
904                    }
905            keys =  segment_commands.slice_authority_call('GetKeys', param, 
906                    ctxt)
907            # Create a Sliver
908            param = {
909                    'credentials': [ slice_cred ],
910                    'rspec': rspec, 
911                    'users': [ {
912                        'urn': creator_urn,
913                        'keys': keys, 
914                        },
915                    ],
916                    'slice_urn': slice_urn,
917                    }
918            rv = segment_commands.component_manager_call(
919                    'CreateSliver', param, ctxt)
920
921            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
922            # API hands back a sliver credential.  This just finds the format
923            # of choice.
924            if isinstance(rv, tuple):
925                manifest = rv[1]
926            else:
927                manifest = rv
928        except segment_commands.ProtoGENIError, e:
929            raise service_error(service_error.federant,
930                    "ProtoGENI: %s %s" % (e.code, e))
931
932        return (slice_urn, slice_cred, manifest, rspec)
933
934    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
935            timeout=None):
936        """
937        Wait for the given slice to finish its startup.  Return the final
938        status.
939        """
940        completed_states = ('failed', 'ready')
941        status = 'changing'
942        if timeout is not None:
943            end = time.time() + timeout
944        try:
945            while status not in completed_states:
946                param = { 
947                        'credentials': [ slice_cred ],
948                        'slice_urn': slice_urn,
949                        }
950                r = segment_commands.component_manager_call(
951                        'SliverStatus', param, ctxt)
952                # GENIAPI uses geni_status as the key, so check for both
953                status = r.get('status', r.get('geni_status','changing'))
954                if status not in completed_states:
955                    if timeout is not None and time.time() > end:
956                        return 'timeout'
957                    time.sleep(30)
958        except segment_commands.ProtoGENIError, e:
959            raise service_error(service_error.federant,
960                    "ProtoGENI: %s %s" % (e.code, e))
961
962        return status
963
964    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
965        """
966        Delete the slice resources.  An error from the service is ignores,
967        because the soft state will go away anyway.
968        """
969        try:
970            param = { 
971                    'credentials': [ slice_cred, ],
972                    'slice_urn': slice_urn,
973                    }
974            segment_commands.component_manager_call('DeleteSlice',
975                    param, ctxt)
976        except segment_commands.ProtoGENIError, e:
977            self.log.warn("ProtoGENI: %s" % e)
978
979
980
981    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
982            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
983            export_certfile, topo, connInfo, services, timeout=0):
984        """
985        Start a sub-experiment on a federant.
986
987        Get the current state, modify or create as appropriate, ship data
988        and configs and start the experiment.  There are small ordering
989        differences based on the initial state of the sub-experiment.
990        """
991
992        # Local software dir
993        lsoftdir = "%s/software" % tmpdir
994        host = self.staging_host
995
996        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
997                certfile, certpw)
998
999        if not ctxt: return False
1000
1001        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1002        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1003        self.log.info("Creating %s" % slicename)
1004        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1005                segment_commands, slicename, rspec, gcred, ctxt)
1006
1007        # With manifest in hand, we can export the portal node names.
1008        if self.create_debug: nodes = self.fake_manifest(topo)
1009        else: nodes = self.manifest_to_dict(manifest)
1010
1011        self.export_store_info(export_certfile, nodes, self.ssh_port,
1012                connInfo)
1013        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1014                ename, connInfo, services, nodes)
1015
1016        # Copy software to the staging machine (done after generation to copy
1017        # those, too)
1018        for d in (tmpdir, lsoftdir):
1019            if os.path.isdir(d):
1020                for f in os.listdir(d):
1021                    if not os.path.isdir("%s/%s" % (d, f)):
1022                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1023                                user, host, "%s/%s" % (stagingdir, f)):
1024                            self.log.error("Scp failed")
1025                            return False
1026
1027
1028        # Now we wait for the nodes to start on PG
1029        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1030                ctxt, timeout=300)
1031        if status == 'failed':
1032            self.log.error('Sliver failed to start on ProtoGENI')
1033            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1034            return False
1035        elif status == 'timeout':
1036            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1037            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1038            return False
1039        else:
1040            # All good: save ProtoGENI info in shared state
1041            self.state_lock.acquire()
1042            self.allocation[aid]['slice_urn'] = slice_urn
1043            self.allocation[aid]['slice_name'] = slicename
1044            self.allocation[aid]['slice_credential'] = slice_cred
1045            self.allocation[aid]['manifest'] = manifest
1046            self.allocation[aid]['rspec'] = rspec
1047            self.allocation[aid]['certfile'] = certfile
1048            self.allocation[aid]['certpw'] = certpw
1049            self.write_state()
1050            self.state_lock.release()
1051
1052        # Now we have configuration to do for ProtoGENI
1053        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1054                secretkey, stagingdir, tmpdir)
1055
1056        self.start_nodes(user, self.staging_host, 
1057                [ n.get('hostname', None) for n in nodes.values()],
1058                segment_commands)
1059
1060        # Everything has gone OK.
1061        return True, dict([(k, n.get('hostname', None)) \
1062                for k, n in nodes.items()])
1063
1064    def generate_rspec(self, topo, softdir, connInfo):
1065
1066        # Force a useful image.  Without picking this the nodes can get
1067        # different images and there is great pain.
1068        def image_filter(e):
1069            if isinstance(e, topdl.Computer):
1070                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1071                        'image+emulab-ops//FEDORA10-STD" />'
1072            else:
1073                return ""
1074        # Main line of generate
1075        t = topo.clone()
1076
1077        starts = { }
1078        # Weed out the things we aren't going to instantiate: Segments, portal
1079        # substrates, and portal interfaces.  (The copy in the for loop allows
1080        # us to delete from e.elements in side the for loop).  While we're
1081        # touching all the elements, we also adjust paths from the original
1082        # testbed to local testbed paths and put the federation commands and
1083        # startcommands into a dict so we can start them manually later.
1084        # ProtoGENI requires setup before the federation commands run, so we
1085        # run them by hand after we've seeded configurations.
1086        for e in [e for e in t.elements]:
1087            if isinstance(e, topdl.Segment):
1088                t.elements.remove(e)
1089            # Fix software paths
1090            for s in getattr(e, 'software', []):
1091                s.location = re.sub("^.*/", softdir, s.location)
1092            if isinstance(e, topdl.Computer):
1093                if e.get_attribute('portal') and self.portal_startcommand:
1094                    # Portals never have a user-specified start command
1095                    starts[e.name] = self.portal_startcommand
1096                elif self.node_startcommand:
1097                    if e.get_attribute('startup'):
1098                        starts[e.name] = "%s \\$USER '%s'" % \
1099                                (self.node_startcommand, 
1100                                        e.get_attribute('startup'))
1101                        e.remove_attribute('startup')
1102                    else:
1103                        starts[e.name] = self.node_startcommand
1104
1105                # Remove portal interfaces
1106                e.interface = [i for i in e.interface \
1107                        if not i.get_attribute('portal')]
1108
1109        t.substrates = [ s.clone() for s in t.substrates ]
1110        t.incorporate_elements()
1111
1112        # Customize the rspec output to use the image we like
1113        filters = [ image_filter ]
1114
1115        # Convert to rspec and return it
1116        exp_rspec = topdl.topology_to_rspec(t, filters)
1117
1118        return exp_rspec
1119
1120    def retrieve_software(self, topo, certfile, softdir):
1121        """
1122        Collect the software that nodes in the topology need loaded and stage
1123        it locally.  This implies retrieving it from the experiment_controller
1124        and placing it into softdir.  Certfile is used to prove that this node
1125        has access to that data (it's the allocation/segment fedid).  Finally
1126        local portal and federation software is also copied to the same staging
1127        directory for simplicity - all software needed for experiment creation
1128        is in softdir.
1129        """
1130        sw = set()
1131        for e in topo.elements:
1132            for s in getattr(e, 'software', []):
1133                sw.add(s.location)
1134        os.mkdir(softdir)
1135        for s in sw:
1136            self.log.debug("Retrieving %s" % s)
1137            try:
1138                get_url(s, certfile, softdir)
1139            except:
1140                t, v, st = sys.exc_info()
1141                raise service_error(service_error.internal,
1142                        "Error retrieving %s: %s" % (s, v))
1143
1144        # Copy local portal node software to the tempdir
1145        for s in (self.portal_software, self.federation_software):
1146            for l, f in s:
1147                base = os.path.basename(f)
1148                copy_file(f, "%s/%s" % (softdir, base))
1149
1150
1151    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1152        """
1153        Gather common configuration files, retrieve or create an experiment
1154        name and project name, and return the ssh_key filenames.  Create an
1155        allocation log bound to the state log variable as well.
1156        """
1157        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1158        ename = None
1159        pubkey_base = None
1160        secretkey_base = None
1161        alloc_log = None
1162
1163        for a in attrs:
1164            if a['attribute'] in configs:
1165                try:
1166                    self.log.debug("Retrieving %s" % a['value'])
1167                    get_url(a['value'], certfile, tmpdir)
1168                except:
1169                    t, v, st = sys.exc_info()
1170                    raise service_error(service_error.internal,
1171                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1172            if a['attribute'] == 'ssh_pubkey':
1173                pubkey_base = a['value'].rpartition('/')[2]
1174            if a['attribute'] == 'ssh_secretkey':
1175                secretkey_base = a['value'].rpartition('/')[2]
1176            if a['attribute'] == 'experiment_name':
1177                ename = a['value']
1178
1179        if not ename:
1180            ename = ""
1181            for i in range(0,5):
1182                ename += random.choice(string.ascii_letters)
1183            self.log.warn("No experiment name: picked one randomly: %s" \
1184                    % ename)
1185
1186        self.state_lock.acquire()
1187        if self.allocation.has_key(aid):
1188            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1189            self.allocation[aid]['experiment'] = ename
1190            self.allocation[aid]['log'] = [ ]
1191            # Create a logger that logs to the experiment's state object as
1192            # well as to the main log file.
1193            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1194            h = logging.StreamHandler(
1195                    list_log.list_log(self.allocation[aid]['log']))
1196            # XXX: there should be a global one of these rather than
1197            # repeating the code.
1198            h.setFormatter(logging.Formatter(
1199                "%(asctime)s %(name)s %(message)s",
1200                        '%d %b %y %H:%M:%S'))
1201            alloc_log.addHandler(h)
1202            self.write_state()
1203        else:
1204            self.log.error("No allocation for %s!?" % aid)
1205        self.state_lock.release()
1206
1207        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1208                cpw, alloc_log)
1209
1210    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1211        # Copy the assigned names into the return topology
1212        rvtopo = topo.clone()
1213        embedding = [ ]
1214        for k, n in nodes.items():
1215            embedding.append({ 
1216                'toponame': k,
1217                'physname': [n ],
1218                })
1219        # Grab the log (this is some anal locking, but better safe than
1220        # sorry)
1221        self.state_lock.acquire()
1222        logv = "".join(self.allocation[aid]['log'])
1223        # It's possible that the StartSegment call gets retried (!).
1224        # if the 'started' key is in the allocation, we'll return it rather
1225        # than redo the setup.
1226        self.allocation[aid]['started'] = { 
1227                'allocID': alloc_id,
1228                'allocationLog': logv,
1229                'segmentdescription': { 
1230                    'topdldescription': rvtopo.to_dict() },
1231                'embedding': embedding,
1232                }
1233        retval = copy.deepcopy(self.allocation[aid]['started'])
1234        self.write_state()
1235        self.state_lock.release()
1236
1237        return retval
1238
1239    def StartSegment(self, req, fid):
1240        err = None  # Any service_error generated after tmpdir is created
1241        rv = None   # Return value from segment creation
1242
1243        try:
1244            req = req['StartSegmentRequestBody']
1245            topref = req['segmentdescription']['topdldescription']
1246        except KeyError:
1247            raise service_error(service_error.req, "Badly formed request")
1248
1249        connInfo = req.get('connection', [])
1250        services = req.get('service', [])
1251        auth_attr = req['allocID']['fedid']
1252        aid = "%s" % auth_attr
1253        attrs = req.get('fedAttr', [])
1254        if not self.auth.check_attribute(fid, auth_attr):
1255            raise service_error(service_error.access, "Access denied")
1256        else:
1257            # See if this is a replay of an earlier succeeded StartSegment -
1258            # sometimes SSL kills 'em.  If so, replay the response rather than
1259            # redoing the allocation.
1260            self.state_lock.acquire()
1261            retval = self.allocation[aid].get('started', None)
1262            self.state_lock.release()
1263            if retval:
1264                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1265                        "replaying response")
1266                return retval
1267
1268        if topref:
1269            topo = topdl.Topology(**topref)
1270        else:
1271            raise service_error(service_error.req, 
1272                    "Request missing segmentdescription'")
1273
1274        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1275        try:
1276            tmpdir = tempfile.mkdtemp(prefix="access-")
1277            softdir = "%s/software" % tmpdir
1278        except EnvironmentError:
1279            raise service_error(service_error.internal, "Cannot create tmp dir")
1280
1281        # Try block alllows us to clean up temporary files.
1282        try:
1283            self.retrieve_software(topo, certfile, softdir)
1284            self.configure_userconf(services, tmpdir)
1285            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1286                cpw, alloc_log = self.initialize_experiment_info(attrs,
1287                        aid, certfile, tmpdir)
1288            self.import_store_info(certfile, connInfo)
1289            rspec = self.generate_rspec(topo, "%s/%s/" \
1290                    % (self.staging_dir, ename), connInfo)
1291
1292            segment_commands = self.api_proxy(keyfile=ssh_key,
1293                    debug=self.create_debug, log=alloc_log,
1294                    ch_url = self.ch_url, sa_url=self.sa_url,
1295                    cm_url=self.cm_url)
1296            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1297                    pubkey_base, 
1298                    secretkey_base, ename,
1299                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1300                    certfile, topo, connInfo, services)
1301        except EnvironmentError, e:
1302            err = service_error(service_error.internal, "%s" % e)
1303        except service_error, e:
1304            err = e
1305        except:
1306            t, v, st = sys.exc_info()
1307            err = service_error(service_error.internal, "%s: %s" % \
1308                    (v, traceback.extract_tb(st)))
1309
1310        # Walk up tmpdir, deleting as we go
1311        if self.cleanup: self.remove_dirs(tmpdir)
1312        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1313
1314        if rv:
1315            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1316        elif err:
1317            raise service_error(service_error.federant,
1318                    "Swapin failed: %s" % err)
1319        else:
1320            raise service_error(service_error.federant, "Swapin failed")
1321
1322    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1323            slice_urn, certfile, certpw):
1324        """
1325        Stop a sub experiment by calling swapexp on the federant
1326        """
1327        host = self.staging_host
1328        rv = False
1329        try:
1330            # Clean out tar files: we've gone over quota in the past
1331            if stagingdir:
1332                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1333            if slice_cred:
1334                self.log.error('Removing Sliver on ProtoGENI')
1335                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1336                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1337            return True
1338        except self.ssh_cmd_timeout:
1339            rv = False
1340        return rv
1341
1342    def TerminateSegment(self, req, fid):
1343        try:
1344            req = req['TerminateSegmentRequestBody']
1345        except KeyError:
1346            raise service_error(service_error.req, "Badly formed request")
1347
1348        auth_attr = req['allocID']['fedid']
1349        aid = "%s" % auth_attr
1350        attrs = req.get('fedAttr', [])
1351        if not self.auth.check_attribute(fid, auth_attr):
1352            raise service_error(service_error.access, "Access denied")
1353
1354        self.state_lock.acquire()
1355        if self.allocation.has_key(aid):
1356            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1357            slice_cred = self.allocation[aid].get('slice_credential', None)
1358            slice_urn = self.allocation[aid].get('slice_urn', None)
1359            ename = self.allocation[aid].get('experiment', None)
1360        else:
1361            cf, user, ssh_key, cpw = (None, None, None, None)
1362            slice_cred = None
1363            slice_urn = None
1364            ename = None
1365        self.state_lock.release()
1366
1367        if ename:
1368            staging = "%s/%s" % ( self.staging_dir, ename)
1369        else:
1370            self.log.warn("Can't find experiment name for %s" % aid)
1371            staging = None
1372
1373        segment_commands = self.api_proxy(keyfile=ssh_key,
1374                debug=self.create_debug, ch_url = self.ch_url,
1375                sa_url=self.sa_url, cm_url=self.cm_url)
1376        self.stop_segment(segment_commands, user, staging, slice_cred,
1377                slice_urn, cf, cpw)
1378        return { 'allocID': req['allocID'] }
1379
1380    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1381            certfile, certpw):
1382        """
1383        Linear code through the segment renewal calls.
1384        """
1385        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1386        try:
1387            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1388                    time.gmtime(time.time() + interval))
1389            cred = segment_commands.slice_authority_call('GetCredential', 
1390                    {}, ctxt)
1391
1392            param = {
1393                    'credential': scred,
1394                    'expiration': expiration
1395                    }
1396            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1397            param = {
1398                    'credential': cred,
1399                    'hrn': name,
1400                    'type': 'Slice',
1401                    }
1402            slice = segment_commands.slice_authority_call('Resolve', 
1403                    param, ctxt)
1404            uuid = slice.get('uuid', None)
1405            if uuid == None:
1406                sys.exit('No uuid for %s' % slicename)
1407
1408            print 'Calling GetCredential (uuid)'
1409            param = {
1410                    'credential': cred,
1411                    'uuid': uuid,
1412                    'type': 'Slice',
1413                    }
1414            new_scred = segment_commands.slice_authority_call('GetCredential',
1415                    param, ctxt)
1416
1417        except segment_commands.ProtoGENIError, e:
1418            self.log.error("Failed to extend slice %s: %s" % (name, e))
1419            return None
1420        try:
1421            print 'Calling RenewSlice (CM)'
1422            param = {
1423                    'credentials': [new_scred,],
1424                    'slice_urn': slice_urn,
1425                    }
1426            r = segment_commands.component_manager_call('RenewSlice', param, 
1427                    ctxt)
1428        except segment_commands.ProtoGENIError, e:
1429            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1430
1431        return new_scred
1432   
1433
1434    def RenewSlices(self):
1435        self.log.info("Scanning for slices to renew")
1436        self.state_lock.acquire()
1437        aids = self.allocation.keys()
1438        self.state_lock.release()
1439
1440        for aid in aids:
1441            self.state_lock.acquire()
1442            if self.allocation.has_key(aid):
1443                name = self.allocation[aid].get('slice_name', None)
1444                scred = self.allocation[aid].get('slice_credential', None)
1445                slice_urn = self.allocation[aid].get('slice_urn', None)
1446                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1447            else:
1448                name = None
1449                scred = None
1450            self.state_lock.release()
1451
1452            if not os.access(cf, os.R_OK):
1453                self.log.error(
1454                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1455                continue
1456
1457            # There's a ProtoGENI slice associated with the segment; renew it.
1458            if name and scred and slice_urn:
1459                segment_commands = self.api_proxy(log=self.log, 
1460                        debug=self.create_debug, keyfile=ssh_key,
1461                        cm_url = self.cm_url, sa_url = self.sa_url,
1462                        ch_url = self.ch_url)
1463                new_scred = self.renew_segment(segment_commands, name, scred, 
1464                        slice_urn, self.renewal_interval, cf, cpw)
1465                if new_scred:
1466                    self.log.info("Slice %s renewed until %s GMT" % \
1467                            (name, time.asctime(time.gmtime(\
1468                                time.time()+self.renewal_interval))))
1469                    self.state_lock.acquire()
1470                    if self.allocation.has_key(aid):
1471                        self.allocation[aid]['slice_credential'] = new_scred
1472                        self.write_state()
1473                    self.state_lock.release()
1474                else:
1475                    self.log.info("Failed to renew slice %s " % name)
1476
1477        # Let's do this all again soon.  (4 tries before the slices time out)   
1478        t = Timer(self.renewal_interval/4, self.RenewSlices)
1479        t.start()
Note: See TracBrowser for help on using the repository browser.