source: fedd/federation/protogeni_access.py @ 4bfc015

Last change on this file since 4bfc015 was 6bedbdba, checked in by Ted Faber <faber@…>, 13 years ago

Split topdl and fedid out to different packages. Add differential
installs

  • Property mode set to 100644
File size: 47.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from deter import fedid, generate_fedid
20from authorizer import authorizer, abac_authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28from access import access_base
29from protogeni_proxy import protogeni_proxy
30from geniapi_proxy import geniapi_proxy
31
32from deter import topdl
33import list_log
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    def __init__(self, config=None, auth=None):
52        """
53        Initializer.  Pulls parameters out of the ConfigParser's access section.
54        """
55
56        access_base.__init__(self, config, auth)
57
58        self.domain = config.get("access", "domain")
59        self.userconfdir = config.get("access","userconfdir")
60        self.userconfcmd = config.get("access","userconfcmd")
61        self.userconfurl = config.get("access","userconfurl")
62        self.federation_software = config.get("access", "federation_software")
63        self.portal_software = config.get("access", "portal_software")
64        self.ssh_port = config.get("access","ssh_port") or "22"
65        self.sshd = config.get("access","sshd")
66        self.sshd_config = config.get("access", "sshd_config")
67        self.access_type = config.get("access", "type")
68        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
69        self.staging_host = config.get("access", "staging_host") \
70                or "ops.emulab.net"
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74   
75        self.dragon_endpoint = config.get("access", "dragon")
76        self.dragon_vlans = config.get("access", "dragon_vlans")
77        self.deter_internal = config.get("access", "deter_internal")
78
79        self.tunnel_config = config.getboolean("access", "tunnel_config")
80        self.portal_command = config.get("access", "portal_command")
81        self.portal_image = config.get("access", "portal_image")
82        self.portal_type = config.get("access", "portal_type") or "pc"
83        self.portal_startcommand = config.get("access", "portal_startcommand")
84        self.node_startcommand = config.get("access", "node_startcommand")
85
86        self.federation_software = self.software_list(self.federation_software)
87        self.portal_software = self.software_list(self.portal_software)
88        self.local_seer_software = self.software_list(self.local_seer_software)
89
90        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
91        self.renewal_interval = int(self.renewal_interval) * 60
92
93        self.ch_url = config.get("access", "ch_url")
94        self.sa_url = config.get("access", "sa_url")
95        self.cm_url = config.get("access", "cm_url")
96
97        self.restricted = [ ]
98
99        # read_state in the base_class
100        self.state_lock.acquire()
101        for a  in ('allocation', 'projects', 'keys', 'types'):
102            if a not in self.state:
103                self.state[a] = { }
104        self.allocation = self.state['allocation']
105        self.projects = self.state['projects']
106        self.keys = self.state['keys']
107        self.types = self.state['types']
108        self.state_lock.release()
109
110
111        self.log = logging.getLogger("fedd.access")
112        set_log_level(config, "access", self.log)
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'abac'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'abac':
121            self.auth = abac_authorizer(load=self.auth_dir)
122            self.access = [ ]
123            if accessdb:
124                self.read_access(accessdb, self.make_access_info)
125        else:
126            raise service_error(service_error.internal, 
127                    "Unknown auth_type: %s" % self.auth_type)
128        api = config.get("access", "api") or "protogeni"
129        if api == "protogeni":
130            self.api_proxy = protogeni_proxy
131        elif api == "geniapi":
132            self.api_proxy = geniapi_proxy
133        else:
134            self.log.debug("Unknown interface, using protogeni")
135            self.api_proxy = protogeni_proxy
136
137        self.call_SetValue = service_caller('SetValue')
138        self.call_GetValue = service_caller('GetValue')
139        self.exports = {
140                'local_seer_control': self.export_local_seer,
141                'seer_master': self.export_seer_master,
142                'hide_hosts': self.export_hide_hosts,
143                }
144
145        if not self.local_seer_image or not self.local_seer_software or \
146                not self.local_seer_start:
147            if 'local_seer_control' in self.exports:
148                del self.exports['local_seer_control']
149
150        if not self.local_seer_image or not self.local_seer_software or \
151                not self.seer_master_start:
152            if 'seer_master' in self.exports:
153                del self.exports['seer_master']
154
155        self.RenewSlices()
156
157        self.soap_services = {\
158            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
159            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
160            'StartSegment': soap_handler("StartSegment", self.StartSegment),
161            'TerminateSegment': soap_handler("TerminateSegment", 
162                self.TerminateSegment),
163            }
164        self.xmlrpc_services =  {\
165            'RequestAccess': xmlrpc_handler('RequestAccess',
166                self.RequestAccess),
167            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
168                self.ReleaseAccess),
169            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
170            'TerminateSegment': xmlrpc_handler('TerminateSegment',
171                self.TerminateSegment),
172            }
173
174    @staticmethod
175    def make_access_info(s):
176        """
177        Split a string of the form (id, id, id, id) ito its constituent tuples
178        and return them as a tuple.  Use to import access info from the
179        access_db.
180        """
181
182        ss = s.strip()
183        if ss.startswith('(') and ss.endswith(')'):
184            l = [ s.strip() for s  in ss[1:-1].split(",")]
185            if len(l) == 4:
186                return tuple(l)
187            else:
188                raise self.parse_error(
189                        "Exactly 4 elements in access info required")
190        else:
191            raise self.parse_error("Expecting parenthezied values")
192
193
194    def get_handler(self, path, fid):
195        self.log.info("Get handler %s %s" % (path, fid))
196        if self.auth.check_attribute(fid, path) and self.userconfdir:
197            return ("%s/%s" % (self.userconfdir, path), "application/binary")
198        else:
199            return (None, None)
200
201    def build_access_response(self, alloc_id, services, proof):
202        """
203        Create the SOAP response.
204
205        Build the dictionary description of the response and use
206        fedd_utils.pack_soap to create the soap message.  ap is the allocate
207        project message returned from a remote project allocation (even if that
208        allocation was done locally).
209        """
210        # Because alloc_id is already a fedd_services_types.IDType_Holder,
211        # there's no need to repack it
212        msg = { 
213                'allocID': alloc_id,
214                'fedAttr': [
215                    { 'attribute': 'domain', 'value': self.domain } , 
216                ],
217                'proof': proof.to_dict()
218            }
219        if self.dragon_endpoint:
220            msg['fedAttr'].append({'attribute': 'dragon',
221                'value': self.dragon_endpoint})
222        if self.deter_internal:
223            msg['fedAttr'].append({'attribute': 'deter_internal',
224                'value': self.deter_internal})
225        #XXX: ??
226        if self.dragon_vlans:
227            msg['fedAttr'].append({'attribute': 'vlans',
228                'value': self.dragon_vlans})
229
230        if services:
231            msg['service'] = services
232        return msg
233
234    def RequestAccess(self, req, fid):
235        """
236        Handle the access request.
237        """
238
239        # The dance to get into the request body
240        if req.has_key('RequestAccessRequestBody'):
241            req = req['RequestAccessRequestBody']
242        else:
243            raise service_error(service_error.req, "No request!?")
244
245        if req.has_key('destinationTestbed'):
246            dt = unpack_id(req['destinationTestbed'])
247
248        # Request for this fedd
249        found, match, owners, proof = self.lookup_access(req, fid)
250        services, svc_state = self.export_services(req.get('service',[]),
251                None, None)
252        # keep track of what's been added
253        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
254        aid = unicode(allocID)
255
256        self.state_lock.acquire()
257        self.allocation[aid] = { }
258        # The protoGENI certificate
259        self.allocation[aid]['credentials'] = found
260        # The list of owner FIDs
261        self.allocation[aid]['owners'] = owners
262        self.allocation[aid]['auth'] = set()
263        self.append_allocation_authorization(aid, 
264                ((fid, allocID), (allocID, allocID)), state_attr='allocation')
265        self.write_state()
266        self.state_lock.release()
267
268        try:
269            f = open("%s/%s.pem" % (self.certdir, aid), "w")
270            print >>f, alloc_cert
271            f.close()
272        except EnvironmentError, e:
273            raise service_error(service_error.internal, 
274                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
275        return self.build_access_response({ 'fedid': allocID }, None, proof)
276
277
278    def ReleaseAccess(self, req, fid):
279        # The dance to get into the request body
280        if req.has_key('ReleaseAccessRequestBody'):
281            req = req['ReleaseAccessRequestBody']
282        else:
283            raise service_error(service_error.req, "No request!?")
284
285        # Local request
286        try:
287            if req['allocID'].has_key('localname'):
288                auth_attr = aid = req['allocID']['localname']
289            elif req['allocID'].has_key('fedid'):
290                aid = unicode(req['allocID']['fedid'])
291                auth_attr = req['allocID']['fedid']
292            else:
293                raise service_error(service_error.req,
294                        "Only localnames and fedids are understood")
295        except KeyError:
296            raise service_error(service_error.req, "Badly formed request")
297
298        self.log.debug("[access] deallocation requested for %s", aid)
299        access_ok , proof = self.auth.check_attribute(fid, auth_attr,
300                with_proof=True)
301        if not access_ok:
302            self.log.debug("[access] deallocation denied for %s", aid)
303            raise service_error(service_error.access, "Access Denied")
304
305        self.state_lock.acquire()
306        if self.allocation.has_key(aid):
307            self.log.debug("Found allocation for %s" %aid)
308            self.clear_allocation_authorization(aid, state_attr='allocation')
309            del self.allocation[aid]
310            self.write_state()
311            self.state_lock.release()
312            # And remove the access cert
313            cf = "%s/%s.pem" % (self.certdir, aid)
314            self.log.debug("Removing %s" % cf)
315            os.remove(cf)
316            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
317        else:
318            self.state_lock.release()
319            raise service_error(service_error.req, "No such allocation")
320
321    def manifest_to_dict(self, manifest, ignore_debug=False):
322        """
323        Turn the manifest into a dict were each virtual nodename (i.e. the
324        topdl name) has an entry with the allocated machine in hostname and the
325        interfaces in 'interfaces'.  I love having XML parser code lying
326        around.
327        """
328        if self.create_debug and not ignore_debug: 
329            self.log.debug("Returning null manifest dict")
330            return { }
331
332        # The class allows us to keep a little state - the dict under
333        # consteruction and the current entry in that dict for the interface
334        # element code.
335        class manifest_parser:
336            def __init__(self):
337                self.d = { }
338                self.current_key=None
339
340            # If the element is a node, create a dict entry for it.  If it's an
341            # interface inside a node, add an entry in the interfaces list with
342            # the virtual name and component id.
343            def start_element(self, name, attrs):
344                if name == 'node':
345                    self.current_key = attrs.get('virtual_id',"")
346                    if self.current_key:
347                        self.d[self.current_key] = {
348                                'hostname': attrs.get('hostname', None),
349                                'interfaces': { },
350                                'mac': { }
351                                }
352                elif name == 'interface' and self.current_key:
353                    self.d[self.current_key]['interfaces']\
354                            [attrs.get('virtual_id','')] = \
355                            attrs.get('component_id', None)
356                elif name == 'interface_ref':
357                    # Collect mac address information from an interface_ref.
358                    # These appear after the node info has been parsed.
359                    nid = attrs.get('virtual_node_id', None)
360                    ifid = attrs.get('virtual_interface_id', None)
361                    mac = attrs.get('MAC', None)
362                    self.d[nid]['mac'][ifid] = mac
363            #  When a node is finished, clear current_key
364            def end_element(self, name):
365                if name == 'node': self.current_key = None
366
367        node = { }
368
369        mp = manifest_parser()
370        p = xml.parsers.expat.ParserCreate()
371        # These are bound to the class we just created
372        p.StartElementHandler = mp.start_element
373        p.EndElementHandler = mp.end_element
374
375        p.Parse(manifest)
376        # Make the node dict that the callers expect
377        for k in mp.d:
378            node[k] = mp.d.get('hostname', '')
379        return mp.d
380
381    def fake_manifest(self, topo):
382        """
383        Fake the output of manifest_to_dict with a bunch of generic node an
384        interface names, for debugging.
385        """
386        node = { }
387        for i, e in enumerate([ e for e in topo.elements \
388                if isinstance(e, topdl.Computer)]):
389            node[e.name] = { 
390                    'hostname': "node%03d" % i,
391                    'interfaces': { }
392                    }
393            for j, inf in enumerate(e.interface):
394                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
395
396        return node
397
398
399    def generate_portal_configs(self, topo, pubkey_base, 
400            secretkey_base, tmpdir, leid, connInfo, services, nodes):
401
402        def conninfo_to_dict(key, info):
403            """
404            Make a cpoy of the connection information about key, and flatten it
405            into a single dict by parsing out any feddAttrs.
406            """
407
408            rv = None
409            for i in info:
410                if key == i.get('portal', "") or \
411                        key in [e.get('element', "") \
412                        for e in i.get('member', [])]:
413                    rv = i.copy()
414                    break
415
416            else:
417                return rv
418
419            if 'fedAttr' in rv:
420                for a in rv['fedAttr']:
421                    attr = a.get('attribute', "")
422                    val = a.get('value', "")
423                    if attr and attr not in rv:
424                        rv[attr] = val
425                del rv['fedAttr']
426            return rv
427
428        # XXX: un hardcode this
429        def client_null(f, s):
430            print >>f, "Service: %s" % s['name']
431       
432        def client_seer_master(f, s):
433            print >>f, 'PortalAlias: seer-master'
434
435        def client_smb(f, s):
436            print >>f, "Service: %s" % s['name']
437            smbshare = None
438            smbuser = None
439            smbproj = None
440            for a in s.get('fedAttr', []):
441                if a.get('attribute', '') == 'SMBSHARE':
442                    smbshare = a.get('value', None)
443                elif a.get('attribute', '') == 'SMBUSER':
444                    smbuser = a.get('value', None)
445                elif a.get('attribute', '') == 'SMBPROJ':
446                    smbproj = a.get('value', None)
447
448            if all((smbshare, smbuser, smbproj)):
449                print >>f, "SMBshare: %s" % smbshare
450                print >>f, "ProjectUser: %s" % smbuser
451                print >>f, "ProjectName: %s" % smbproj
452
453        def client_hide_hosts(f, s):
454            for a in s.get('fedAttr', [ ]):
455                if a.get('attribute', "") == 'hosts':
456                    print >>f, 'Hide: %s' % a.get('value', "")
457
458        client_service_out = {
459                'SMB': client_smb,
460                'tmcd': client_null,
461                'seer': client_null,
462                'userconfig': client_null,
463                'project_export': client_null,
464                'seer_master': client_seer_master,
465                'hide_hosts': client_hide_hosts,
466            }
467
468        def client_seer_master_export(f, s):
469            print >>f, "AddedNode: seer-master"
470
471        def client_seer_local_export(f, s):
472            print >>f, "AddedNode: control"
473
474        client_export_service_out = {
475                'seer_master': client_seer_master_export,
476                'local_seer_control': client_seer_local_export,
477            }
478
479        def server_port(f, s):
480            p = urlparse(s.get('server', 'http://localhost'))
481            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
482
483        def server_null(f,s): pass
484
485        def server_seer(f, s):
486            print >>f, 'seer: true'
487
488        server_service_out = {
489                'SMB': server_port,
490                'tmcd': server_port,
491                'userconfig': server_null,
492                'project_export': server_null,
493                'seer': server_seer,
494                'seer_master': server_port,
495                'hide_hosts': server_null,
496            }
497        # XXX: end un hardcode this
498
499
500        seer_out = False
501        client_out = False
502        control_gw = None
503        for e in [ e for e in topo.elements \
504                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
505            myname = e.name
506            type = e.get_attribute('portal_type')
507
508            info = conninfo_to_dict(myname, connInfo)
509
510            if not info:
511                raise service_error(service_error.req,
512                        "No connectivity info for %s" % myname)
513
514            # Translate to physical name (ProtoGENI doesn't have DNS)
515            physname = nodes.get(myname, { }).get('hostname', None)
516            peer = info.get('peer', "")
517            ldomain = self.domain
518            ssh_port = info.get('ssh_port', 22)
519
520            # Collect this for the client.conf file
521            if 'masterexperiment' in info:
522                mproj, meid = info['masterexperiment'].split("/", 1)
523
524            active = info.get('active', 'False')
525
526            if type in ('control', 'both'):
527                testbed = e.get_attribute('testbed')
528                control_gw = myname
529
530            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
531            tunnelconfig = self.tunnel_config
532            try:
533                f = open(cfn, "w")
534                if active == 'True':
535                    print >>f, "active: True"
536                    print >>f, "ssh_port: %s" % ssh_port
537                    if type in ('control', 'both'):
538                        for s in [s for s in services \
539                                if s.get('name', "") in self.imports]:
540                            server_service_out[s['name']](f, s)
541
542                if tunnelconfig:
543                    print >>f, "tunnelip: %s" % tunnelconfig
544                print >>f, "peer: %s" % peer.lower()
545                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
546                        pubkey_base
547                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
548                        secretkey_base
549                f.close()
550            except EnvironmentError, e:
551                raise service_error(service_error.internal,
552                        "Can't write protal config %s: %s" % (cfn, e))
553
554        # Done with portals, write the client config file.
555        try:
556            f = open("%s/client.conf" % tmpdir, "w")
557            if control_gw:
558                print >>f, "ControlGateway: %s" % physname.lower()
559            for s in services:
560                if s.get('name',"") in self.imports and \
561                        s.get('visibility','') == 'import':
562                    client_service_out[s['name']](f, s)
563                if s.get('name', '') in self.exports and \
564                        s.get('visibility', '') == 'export' and \
565                        s['name'] in client_export_service_out:
566                    client_export_service_out[s['name']](f, s)
567            # Seer uses this.
568            if mproj and meid:
569                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
570            f.close()
571        except EnvironmentError, e:
572            raise service_error(service_error.internal,
573                    "Cannot write client.conf: %s" %s)
574
575
576
577    def export_store_info(self, cf, nodes, ssh_port, connInfo):
578        """
579        For the export requests in the connection info, install the peer names
580        at the experiment controller via SetValue calls.
581        """
582
583        for c in connInfo:
584            for p in [ p for p in c.get('parameter', []) \
585                    if p.get('type', '') == 'output']:
586
587                if p.get('name', '') == 'peer':
588                    k = p.get('key', None)
589                    surl = p.get('store', None)
590                    if surl and k and k.index('/') != -1:
591                        if self.create_debug:
592                            req = { 'name': k, 'value': 'debug' }
593                            self.call_SetValue(surl, req, cf)
594                        else:
595                            n = nodes.get(k[k.index('/')+1:], { })
596                            value = n.get('hostname', None)
597                            if value:
598                                req = { 'name': k, 'value': value }
599                                self.call_SetValue(surl, req, cf)
600                            else:
601                                self.log.error("No hostname for %s" % \
602                                        k[k.index('/'):])
603                    else:
604                        self.log.error("Bad export request: %s" % p)
605                elif p.get('name', '') == 'ssh_port':
606                    k = p.get('key', None)
607                    surl = p.get('store', None)
608                    if surl and k:
609                        req = { 'name': k, 'value': ssh_port }
610                        self.call_SetValue(surl, req, cf)
611                    else:
612                        self.log.error("Bad export request: %s" % p)
613                else:
614
615                    self.log.error("Unknown export parameter: %s" % \
616                            p.get('name'))
617                    continue
618
619    def write_node_config_script(self, elem, node, user, pubkey,
620            secretkey, stagingdir, tmpdir):
621        """
622        Write out the configuration script that is to run on the node
623        represented by elem in the topology.  This is called
624        once per node to configure.
625        """
626        # These little functions/functors just make things more readable.  Each
627        # one encapsulates a small task of copying software files or installing
628        # them.
629        class stage_file_type:
630            """
631            Write code copying file sfrom the staging host to the host on which
632            this will run.
633            """
634            def __init__(self, user, host, stagingdir):
635                self.user = user
636                self.host = host
637                self.stagingdir = stagingdir
638                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
639                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
640
641            def __call__(self, script, file, dest="."):
642                # If the file is a full pathname, do not use stagingdir
643                if file.find('/') == -1:
644                    file = "%s/%s" % (self.stagingdir, file)
645                print >>script, "%s %s@%s:%s %s" % \
646                        (self.scp, self.user, self.host, file, dest)
647
648        def install_tar(script, loc, base):
649            """
650            Print code to script to install a tarfile in loc.
651            """
652            tar = "/bin/tar"
653            mkdir="/bin/mkdir"
654
655            print >>script, "%s -p %s" % (mkdir, loc)
656            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
657
658        def install_rpm(script, base):
659            """
660            Print code to script to install an rpm
661            """
662            rpm = "/bin/rpm"
663            print >>script, "%s --install %s" % (rpm, base)
664
665        ifconfig = "/sbin/ifconfig"
666        findif = '/usr/local/etc/emulab/findif'
667        stage_file = stage_file_type(user, self.staging_host, stagingdir)
668        pname = node.get('hostname', None)
669        fed_dir = "/usr/local/federation"
670        fed_etc_dir = "%s/etc" % fed_dir
671        fed_bin_dir = "%s/bin" % fed_dir
672        fed_lib_dir = "%s/lib" % fed_dir
673
674        if pname:
675            sfile = "%s/%s.startup" % (tmpdir, pname)
676            script = open(sfile, "w")
677            # Reset the interfaces to the ones in the topo file
678            for i in [ i for i in elem.interface \
679                    if not i.get_attribute('portal')]:
680                if 'interfaces' in node:
681                    pinf = node['interfaces'].get(i.name, None)
682                else:
683                    pinf = None
684
685                if 'mac' in node:
686                    pmac = node['mac'].get(i.name, None)
687                else:
688                    pmac = None
689                addr = i.get_attribute('ip4_address') 
690                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
691                # The interface names in manifests are not to be trusted, so we
692                # find the interface to configure using the local node's script
693                # to match mac address to interface name.
694                if pinf and addr and pmac:
695                    print >>script, '# %s' % pinf
696                    print >>script, \
697                            "%s `%s %s` %s netmask %s"  % \
698                            (ifconfig, findif, pmac, addr, netmask)
699                else:
700                    self.log.error("Missing interface or address for %s" \
701                            % i.name)
702               
703            for l, f in self.federation_software:
704                base = os.path.basename(f)
705                stage_file(script, base)
706                if l: install_tar(script, l, base)
707                else: install_rpm(script, base)
708
709            for s in elem.software:
710                s_base = s.location.rpartition('/')[2]
711                stage_file(script, s_base)
712                if s.install: install_tar(script, s.install, s_base)
713                else: install_rpm(script, s_base)
714
715            for f in ('hosts', pubkey, secretkey, 'client.conf', 
716                    'userconf'):
717                stage_file(script, f, fed_etc_dir)
718            if self.sshd:
719                stage_file(script, self.sshd, fed_bin_dir)
720            if self.sshd_config:
721                stage_file(script, self.sshd_config, fed_etc_dir)
722
723            # Look in tmpdir to get the names.  They've all been copied
724            # into the (remote) staging dir
725            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
726                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
727
728            # Done with staging, remove the identity used to stage
729            print >>script, "#/bin/rm .ssh/id_rsa"
730
731            # Start commands
732            if elem.get_attribute('portal') and self.portal_startcommand:
733                # Install portal software
734                for l, f in self.portal_software:
735                    base = os.path.basename(f)
736                    stage_file(script, base)
737                    if l: install_tar(script, l, base)
738                    else: install_rpm(script, base)
739
740                # Portals never have a user-specified start command
741                print >>script, self.portal_startcommand
742            elif self.node_startcommand:
743                # XXX: debug
744                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
745                # XXX: debug
746                if elem.get_attribute('startup'):
747                    print >>script, "%s \\$USER '%s'" % \
748                            (self.node_startcommand,
749                                    elem.get_attribute('startup'))
750                else:
751                    print >>script, self.node_startcommand
752            script.close()
753            return sfile, pname
754        else:
755            return None, None
756
757
758    def configure_nodes(self, segment_commands, topo, nodes, user, 
759            pubkey, secretkey, stagingdir, tmpdir):
760        """
761        For each node in the topology, generate a script file that copies
762        software onto it and installs it in the proper places and then runs the
763        startup command (including the federation commands.
764        """
765
766
767
768        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
769            vname = e.name
770            sfile, pname = self.write_node_config_script(e,
771                    nodes.get(vname, { }),
772                    user, pubkey, secretkey, stagingdir, tmpdir)
773            if sfile:
774                if not segment_commands.scp_file(sfile, user, pname):
775                    self.log.error("Could not copy script to %s" % pname)
776            else:
777                self.log.error("Unmapped node: %s" % vname)
778
779    def start_node(self, user, host, node, segment_commands):
780        """
781        Copy an identity to a node for the configuration script to be able to
782        import data and then run the startup script remotely.
783        """
784        # Place an identity on the node so that the copying can succeed
785        segment_commands.scp_file( segment_commands.ssh_privkey_file,
786                user, node, ".ssh/id_rsa")
787        segment_commands.ssh_cmd(user, node, 
788                "sudo /bin/sh ./%s.startup &" % node)
789
790    def start_nodes(self, user, host, nodes, segment_commands):
791        """
792        Start a thread to initialize each node and wait for them to complete.
793        Each thread runs start_node.
794        """
795        threads = [ ]
796        for n in nodes:
797            t = Thread(target=self.start_node, args=(user, host, n, 
798                segment_commands))
799            t.start()
800            threads.append(t)
801
802        done = [not t.isAlive() for t in threads]
803        while not all(done):
804            self.log.info("Waiting for threads %s" % done)
805            time.sleep(10)
806            done = [not t.isAlive() for t in threads]
807
808    def set_up_staging_filespace(self, segment_commands, user, host,
809            stagingdir):
810        """
811        Set up teh staging area on the staging machine.  To reduce the number
812        of ssh commands, we compose a script and execute it remotely.
813        """
814
815        self.log.info("[start_segment]: creating script file")
816        try:
817            sf, scriptname = tempfile.mkstemp()
818            scriptfile = os.fdopen(sf, 'w')
819        except EnvironmentError:
820            return False
821
822        scriptbase = os.path.basename(scriptname)
823
824        # Script the filesystem changes
825        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
826        print >>scriptfile, 'mkdir -p %s' % stagingdir
827        print >>scriptfile, "rm -f %s" % scriptbase
828        scriptfile.close()
829
830        # Move the script to the remote machine
831        # XXX: could collide tempfile names on the remote host
832        if segment_commands.scp_file(scriptname, user, host, scriptbase):
833            os.remove(scriptname)
834        else:
835            return False
836
837        # Execute the script (and the script's last line deletes it)
838        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
839            return False
840
841    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
842        """
843        Protogeni interactions take a context and a protogeni certificate.
844        This establishes both for later calls and returns them.
845        """
846        if os.access(certfile, os.R_OK):
847            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
848        else:
849            self.log.error("[start_segment]: Cannot read certfile: %s" % \
850                    certfile)
851            return None, None
852
853        try:
854            gcred = segment_commands.slice_authority_call('GetCredential', 
855                    {}, ctxt)
856        except segment_commands.ProtoGENIError, e:
857            raise service_error(service_error.federant,
858                    "ProtoGENI: %s" % e)
859
860        return ctxt, gcred
861
862    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
863        """
864        Find a usable slice name by trying random ones until there's no
865        collision.
866        """
867
868        def random_slicename(user):
869            """
870            Return a random slicename by appending 5 letters to the username.
871            """
872            slicename = user
873            for i in range(0,5):
874                slicename += random.choice(string.ascii_letters)
875            return slicename
876
877        while True:
878            slicename = random_slicename(user)
879            try:
880                param = {
881                        'credential': gcred, 
882                        'hrn': slicename,
883                        'type': 'Slice'
884                        }
885
886                if not self.create_debug:
887                    segment_commands.slice_authority_call('Resolve', param, 
888                            ctxt)
889                else:
890                    raise segment_commands.ProtoGENIError(0,0,'Debug')
891            except segment_commands.ProtoGENIError, e:
892                print e
893                break
894
895        return slicename
896
897    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
898        """
899        Create the slice and allocate resources.  If any of this stuff fails,
900        the allocations will time out on PG in short order, so we just raise
901        the service_error.  Return the slice and sliver credentials as well as
902        the manifest.
903        """
904        try:
905            param = {
906                    'credential': gcred, 
907                    'hrn': slicename,
908                    'type': 'Slice'
909                    }
910            slice_cred = segment_commands.slice_authority_call('Register',
911                    param, ctxt)
912            # Resolve the slice to get the URN that PG has assigned it.
913            param = {
914                    'credential': gcred,
915                    'type': 'Slice',
916                    'hrn': slicename
917                    }
918            data = segment_commands.slice_authority_call('Resolve', param,
919                    ctxt)
920            if 'urn' in data:
921                slice_urn = data['urn']
922            else:
923                raise service_error(service_error.federant, 
924                        "No URN returned for slice %s" % slicename)
925
926            if 'creator_urn' in data:
927                creator_urn = data['creator_urn']
928            else:
929                raise service_error(service_error.federant, 
930                        "No creator URN returned for slice %s" % slicename)
931            # Populate the ssh keys (let PG format them)
932            param = {
933                    'credential': gcred, 
934                    }
935            keys =  segment_commands.slice_authority_call('GetKeys', param, 
936                    ctxt)
937            # Create a Sliver
938            param = {
939                    'credentials': [ slice_cred ],
940                    'rspec': rspec, 
941                    'users': [ {
942                        'urn': creator_urn,
943                        'keys': keys, 
944                        },
945                    ],
946                    'slice_urn': slice_urn,
947                    }
948            rv = segment_commands.component_manager_call(
949                    'CreateSliver', param, ctxt)
950
951            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
952            # API hands back a sliver credential.  This just finds the format
953            # of choice.
954            if isinstance(rv, tuple):
955                manifest = rv[1]
956            else:
957                manifest = rv
958        except segment_commands.ProtoGENIError, e:
959            raise service_error(service_error.federant,
960                    "ProtoGENI: %s %s" % (e.code, e))
961
962        return (slice_urn, slice_cred, manifest, rspec)
963
964    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
965            timeout=None):
966        """
967        Wait for the given slice to finish its startup.  Return the final
968        status.
969        """
970        completed_states = ('failed', 'ready')
971        status = 'changing'
972        if timeout is not None:
973            end = time.time() + timeout
974        try:
975            while status not in completed_states:
976                param = { 
977                        'credentials': [ slice_cred ],
978                        'slice_urn': slice_urn,
979                        }
980                r = segment_commands.component_manager_call(
981                        'SliverStatus', param, ctxt)
982                # GENIAPI uses geni_status as the key, so check for both
983                status = r.get('status', r.get('geni_status','changing'))
984                if status not in completed_states:
985                    if timeout is not None and time.time() > end:
986                        return 'timeout'
987                    time.sleep(30)
988        except segment_commands.ProtoGENIError, e:
989            raise service_error(service_error.federant,
990                    "ProtoGENI: %s %s" % (e.code, e))
991
992        return status
993
994    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
995        """
996        Delete the slice resources.  An error from the service is ignores,
997        because the soft state will go away anyway.
998        """
999        try:
1000            param = { 
1001                    'credentials': [ slice_cred, ],
1002                    'slice_urn': slice_urn,
1003                    }
1004            segment_commands.component_manager_call('DeleteSlice',
1005                    param, ctxt)
1006        except segment_commands.ProtoGENIError, e:
1007            self.log.warn("ProtoGENI: %s" % e)
1008
1009
1010
1011    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1012            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1013            export_certfile, topo, connInfo, services, timeout=0):
1014        """
1015        Start a sub-experiment on a federant.
1016
1017        Get the current state, modify or create as appropriate, ship data
1018        and configs and start the experiment.  There are small ordering
1019        differences based on the initial state of the sub-experiment.
1020        """
1021
1022        # Local software dir
1023        lsoftdir = "%s/software" % tmpdir
1024        host = self.staging_host
1025
1026        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1027                certfile, certpw)
1028
1029        if not ctxt: return False, {}
1030
1031        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1032        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1033        self.log.info("Creating %s" % slicename)
1034        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1035                segment_commands, slicename, rspec, gcred, ctxt)
1036
1037        # With manifest in hand, we can export the portal node names.
1038        if self.create_debug: nodes = self.fake_manifest(topo)
1039        else: nodes = self.manifest_to_dict(manifest)
1040
1041        self.export_store_info(export_certfile, nodes, self.ssh_port,
1042                connInfo)
1043        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1044                ename, connInfo, services, nodes)
1045
1046        # Copy software to the staging machine (done after generation to copy
1047        # those, too)
1048        for d in (tmpdir, lsoftdir):
1049            if os.path.isdir(d):
1050                for f in os.listdir(d):
1051                    if not os.path.isdir("%s/%s" % (d, f)):
1052                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1053                                user, host, "%s/%s" % (stagingdir, f)):
1054                            self.log.error("Scp failed")
1055                            return False, {}
1056
1057
1058        # Now we wait for the nodes to start on PG
1059        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1060                ctxt, timeout=300)
1061        if status == 'failed':
1062            self.log.error('Sliver failed to start on ProtoGENI')
1063            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1064            return False, {}
1065        elif status == 'timeout':
1066            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1067            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1068            return False, {}
1069        else:
1070            # All good: save ProtoGENI info in shared state
1071            self.state_lock.acquire()
1072            self.allocation[aid]['slice_urn'] = slice_urn
1073            self.allocation[aid]['slice_name'] = slicename
1074            self.allocation[aid]['slice_credential'] = slice_cred
1075            self.allocation[aid]['manifest'] = manifest
1076            self.allocation[aid]['rspec'] = rspec
1077            self.allocation[aid]['certfile'] = certfile
1078            self.allocation[aid]['certpw'] = certpw
1079            self.write_state()
1080            self.state_lock.release()
1081
1082        # Now we have configuration to do for ProtoGENI
1083        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1084                secretkey, stagingdir, tmpdir)
1085
1086        self.start_nodes(user, self.staging_host, 
1087                [ n.get('hostname', None) for n in nodes.values()],
1088                segment_commands)
1089
1090        # Everything has gone OK.
1091        return True, dict([(k, n.get('hostname', None)) \
1092                for k, n in nodes.items()])
1093
1094    def generate_rspec(self, topo, softdir, connInfo):
1095
1096        # Force a useful image.  Without picking this the nodes can get
1097        # different images and there is great pain.
1098        def image_filter(e):
1099            if isinstance(e, topdl.Computer):
1100                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1101                        'image+emulab-ops//FEDORA10-STD" />'
1102            else:
1103                return ""
1104        # Main line of generate
1105        t = topo.clone()
1106
1107        starts = { }
1108        # Weed out the things we aren't going to instantiate: Segments, portal
1109        # substrates, and portal interfaces.  (The copy in the for loop allows
1110        # us to delete from e.elements in side the for loop).  While we're
1111        # touching all the elements, we also adjust paths from the original
1112        # testbed to local testbed paths and put the federation commands and
1113        # startcommands into a dict so we can start them manually later.
1114        # ProtoGENI requires setup before the federation commands run, so we
1115        # run them by hand after we've seeded configurations.
1116        for e in [e for e in t.elements]:
1117            if isinstance(e, topdl.Segment):
1118                t.elements.remove(e)
1119            # Fix software paths
1120            for s in getattr(e, 'software', []):
1121                s.location = re.sub("^.*/", softdir, s.location)
1122            if isinstance(e, topdl.Computer):
1123                if e.get_attribute('portal') and self.portal_startcommand:
1124                    # Portals never have a user-specified start command
1125                    starts[e.name] = self.portal_startcommand
1126                elif self.node_startcommand:
1127                    if e.get_attribute('startup'):
1128                        starts[e.name] = "%s \\$USER '%s'" % \
1129                                (self.node_startcommand, 
1130                                        e.get_attribute('startup'))
1131                        e.remove_attribute('startup')
1132                    else:
1133                        starts[e.name] = self.node_startcommand
1134
1135                # Remove portal interfaces
1136                e.interface = [i for i in e.interface \
1137                        if not i.get_attribute('portal')]
1138
1139        t.substrates = [ s.clone() for s in t.substrates ]
1140        t.incorporate_elements()
1141
1142        # Customize the rspec output to use the image we like
1143        filters = [ image_filter ]
1144
1145        # Convert to rspec and return it
1146        exp_rspec = topdl.topology_to_rspec(t, filters)
1147
1148        return exp_rspec
1149
1150    def retrieve_software(self, topo, certfile, softdir):
1151        """
1152        Collect the software that nodes in the topology need loaded and stage
1153        it locally.  This implies retrieving it from the experiment_controller
1154        and placing it into softdir.  Certfile is used to prove that this node
1155        has access to that data (it's the allocation/segment fedid).  Finally
1156        local portal and federation software is also copied to the same staging
1157        directory for simplicity - all software needed for experiment creation
1158        is in softdir.
1159        """
1160        sw = set()
1161        for e in topo.elements:
1162            for s in getattr(e, 'software', []):
1163                sw.add(s.location)
1164        os.mkdir(softdir)
1165        for s in sw:
1166            self.log.debug("Retrieving %s" % s)
1167            try:
1168                get_url(s, certfile, softdir)
1169            except:
1170                t, v, st = sys.exc_info()
1171                raise service_error(service_error.internal,
1172                        "Error retrieving %s: %s" % (s, v))
1173
1174        # Copy local portal node software to the tempdir
1175        for s in (self.portal_software, self.federation_software):
1176            for l, f in s:
1177                base = os.path.basename(f)
1178                copy_file(f, "%s/%s" % (softdir, base))
1179
1180
1181    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1182        """
1183        Gather common configuration files, retrieve or create an experiment
1184        name and project name, and return the ssh_key filenames.  Create an
1185        allocation log bound to the state log variable as well.
1186        """
1187        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1188        ename = None
1189        pubkey_base = None
1190        secretkey_base = None
1191        alloc_log = None
1192
1193        for a in attrs:
1194            if a['attribute'] in configs:
1195                try:
1196                    self.log.debug("Retrieving %s" % a['value'])
1197                    get_url(a['value'], certfile, tmpdir)
1198                except:
1199                    t, v, st = sys.exc_info()
1200                    raise service_error(service_error.internal,
1201                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1202            if a['attribute'] == 'ssh_pubkey':
1203                pubkey_base = a['value'].rpartition('/')[2]
1204            if a['attribute'] == 'ssh_secretkey':
1205                secretkey_base = a['value'].rpartition('/')[2]
1206            if a['attribute'] == 'experiment_name':
1207                ename = a['value']
1208
1209        if not ename:
1210            ename = ""
1211            for i in range(0,5):
1212                ename += random.choice(string.ascii_letters)
1213            self.log.warn("No experiment name: picked one randomly: %s" \
1214                    % ename)
1215
1216        self.state_lock.acquire()
1217        if self.allocation.has_key(aid):
1218            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1219            self.allocation[aid]['experiment'] = ename
1220            self.allocation[aid]['log'] = [ ]
1221            # Create a logger that logs to the experiment's state object as
1222            # well as to the main log file.
1223            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1224            h = logging.StreamHandler(
1225                    list_log.list_log(self.allocation[aid]['log']))
1226            # XXX: there should be a global one of these rather than
1227            # repeating the code.
1228            h.setFormatter(logging.Formatter(
1229                "%(asctime)s %(name)s %(message)s",
1230                        '%d %b %y %H:%M:%S'))
1231            alloc_log.addHandler(h)
1232            self.write_state()
1233        else:
1234            self.log.error("No allocation for %s!?" % aid)
1235        self.state_lock.release()
1236
1237        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1238                cpw, alloc_log)
1239
1240    def finalize_experiment(self, topo, nodes, aid, alloc_id, proof):
1241        # Copy the assigned names into the return topology
1242        rvtopo = topo.clone()
1243        embedding = [ ]
1244        for k, n in nodes.items():
1245            embedding.append({ 
1246                'toponame': k,
1247                'physname': [n ],
1248                })
1249        # Grab the log (this is some anal locking, but better safe than
1250        # sorry)
1251        self.state_lock.acquire()
1252        logv = "".join(self.allocation[aid]['log'])
1253        # It's possible that the StartSegment call gets retried (!).
1254        # if the 'started' key is in the allocation, we'll return it rather
1255        # than redo the setup.
1256        self.allocation[aid]['started'] = { 
1257                'allocID': alloc_id,
1258                'allocationLog': logv,
1259                'segmentdescription': { 
1260                    'topdldescription': rvtopo.to_dict() },
1261                'embedding': embedding,
1262                'proof': proof.to_dict(),
1263                }
1264        retval = copy.deepcopy(self.allocation[aid]['started'])
1265        self.write_state()
1266        self.state_lock.release()
1267
1268        return retval
1269
1270    def StartSegment(self, req, fid):
1271        err = None  # Any service_error generated after tmpdir is created
1272        rv = None   # Return value from segment creation
1273
1274        try:
1275            req = req['StartSegmentRequestBody']
1276            topref = req['segmentdescription']['topdldescription']
1277        except KeyError:
1278            raise service_error(service_error.req, "Badly formed request")
1279
1280        connInfo = req.get('connection', [])
1281        services = req.get('service', [])
1282        auth_attr = req['allocID']['fedid']
1283        aid = "%s" % auth_attr
1284        attrs = req.get('fedAttr', [])
1285        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1286                with_proof=True)
1287        if not access_ok:
1288            raise service_error(service_error.access, "Access denied")
1289        else:
1290            # See if this is a replay of an earlier succeeded StartSegment -
1291            # sometimes SSL kills 'em.  If so, replay the response rather than
1292            # redoing the allocation.
1293            self.state_lock.acquire()
1294            retval = self.allocation[aid].get('started', None)
1295            self.state_lock.release()
1296            if retval:
1297                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1298                        "replaying response")
1299                return retval
1300
1301        if topref:
1302            topo = topdl.Topology(**topref)
1303        else:
1304            raise service_error(service_error.req, 
1305                    "Request missing segmentdescription'")
1306
1307        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1308        try:
1309            tmpdir = tempfile.mkdtemp(prefix="access-")
1310            softdir = "%s/software" % tmpdir
1311        except EnvironmentError:
1312            raise service_error(service_error.internal, "Cannot create tmp dir")
1313
1314        # Try block alllows us to clean up temporary files.
1315        try:
1316            self.retrieve_software(topo, certfile, softdir)
1317            self.configure_userconf(services, tmpdir)
1318            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1319                cpw, alloc_log = self.initialize_experiment_info(attrs,
1320                        aid, certfile, tmpdir)
1321            self.import_store_info(certfile, connInfo)
1322            rspec = self.generate_rspec(topo, "%s/%s/" \
1323                    % (self.staging_dir, ename), connInfo)
1324
1325            segment_commands = self.api_proxy(keyfile=ssh_key,
1326                    debug=self.create_debug, log=alloc_log,
1327                    ch_url = self.ch_url, sa_url=self.sa_url,
1328                    cm_url=self.cm_url)
1329            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1330                    pubkey_base, 
1331                    secretkey_base, ename,
1332                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1333                    certfile, topo, connInfo, services)
1334        except EnvironmentError, e:
1335            err = service_error(service_error.internal, "%s" % e)
1336        except service_error, e:
1337            err = e
1338        except:
1339            t, v, st = sys.exc_info()
1340            err = service_error(service_error.internal, "%s: %s" % \
1341                    (v, traceback.extract_tb(st)))
1342
1343        # Walk up tmpdir, deleting as we go
1344        if self.cleanup: self.remove_dirs(tmpdir)
1345        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1346
1347        if rv:
1348            return self.finalize_experiment(topo, nodes, aid, req['allocID'],
1349                    proof)
1350        elif err:
1351            raise service_error(service_error.federant,
1352                    "Swapin failed: %s" % err)
1353        else:
1354            raise service_error(service_error.federant, "Swapin failed")
1355
1356    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1357            slice_urn, certfile, certpw):
1358        """
1359        Stop a sub experiment by calling swapexp on the federant
1360        """
1361        host = self.staging_host
1362        rv = False
1363        try:
1364            # Clean out tar files: we've gone over quota in the past
1365            if stagingdir:
1366                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1367            if slice_cred:
1368                self.log.error('Removing Sliver on ProtoGENI')
1369                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1370                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1371            return True
1372        except self.ssh_cmd_timeout:
1373            rv = False
1374        return rv
1375
1376    def TerminateSegment(self, req, fid):
1377        try:
1378            req = req['TerminateSegmentRequestBody']
1379        except KeyError:
1380            raise service_error(service_error.req, "Badly formed request")
1381
1382        auth_attr = req['allocID']['fedid']
1383        aid = "%s" % auth_attr
1384        attrs = req.get('fedAttr', [])
1385        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1386                with_proof=True)
1387        if not access_ok:
1388            raise service_error(service_error.access, "Access denied")
1389
1390        self.state_lock.acquire()
1391        if self.allocation.has_key(aid):
1392            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1393            slice_cred = self.allocation[aid].get('slice_credential', None)
1394            slice_urn = self.allocation[aid].get('slice_urn', None)
1395            ename = self.allocation[aid].get('experiment', None)
1396        else:
1397            cf, user, ssh_key, cpw = (None, None, None, None)
1398            slice_cred = None
1399            slice_urn = None
1400            ename = None
1401        self.state_lock.release()
1402
1403        if ename:
1404            staging = "%s/%s" % ( self.staging_dir, ename)
1405        else:
1406            self.log.warn("Can't find experiment name for %s" % aid)
1407            staging = None
1408
1409        segment_commands = self.api_proxy(keyfile=ssh_key,
1410                debug=self.create_debug, ch_url = self.ch_url,
1411                sa_url=self.sa_url, cm_url=self.cm_url)
1412        self.stop_segment(segment_commands, user, staging, slice_cred,
1413                slice_urn, cf, cpw)
1414        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1415
1416    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1417            certfile, certpw):
1418        """
1419        Linear code through the segment renewal calls.
1420        """
1421        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1422        try:
1423            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1424                    time.gmtime(time.time() + interval))
1425            cred = segment_commands.slice_authority_call('GetCredential', 
1426                    {}, ctxt)
1427
1428            param = {
1429                    'credential': scred,
1430                    'expiration': expiration
1431                    }
1432            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1433            param = {
1434                    'credential': cred,
1435                    'urn': slice_urn,
1436                    'type': 'Slice',
1437                    }
1438            new_scred = segment_commands.slice_authority_call('GetCredential',
1439                    param, ctxt)
1440
1441        except segment_commands.ProtoGENIError, e:
1442            self.log.error("Failed to extend slice %s: %s" % (name, e))
1443            return None
1444        try:
1445            param = {
1446                    'credentials': [new_scred,],
1447                    'slice_urn': slice_urn,
1448                    }
1449            r = segment_commands.component_manager_call('RenewSlice', param, 
1450                    ctxt)
1451        except segment_commands.ProtoGENIError, e:
1452            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1453
1454        return new_scred
1455   
1456
1457    def RenewSlices(self):
1458        self.log.info("Scanning for slices to renew")
1459        self.state_lock.acquire()
1460        aids = self.allocation.keys()
1461        self.state_lock.release()
1462
1463        for aid in aids:
1464            self.state_lock.acquire()
1465            if self.allocation.has_key(aid):
1466                name = self.allocation[aid].get('slice_name', None)
1467                scred = self.allocation[aid].get('slice_credential', None)
1468                slice_urn = self.allocation[aid].get('slice_urn', None)
1469                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1470            else:
1471                name = None
1472                scred = None
1473            self.state_lock.release()
1474
1475            if not os.access(cf, os.R_OK):
1476                self.log.error(
1477                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1478                continue
1479
1480            # There's a ProtoGENI slice associated with the segment; renew it.
1481            if name and scred and slice_urn:
1482                segment_commands = self.api_proxy(log=self.log, 
1483                        debug=self.create_debug, keyfile=ssh_key,
1484                        cm_url = self.cm_url, sa_url = self.sa_url,
1485                        ch_url = self.ch_url)
1486                new_scred = self.renew_segment(segment_commands, name, scred, 
1487                        slice_urn, self.renewal_interval, cf, cpw)
1488                if new_scred:
1489                    self.log.info("Slice %s renewed until %s GMT" % \
1490                            (name, time.asctime(time.gmtime(\
1491                                time.time()+self.renewal_interval))))
1492                    self.state_lock.acquire()
1493                    if self.allocation.has_key(aid):
1494                        self.allocation[aid]['slice_credential'] = new_scred
1495                        self.write_state()
1496                    self.state_lock.release()
1497                else:
1498                    self.log.info("Failed to renew slice %s " % name)
1499
1500        # Let's do this all again soon.  (4 tries before the slices time out)   
1501        t = Timer(self.renewal_interval/4, self.RenewSlices)
1502        t.start()
Note: See TracBrowser for help on using the repository browser.