source: fedd/federation/protogeni_access.py @ 5c35160

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5c35160 was 5c35160, checked in by Ted Faber <faber@…>, 14 years ago

remove debugging

  • Property mode set to 100644
File size: 47.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17from M2Crypto.SSL import SSLError
18
19from util import *
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from access import access_base
31from proxy_segment import proxy_segment
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class protogeni_proxy(proxy_segment):
45    class ProtoGENIError(Exception): 
46        def __init__(self, op, code, output):
47            Exception.__init__(self, output)
48            self.op = op
49            self.code = code
50            self.output = output
51
52    def __init__(self, log=None, keyfile=None, debug=False, 
53            ch_url=None, sa_url=None, cm_url=None):
54        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
55
56        self.ProtoGENIError = protogeni_proxy.ProtoGENIError
57        self.ch_url = ch_url
58        self.sa_url = sa_url
59        self.cm_url = cm_url
60
61        self.call_SetValue = service_caller('SetValue')
62
63        self.debug_fail = ['Resolve']
64        self.debug_response = {
65                'RedeemTicket': ("XML blob1", "XML blob2"),
66                'SliceStatus': { 'status': 'ready' },
67            }
68
69
70    def pg_call(self, url, method, params, context):
71        max_retries = 5
72        retries = 0
73
74        s = service_caller(method, request_body_name="", strict=False)
75        self.log.debug("Calling %s %s" % (url, method))
76        if not self.debug:
77            while retries < max_retries:
78                r = s.call_xmlrpc_service(url, params, context=context)
79                code = r.get('code', -1)
80                if code == 0:
81                    # Success leaves the loop here
82                    return r.get('value', None)
83                elif code == 14 and retries +1 < max_retries:
84                    # Busy resource
85                    retries+= 1
86                    self.log.info("Resource busy, retrying in 30 secs")
87                    time.sleep(30)
88                else:
89                    # NB: out of retries falls through to here
90                    raise self.ProtoGENIError(op=method, 
91                            code=r.get('code', 'unknown'), 
92                            output=r.get('output', 'No output'))
93        else:
94            if method in self.debug_fail:
95                raise self.ProtoGENIError(op=method, code='unknown',
96                        output='No output')
97            elif self.debug_response.has_key(method):
98                return self.debug_response[method]
99            else:
100                return "%s XML blob" % method
101
102
103
104class access(access_base):
105    """
106    The implementation of access control based on mapping users to projects.
107
108    Users can be mapped to existing projects or have projects created
109    dynamically.  This implements both direct requests and proxies.
110    """
111
112    def __init__(self, config=None, auth=None):
113        """
114        Initializer.  Pulls parameters out of the ConfigParser's access section.
115        """
116
117        access_base.__init__(self, config, auth)
118
119        self.domain = config.get("access", "domain")
120        self.userconfdir = config.get("access","userconfdir")
121        self.userconfcmd = config.get("access","userconfcmd")
122        self.userconfurl = config.get("access","userconfurl")
123        self.federation_software = config.get("access", "federation_software")
124        self.portal_software = config.get("access", "portal_software")
125        self.ssh_port = config.get("access","ssh_port") or "22"
126        self.sshd = config.get("access","sshd")
127        self.sshd_config = config.get("access", "sshd_config")
128        self.access_type = config.get("access", "type")
129        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
130        self.staging_host = config.get("access", "staging_host") \
131                or "ops.emulab.net"
132        self.local_seer_software = config.get("access", "local_seer_software")
133        self.local_seer_image = config.get("access", "local_seer_image")
134        self.local_seer_start = config.get("access", "local_seer_start")
135   
136        self.dragon_endpoint = config.get("access", "dragon")
137        self.dragon_vlans = config.get("access", "dragon_vlans")
138        self.deter_internal = config.get("access", "deter_internal")
139
140        self.tunnel_config = config.getboolean("access", "tunnel_config")
141        self.portal_command = config.get("access", "portal_command")
142        self.portal_image = config.get("access", "portal_image")
143        self.portal_type = config.get("access", "portal_type") or "pc"
144        self.portal_startcommand = config.get("access", "portal_startcommand")
145        self.node_startcommand = config.get("access", "node_startcommand")
146
147        self.federation_software = self.software_list(self.federation_software)
148        self.portal_software = self.software_list(self.portal_software)
149        self.local_seer_software = self.software_list(self.local_seer_software)
150
151        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
152        self.renewal_interval = int(self.renewal_interval) * 60
153
154        self.ch_url = config.get("access", "ch_url")
155        self.sa_url = config.get("access", "sa_url")
156        self.cm_url = config.get("access", "cm_url")
157
158        self.restricted = [ ]
159
160        # read_state in the base_class
161        self.state_lock.acquire()
162        for a  in ('allocation', 'projects', 'keys', 'types'):
163            if a not in self.state:
164                self.state[a] = { }
165        self.allocation = self.state['allocation']
166        self.projects = self.state['projects']
167        self.keys = self.state['keys']
168        self.types = self.state['types']
169        # Add the ownership attributes to the authorizer.  Note that the
170        # indices of the allocation dict are strings, but the attributes are
171        # fedids, so there is a conversion.
172        for k in self.state.get('allocation', {}).keys():
173            for o in self.state['allocation'][k].get('owners', []):
174                self.auth.set_attribute(o, fedid(hexstr=k))
175            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
176
177        self.state_lock.release()
178
179
180        self.log = logging.getLogger("fedd.access")
181        set_log_level(config, "access", self.log)
182
183        self.access = { }
184        if config.has_option("access", "accessdb"):
185            self.read_access(config.get("access", "accessdb"), 
186                    access_obj=self.make_access_info)
187
188        self.lookup_access = self.lookup_access_base
189
190        self.call_SetValue = service_caller('SetValue')
191        self.call_GetValue = service_caller('GetValue')
192        self.exports = {
193                'local_seer_control': self.export_local_seer,
194                'seer_master': self.export_seer_master,
195                'hide_hosts': self.export_hide_hosts,
196                }
197
198        if not self.local_seer_image or not self.local_seer_software or \
199                not self.local_seer_start:
200            if 'local_seer_control' in self.exports:
201                del self.exports['local_seer_control']
202
203        if not self.local_seer_image or not self.local_seer_software or \
204                not self.seer_master_start:
205            if 'seer_master' in self.exports:
206                del self.exports['seer_master']
207
208        self.RenewSlices()
209
210        self.soap_services = {\
211            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
212            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
213            'StartSegment': soap_handler("StartSegment", self.StartSegment),
214            'TerminateSegment': soap_handler("TerminateSegment", 
215                self.TerminateSegment),
216            }
217        self.xmlrpc_services =  {\
218            'RequestAccess': xmlrpc_handler('RequestAccess',
219                self.RequestAccess),
220            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
221                self.ReleaseAccess),
222            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
223            'TerminateSegment': xmlrpc_handler('TerminateSegment',
224                self.TerminateSegment),
225            }
226
227    @staticmethod
228    def make_access_info(s):
229        """
230        Split a string of the form (id, id, id, id) ito its constituent tuples
231        and return them as a tuple.  Use to import access info from the
232        access_db.
233        """
234
235        ss = s.strip()
236        if ss.startswith('(') and ss.endswith(')'):
237            l = [ s.strip() for s  in ss[1:-1].split(",")]
238            if len(l) == 4:
239                return tuple(l)
240            else:
241                raise self.parse_error(
242                        "Exactly 4 elements in access info required")
243        else:
244            raise self.parse_error("Expecting parenthezied values")
245
246
247    def get_handler(self, path, fid):
248        self.log.info("Get handler %s %s" % (path, fid))
249        if self.auth.check_attribute(fid, path) and self.userconfdir:
250            return ("%s/%s" % (self.userconfdir, path), "application/binary")
251        else:
252            return (None, None)
253
254    def build_access_response(self, alloc_id, services):
255        """
256        Create the SOAP response.
257
258        Build the dictionary description of the response and use
259        fedd_utils.pack_soap to create the soap message.  ap is the allocate
260        project message returned from a remote project allocation (even if that
261        allocation was done locally).
262        """
263        # Because alloc_id is already a fedd_services_types.IDType_Holder,
264        # there's no need to repack it
265        msg = { 
266                'allocID': alloc_id,
267                'fedAttr': [
268                    { 'attribute': 'domain', 'value': self.domain } , 
269                ]
270            }
271        if self.dragon_endpoint:
272            msg['fedAttr'].append({'attribute': 'dragon',
273                'value': self.dragon_endpoint})
274        if self.deter_internal:
275            msg['fedAttr'].append({'attribute': 'deter_internal',
276                'value': self.deter_internal})
277        #XXX: ??
278        if self.dragon_vlans:
279            msg['fedAttr'].append({'attribute': 'vlans',
280                'value': self.dragon_vlans})
281
282        if services:
283            msg['service'] = services
284        return msg
285
286    def RequestAccess(self, req, fid):
287        """
288        Handle the access request.
289        """
290
291        # The dance to get into the request body
292        if req.has_key('RequestAccessRequestBody'):
293            req = req['RequestAccessRequestBody']
294        else:
295            raise service_error(service_error.req, "No request!?")
296
297        if req.has_key('destinationTestbed'):
298            dt = unpack_id(req['destinationTestbed'])
299
300        # Request for this fedd
301        found, match = self.lookup_access(req, fid)
302        services, svc_state = self.export_services(req.get('service',[]),
303                None, None)
304        # keep track of what's been added
305        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
306        aid = unicode(allocID)
307
308        self.state_lock.acquire()
309        self.allocation[aid] = { }
310        # The protoGENI certificate
311        self.allocation[aid]['credentials'] = found
312        # The list of owner FIDs
313        self.allocation[aid]['owners'] = [ fid ]
314        self.write_state()
315        self.state_lock.release()
316        self.auth.set_attribute(fid, allocID)
317        self.auth.set_attribute(allocID, allocID)
318
319        try:
320            f = open("%s/%s.pem" % (self.certdir, aid), "w")
321            print >>f, alloc_cert
322            f.close()
323        except EnvironmentError, e:
324            raise service_error(service_error.internal, 
325                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
326        return self.build_access_response({ 'fedid': allocID }, None)
327
328
329    def ReleaseAccess(self, req, fid):
330        # The dance to get into the request body
331        if req.has_key('ReleaseAccessRequestBody'):
332            req = req['ReleaseAccessRequestBody']
333        else:
334            raise service_error(service_error.req, "No request!?")
335
336        # Local request
337        try:
338            if req['allocID'].has_key('localname'):
339                auth_attr = aid = req['allocID']['localname']
340            elif req['allocID'].has_key('fedid'):
341                aid = unicode(req['allocID']['fedid'])
342                auth_attr = req['allocID']['fedid']
343            else:
344                raise service_error(service_error.req,
345                        "Only localnames and fedids are understood")
346        except KeyError:
347            raise service_error(service_error.req, "Badly formed request")
348
349        self.log.debug("[access] deallocation requested for %s", aid)
350        if not self.auth.check_attribute(fid, auth_attr):
351            self.log.debug("[access] deallocation denied for %s", aid)
352            raise service_error(service_error.access, "Access Denied")
353
354        self.state_lock.acquire()
355        if self.allocation.has_key(aid):
356            self.log.debug("Found allocation for %s" %aid)
357            del self.allocation[aid]
358            self.write_state()
359            self.state_lock.release()
360            # And remove the access cert
361            cf = "%s/%s.pem" % (self.certdir, aid)
362            self.log.debug("Removing %s" % cf)
363            os.remove(cf)
364            return { 'allocID': req['allocID'] } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    def manifest_to_dict(self, manifest, ignore_debug=False):
370        """
371        Turn the manifest into a dict were each virtual nodename (i.e. the
372        topdl name) has an entry with the allocated machine in hostname and the
373        interfaces in 'interfaces'.  I love having XML parser code lying
374        around.
375        """
376        if self.create_debug and not ignore_debug: 
377            self.log.debug("Returning null manifest dict")
378            return { }
379
380        # The class allows us to keep a little state - the dict under
381        # consteruction and the current entry in that dict for the interface
382        # element code.
383        class manifest_parser:
384            def __init__(self):
385                self.d = { }
386                self.current_key=None
387
388            # If the element is a node, create a dict entry for it.  If it's an
389            # interface inside a node, add an entry in the interfaces list with
390            # the virtual name and component id.
391            def start_element(self, name, attrs):
392                if name == 'node':
393                    self.current_key = attrs.get('virtual_id',"")
394                    if self.current_key:
395                        self.d[self.current_key] = {
396                                'hostname': attrs.get('hostname', None),
397                                'interfaces': { }
398                                }
399                elif name == 'interface' and self.current_key:
400                    self.d[self.current_key]['interfaces']\
401                            [attrs.get('virtual_id','')] = \
402                            attrs.get('component_id', None)
403            #  When a node is finished, clear current_key
404            def end_element(self, name):
405                if name == 'node': self.current_key = None
406
407        node = { }
408
409        mp = manifest_parser()
410        p = xml.parsers.expat.ParserCreate()
411        # These are bound to the class we just created
412        p.StartElementHandler = mp.start_element
413        p.EndElementHandler = mp.end_element
414
415        p.Parse(manifest)
416        # Make the node dict that the callers expect
417        for k in mp.d:
418            node[k] = mp.d.get('hostname', '')
419        return mp.d
420
421    def fake_manifest(self, topo):
422        """
423        Fake the output of manifest_to_dict with a bunch of generic node an
424        interface names, for debugging.
425        """
426        node = { }
427        for i, e in enumerate([ e for e in topo.elements \
428                if isinstance(e, topdl.Computer)]):
429            node[e.name] = { 
430                    'hostname': "node%03d" % i,
431                    'interfaces': { }
432                    }
433            for j, inf in enumerate(e.interface):
434                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
435
436        return node
437
438
439    def generate_portal_configs(self, topo, pubkey_base, 
440            secretkey_base, tmpdir, leid, connInfo, services, nodes):
441
442        def conninfo_to_dict(key, info):
443            """
444            Make a cpoy of the connection information about key, and flatten it
445            into a single dict by parsing out any feddAttrs.
446            """
447
448            rv = None
449            for i in info:
450                if key == i.get('portal', "") or \
451                        key in [e.get('element', "") \
452                        for e in i.get('member', [])]:
453                    rv = i.copy()
454                    break
455
456            else:
457                return rv
458
459            if 'fedAttr' in rv:
460                for a in rv['fedAttr']:
461                    attr = a.get('attribute', "")
462                    val = a.get('value', "")
463                    if attr and attr not in rv:
464                        rv[attr] = val
465                del rv['fedAttr']
466            return rv
467
468        # XXX: un hardcode this
469        def client_null(f, s):
470            print >>f, "Service: %s" % s['name']
471       
472        def client_seer_master(f, s):
473            print >>f, 'PortalAlias: seer-master'
474
475        def client_smb(f, s):
476            print >>f, "Service: %s" % s['name']
477            smbshare = None
478            smbuser = None
479            smbproj = None
480            for a in s.get('fedAttr', []):
481                if a.get('attribute', '') == 'SMBSHARE':
482                    smbshare = a.get('value', None)
483                elif a.get('attribute', '') == 'SMBUSER':
484                    smbuser = a.get('value', None)
485                elif a.get('attribute', '') == 'SMBPROJ':
486                    smbproj = a.get('value', None)
487
488            if all((smbshare, smbuser, smbproj)):
489                print >>f, "SMBshare: %s" % smbshare
490                print >>f, "ProjectUser: %s" % smbuser
491                print >>f, "ProjectName: %s" % smbproj
492
493        def client_hide_hosts(f, s):
494            for a in s.get('fedAttr', [ ]):
495                if a.get('attribute', "") == 'hosts':
496                    print >>f, 'Hide: %s' % a.get('value', "")
497
498        client_service_out = {
499                'SMB': client_smb,
500                'tmcd': client_null,
501                'seer': client_null,
502                'userconfig': client_null,
503                'project_export': client_null,
504                'seer_master': client_seer_master,
505                'hide_hosts': client_hide_hosts,
506            }
507
508        def client_seer_master_export(f, s):
509            print >>f, "AddedNode: seer-master"
510
511        def client_seer_local_export(f, s):
512            print >>f, "AddedNode: control"
513
514        client_export_service_out = {
515                'seer_master': client_seer_master_export,
516                'local_seer_control': client_seer_local_export,
517            }
518
519        def server_port(f, s):
520            p = urlparse(s.get('server', 'http://localhost'))
521            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
522
523        def server_null(f,s): pass
524
525        def server_seer(f, s):
526            print >>f, 'seer: true'
527
528        server_service_out = {
529                'SMB': server_port,
530                'tmcd': server_port,
531                'userconfig': server_null,
532                'project_export': server_null,
533                'seer': server_seer,
534                'seer_master': server_port,
535                'hide_hosts': server_null,
536            }
537        # XXX: end un hardcode this
538
539
540        seer_out = False
541        client_out = False
542        for e in [ e for e in topo.elements \
543                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
544            myname = e.name
545            type = e.get_attribute('portal_type')
546
547            info = conninfo_to_dict(myname, connInfo)
548
549            if not info:
550                raise service_error(service_error.req,
551                        "No connectivity info for %s" % myname)
552
553            # Translate to physical name (ProtoGENI doesn't have DNS)
554            physname = nodes.get(myname, { }).get('hostname', None)
555            peer = info.get('peer', "")
556            ldomain = self.domain
557            ssh_port = info.get('ssh_port', 22)
558
559            # Collect this for the client.conf file
560            if 'masterexperiment' in info:
561                mproj, meid = info['masterexperiment'].split("/", 1)
562
563            active = info.get('active', 'False')
564
565            if type in ('control', 'both'):
566                testbed = e.get_attribute('testbed')
567                control_gw = myname
568
569            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
570            tunnelconfig = self.tunnel_config
571            try:
572                f = open(cfn, "w")
573                if active == 'True':
574                    print >>f, "active: True"
575                    print >>f, "ssh_port: %s" % ssh_port
576                    if type in ('control', 'both'):
577                        for s in [s for s in services \
578                                if s.get('name', "") in self.imports]:
579                            server_service_out[s['name']](f, s)
580
581                if tunnelconfig:
582                    print >>f, "tunnelip: %s" % tunnelconfig
583                print >>f, "peer: %s" % peer.lower()
584                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
585                        pubkey_base
586                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
587                        secretkey_base
588                f.close()
589            except EnvironmentError, e:
590                raise service_error(service_error.internal,
591                        "Can't write protal config %s: %s" % (cfn, e))
592
593        # Done with portals, write the client config file.
594        try:
595            f = open("%s/client.conf" % tmpdir, "w")
596            if control_gw:
597                print >>f, "ControlGateway: %s" % physname.lower()
598            for s in services:
599                if s.get('name',"") in self.imports and \
600                        s.get('visibility','') == 'import':
601                    client_service_out[s['name']](f, s)
602                if s.get('name', '') in self.exports and \
603                        s.get('visibility', '') == 'export' and \
604                        s['name'] in client_export_service_out:
605                    client_export_service_out[s['name']](f, s)
606            # Seer uses this.
607            if mproj and meid:
608                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
609            f.close()
610        except EnvironmentError, e:
611            raise service_error(service_error.internal,
612                    "Cannot write client.conf: %s" %s)
613
614
615
616    def export_store_info(self, cf, nodes, ssh_port, connInfo):
617        """
618        For the export requests in the connection info, install the peer names
619        at the experiment controller via SetValue calls.
620        """
621
622        for c in connInfo:
623            for p in [ p for p in c.get('parameter', []) \
624                    if p.get('type', '') == 'output']:
625
626                if p.get('name', '') == 'peer':
627                    k = p.get('key', None)
628                    surl = p.get('store', None)
629                    if surl and k and k.index('/') != -1:
630                        if self.create_debug:
631                            req = { 'name': k, 'value': 'debug' }
632                            self.call_SetValue(surl, req, cf)
633                        else:
634                            n = nodes.get(k[k.index('/')+1:], { })
635                            value = n.get('hostname', None)
636                            if value:
637                                req = { 'name': k, 'value': value }
638                                self.call_SetValue(surl, req, cf)
639                            else:
640                                self.log.error("No hostname for %s" % \
641                                        k[k.index('/'):])
642                    else:
643                        self.log.error("Bad export request: %s" % p)
644                elif p.get('name', '') == 'ssh_port':
645                    k = p.get('key', None)
646                    surl = p.get('store', None)
647                    if surl and k:
648                        req = { 'name': k, 'value': ssh_port }
649                        self.call_SetValue(surl, req, cf)
650                    else:
651                        self.log.error("Bad export request: %s" % p)
652                else:
653
654                    self.log.error("Unknown export parameter: %s" % \
655                            p.get('name'))
656                    continue
657
658    def write_node_config_script(self, elem, node, user, pubkey,
659            secretkey, stagingdir, tmpdir):
660        """
661        Write out the configuration script that is to run on the node
662        represented by elem in the topology.  This is called
663        once per node to configure.
664        """
665        # These little functions/functors just make things more readable.  Each
666        # one encapsulates a small task of copying software files or installing
667        # them.
668        class stage_file_type:
669            """
670            Write code copying file sfrom the staging host to the host on which
671            this will run.
672            """
673            def __init__(self, user, host, stagingdir):
674                self.user = user
675                self.host = host
676                self.stagingdir = stagingdir
677                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
678                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
679
680            def __call__(self, script, file, dest="."):
681                # If the file is a full pathname, do not use stagingdir
682                if file.find('/') == -1:
683                    file = "%s/%s" % (self.stagingdir, file)
684                print >>script, "%s %s@%s:%s %s" % \
685                        (self.scp, self.user, self.host, file, dest)
686
687        def install_tar(script, loc, base):
688            """
689            Print code to script to install a tarfile in loc.
690            """
691            tar = "/bin/tar"
692            mkdir="/bin/mkdir"
693
694            print >>script, "%s -p %s" % (mkdir, loc)
695            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
696
697        def install_rpm(script, base):
698            """
699            Print code to script to install an rpm
700            """
701            rpm = "/bin/rpm"
702            print >>script, "%s --install %s" % (rpm, base)
703
704        ifconfig = "/sbin/ifconfig"
705        stage_file = stage_file_type(user, self.staging_host, stagingdir)
706        pname = node.get('hostname', None)
707        fed_dir = "/usr/local/federation"
708        fed_etc_dir = "%s/etc" % fed_dir
709        fed_bin_dir = "%s/bin" % fed_dir
710        fed_lib_dir = "%s/lib" % fed_dir
711
712        if pname:
713            sfile = "%s/%s.startup" % (tmpdir, pname)
714            script = open(sfile, "w")
715            # Reset the interfaces to the ones in the topo file
716            for i in [ i for i in elem.interface \
717                    if not i.get_attribute('portal')]:
718                pinf = node['interfaces'].get(i.name, None)
719                addr = i.get_attribute('ip4_address') 
720                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
721                if pinf and addr:
722                    print >>script, \
723                            "%s %s %s netmask %s"  % \
724                            (ifconfig, pinf, addr, netmask)
725                else:
726                    self.log.error("Missing interface or address for %s" \
727                            % i.name)
728               
729            for l, f in self.federation_software:
730                base = os.path.basename(f)
731                stage_file(script, base)
732                if l: install_tar(script, l, base)
733                else: install_rpm(script, base)
734
735            for s in elem.software:
736                s_base = s.location.rpartition('/')[2]
737                stage_file(script, s_base)
738                if s.install: install_tar(script, s.install, s_base)
739                else: install_rpm(script, s_base)
740
741            for f in ('hosts', pubkey, secretkey, 'client.conf', 
742                    'userconf'):
743                stage_file(script, f, fed_etc_dir)
744            if self.sshd:
745                stage_file(script, self.sshd, fed_bin_dir)
746            if self.sshd_config:
747                stage_file(script, self.sshd_config, fed_etc_dir)
748
749            # Look in tmpdir to get the names.  They've all been copied
750            # into the (remote) staging dir
751            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
752                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
753
754            # Done with staging, remove the identity used to stage
755            print >>script, "#/bin/rm .ssh/id_rsa"
756
757            # Start commands
758            if elem.get_attribute('portal') and self.portal_startcommand:
759                # Install portal software
760                for l, f in self.portal_software:
761                    base = os.path.basename(f)
762                    stage_file(script, base)
763                    if l: install_tar(script, l, base)
764                    else: install_rpm(script, base)
765
766                # Portals never have a user-specified start command
767                print >>script, self.portal_startcommand
768            elif self.node_startcommand:
769                # XXX: debug
770                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
771                # XXX: debug
772                if elem.get_attribute('startup'):
773                    print >>script, "%s \\$USER '%s'" % \
774                            (self.node_startcommand,
775                                    elem.get_attribute('startup'))
776                else:
777                    print >>script, self.node_startcommand
778            script.close()
779            return sfile, pname
780        else:
781            return None, None
782
783
784    def configure_nodes(self, segment_commands, topo, nodes, user, 
785            pubkey, secretkey, stagingdir, tmpdir):
786        """
787        For each node in the topology, generate a script file that copies
788        software onto it and installs it in the proper places and then runs the
789        startup command (including the federation commands.
790        """
791
792
793
794        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
795            vname = e.name
796            sfile, pname = self.write_node_config_script(e,
797                    nodes.get(vname, { }),
798                    user, pubkey, secretkey, stagingdir, tmpdir)
799            if sfile:
800                if not segment_commands.scp_file(sfile, user, pname):
801                    self.log.error("Could not copy script to %s" % pname)
802            else:
803                self.log.error("Unmapped node: %s" % vname)
804
805    def start_node(self, user, host, node, segment_commands):
806        """
807        Copy an identity to a node for the configuration script to be able to
808        import data and then run the startup script remotely.
809        """
810        # Place an identity on the node so that the copying can succeed
811        segment_commands.scp_file( segment_commands.ssh_privkey_file,
812                user, node, ".ssh/id_rsa")
813        segment_commands.ssh_cmd(user, node, 
814                "sudo /bin/sh ./%s.startup &" % node)
815
816    def start_nodes(self, user, host, nodes, segment_commands):
817        """
818        Start a thread to initialize each node and wait for them to complete.
819        Each thread runs start_node.
820        """
821        threads = [ ]
822        for n in nodes:
823            t = Thread(target=self.start_node, args=(user, host, n, 
824                segment_commands))
825            t.start()
826            threads.append(t)
827
828        done = [not t.isAlive() for t in threads]
829        while not all(done):
830            self.log.info("Waiting for threads %s" % done)
831            time.sleep(10)
832            done = [not t.isAlive() for t in threads]
833
834    def set_up_staging_filespace(self, segment_commands, user, host,
835            stagingdir):
836        """
837        Set up teh staging area on the staging machine.  To reduce the number
838        of ssh commands, we compose a script and execute it remotely.
839        """
840
841        self.log.info("[start_segment]: creating script file")
842        try:
843            sf, scriptname = tempfile.mkstemp()
844            scriptfile = os.fdopen(sf, 'w')
845        except EnvironmentError:
846            return False
847
848        scriptbase = os.path.basename(scriptname)
849
850        # Script the filesystem changes
851        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
852        print >>scriptfile, 'mkdir -p %s' % stagingdir
853        print >>scriptfile, "rm -f %s" % scriptbase
854        scriptfile.close()
855
856        # Move the script to the remote machine
857        # XXX: could collide tempfile names on the remote host
858        if segment_commands.scp_file(scriptname, user, host, scriptbase):
859            os.remove(scriptname)
860        else:
861            return False
862
863        # Execute the script (and the script's last line deletes it)
864        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
865            return False
866
867    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
868        """
869        Protogeni interactions take a context and a protogeni certificate.
870        This establishes both for later calls and returns them.
871        """
872        if os.access(certfile, os.R_OK):
873            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
874        else:
875            self.log.error("[start_segment]: Cannot read certfile: %s" % \
876                    certfile)
877            return None, None
878
879        try:
880            gcred = segment_commands.pg_call(self.sa_url, 
881                    'GetCredential', {}, ctxt)
882        except self.ProtoGENIError, e:
883            raise service_error(service_error.federant,
884                    "ProtoGENI: %s" % e)
885
886        return ctxt, gcred
887
888    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
889        """
890        Find a usable slice name by trying random ones until there's no
891        collision.
892        """
893
894        def random_slicename(user):
895            """
896            Return a random slicename by appending 5 letters to the username.
897            """
898            slicename = user
899            for i in range(0,5):
900                slicename += random.choice(string.ascii_letters)
901            return slicename
902
903        while True:
904            slicename = random_slicename(user)
905            try:
906                param = {
907                        'credential': gcred, 
908                        'hrn': slicename,
909                        'type': 'Slice'
910                        }
911                segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt)
912            except segment_commands.ProtoGENIError, e:
913                print e
914                break
915
916        return slicename
917
918    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
919        """
920        Create the slice and allocate resources.  If any of this stuff fails,
921        the allocations will time out on PG in short order, so we just raise
922        the service_error.  Return the slice and sliver credentials as well as
923        the manifest.
924        """
925        try:
926            param = {
927                    'credential': gcred, 
928                    'hrn': slicename,
929                    'type': 'Slice'
930                    }
931            slice_cred = segment_commands.pg_call(self.sa_url, 'Register',
932                    param, ctxt)
933            #f = open("./slice_cred", "w")
934            #print >>f, slice_cred
935            #f.close()
936            # Populate the ssh keys (let PG format them)
937            param = {
938                    'credential': gcred, 
939                    }
940            keys =  segment_commands.pg_call(self.sa_url, 'GetKeys', param, 
941                    ctxt)
942            # Grab and redeem a ticket
943            param = {
944                    'credential': slice_cred, 
945                    'rspec': rspec,
946                    }
947            ticket = segment_commands.pg_call(self.cm_url, 'GetTicket', param,
948                    ctxt)
949            #f = open("./ticket", "w")
950            #print >>f, ticket
951            #f.close()
952            param = { 
953                    'credential': slice_cred, 
954                    'keys': keys,
955                    'ticket': ticket,
956                    }
957            sliver_cred, manifest = segment_commands.pg_call(self.cm_url, 
958                    'RedeemTicket', param, ctxt)
959            #f = open("./sliver_cred", "w")
960            #print >>f, sliver_cred
961            #f.close()
962            #f = open("./manifest", "w")
963            #print >>f, manifest
964            #f.close()
965            # start 'em up
966            param = { 
967                    'credential': sliver_cred,
968                    }
969            segment_commands.pg_call(self.cm_url, 'StartSliver', param, ctxt)
970        except segment_commands.ProtoGENIError, e:
971            raise service_error(service_error.federant,
972                    "ProtoGENI: %s %s" % (e.code, e))
973
974        return (slice_cred, sliver_cred, manifest)
975
976    def wait_for_slice(self, segment_commands, slice_cred, ctxt, timeout=None):
977        """
978        Wait for the given slice to finish its startup.  Return the final
979        status.
980        """
981        status = 'notready'
982        if timeout is not None:
983            end = time.time() + timeout
984        try:
985            while status == 'notready':
986                param = { 
987                        'credential': slice_cred
988                        }
989                r = segment_commands.pg_call(self.cm_url,
990                        'SliceStatus', param, ctxt)
991                status = r.get('status', 'notready')
992                if status == 'notready':
993                    if timeout is not None and time.time() > end:
994                        return 'timeout'
995                    time.sleep(30)
996        except segment_commands.ProtoGENIError, e:
997            raise service_error(service_error.federant,
998                    "ProtoGENI: %s %s" % (e.code, e))
999
1000        return status
1001
1002    def delete_slice(self, segment_commands, slice_cred, ctxt):
1003        """
1004        Delete the slice resources.  An error from the service is ignores,
1005        because the soft state will go away anyway.
1006        """
1007        try:
1008            param = { 'credential': slice_cred }
1009            segment_commands.pg_call(self.cm_url, 'DeleteSlice',
1010                    param, ctxt)
1011        except segment_commands.ProtoGENIError, e:
1012            self.log.warn("ProtoGENI: %s" % e)
1013
1014
1015
1016    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1017            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1018            export_certfile, topo, connInfo, services, timeout=0):
1019        """
1020        Start a sub-experiment on a federant.
1021
1022        Get the current state, modify or create as appropriate, ship data
1023        and configs and start the experiment.  There are small ordering
1024        differences based on the initial state of the sub-experiment.
1025        """
1026
1027        # Local software dir
1028        lsoftdir = "%s/software" % tmpdir
1029        host = self.staging_host
1030
1031        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1032                certfile, certpw)
1033
1034        if not ctxt: return False
1035
1036        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1037        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1038        self.log.info("Creating %s" % slicename)
1039        slice_cred, sliver_cred, manifest = self.allocate_slice(
1040                segment_commands, slicename, rspec, gcred, ctxt)
1041
1042        # With manifest in hand, we can export the portal node names.
1043        if self.create_debug: nodes = self.fake_manifest(topo)
1044        else: nodes = self.manifest_to_dict(manifest)
1045
1046        self.export_store_info(export_certfile, nodes, self.ssh_port,
1047                connInfo)
1048        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1049                ename, connInfo, services, nodes)
1050
1051        # Copy software to the staging machine (done after generation to copy
1052        # those, too)
1053        for d in (tmpdir, lsoftdir):
1054            if os.path.isdir(d):
1055                for f in os.listdir(d):
1056                    if not os.path.isdir("%s/%s" % (d, f)):
1057                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1058                                user, host, "%s/%s" % (stagingdir, f)):
1059                            self.log.error("Scp failed")
1060                            return False
1061
1062
1063        # Now we wait for the nodes to start on PG
1064        status = self.wait_for_slice(segment_commands, slice_cred, ctxt,
1065                timeout=300)
1066        if status == 'failed':
1067            self.log.error('Sliver failed to start on ProtoGENI')
1068            self.delete_slice(segment_commands, slice_cred, ctxt)
1069            return False
1070        elif status == 'timeout':
1071            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1072            self.delete_slice(segment_commands, slice_cred, ctxt)
1073            return False
1074        else:
1075            # All good: save ProtoGENI info in shared state
1076            self.state_lock.acquire()
1077            self.allocation[aid]['slice_name'] = slicename
1078            self.allocation[aid]['slice_credential'] = slice_cred
1079            self.allocation[aid]['sliver_credential'] = sliver_cred
1080            self.allocation[aid]['manifest'] = manifest
1081            self.allocation[aid]['certfile'] = certfile
1082            self.allocation[aid]['certpw'] = certpw
1083            self.write_state()
1084            self.state_lock.release()
1085
1086        # Now we have configuration to do for ProtoGENI
1087        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1088                secretkey, stagingdir, tmpdir)
1089
1090        self.start_nodes(user, self.staging_host, 
1091                [ n.get('hostname', None) for n in nodes.values()],
1092                segment_commands)
1093
1094        # Everything has gone OK.
1095        return True, dict([(k, n.get('hostname', None)) \
1096                for k, n in nodes.items()])
1097
1098    def generate_rspec(self, topo, softdir, connInfo):
1099
1100        # Force a useful image.  Without picking this the nodes can get
1101        # different images and there is great pain.
1102        def image_filter(e):
1103            if isinstance(e, topdl.Computer):
1104                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1105                        'image+emulab-ops//FEDORA10-STD" />'
1106            else:
1107                return ""
1108        # Main line of generate
1109        t = topo.clone()
1110
1111        starts = { }
1112        # Weed out the things we aren't going to instantiate: Segments, portal
1113        # substrates, and portal interfaces.  (The copy in the for loop allows
1114        # us to delete from e.elements in side the for loop).  While we're
1115        # touching all the elements, we also adjust paths from the original
1116        # testbed to local testbed paths and put the federation commands and
1117        # startcommands into a dict so we can start them manually later.
1118        # ProtoGENI requires setup before the federation commands run, so we
1119        # run them by hand after we've seeded configurations.
1120        for e in [e for e in t.elements]:
1121            if isinstance(e, topdl.Segment):
1122                t.elements.remove(e)
1123            # Fix software paths
1124            for s in getattr(e, 'software', []):
1125                s.location = re.sub("^.*/", softdir, s.location)
1126            if isinstance(e, topdl.Computer):
1127                if e.get_attribute('portal') and self.portal_startcommand:
1128                    # Portals never have a user-specified start command
1129                    starts[e.name] = self.portal_startcommand
1130                elif self.node_startcommand:
1131                    if e.get_attribute('startup'):
1132                        starts[e.name] = "%s \\$USER '%s'" % \
1133                                (self.node_startcommand, 
1134                                        e.get_attribute('startup'))
1135                        e.remove_attribute('startup')
1136                    else:
1137                        starts[e.name] = self.node_startcommand
1138
1139                # Remove portal interfaces
1140                e.interface = [i for i in e.interface \
1141                        if not i.get_attribute('portal')]
1142
1143        t.substrates = [ s.clone() for s in t.substrates ]
1144        t.incorporate_elements()
1145
1146        # Customize the rspec output to use the image we like
1147        filters = [ image_filter ]
1148
1149        # Convert to rspec and return it
1150        exp_rspec = topdl.topology_to_rspec(t, filters)
1151
1152        return exp_rspec
1153
1154    def retrieve_software(self, topo, certfile, softdir):
1155        """
1156        Collect the software that nodes in the topology need loaded and stage
1157        it locally.  This implies retrieving it from the experiment_controller
1158        and placing it into softdir.  Certfile is used to prove that this node
1159        has access to that data (it's the allocation/segment fedid).  Finally
1160        local portal and federation software is also copied to the same staging
1161        directory for simplicity - all software needed for experiment creation
1162        is in softdir.
1163        """
1164        sw = set()
1165        for e in topo.elements:
1166            for s in getattr(e, 'software', []):
1167                sw.add(s.location)
1168        os.mkdir(softdir)
1169        for s in sw:
1170            self.log.debug("Retrieving %s" % s)
1171            try:
1172                get_url(s, certfile, softdir)
1173            except:
1174                t, v, st = sys.exc_info()
1175                raise service_error(service_error.internal,
1176                        "Error retrieving %s: %s" % (s, v))
1177
1178        # Copy local portal node software to the tempdir
1179        for s in (self.portal_software, self.federation_software):
1180            for l, f in s:
1181                base = os.path.basename(f)
1182                copy_file(f, "%s/%s" % (softdir, base))
1183
1184
1185    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1186        """
1187        Gather common configuration files, retrieve or create an experiment
1188        name and project name, and return the ssh_key filenames.  Create an
1189        allocation log bound to the state log variable as well.
1190        """
1191        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1192        ename = None
1193        pubkey_base = None
1194        secretkey_base = None
1195        alloc_log = None
1196
1197        for a in attrs:
1198            if a['attribute'] in configs:
1199                try:
1200                    self.log.debug("Retrieving %s" % a['value'])
1201                    get_url(a['value'], certfile, tmpdir)
1202                except:
1203                    t, v, st = sys.exc_info()
1204                    raise service_error(service_error.internal,
1205                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1206            if a['attribute'] == 'ssh_pubkey':
1207                pubkey_base = a['value'].rpartition('/')[2]
1208            if a['attribute'] == 'ssh_secretkey':
1209                secretkey_base = a['value'].rpartition('/')[2]
1210            if a['attribute'] == 'experiment_name':
1211                ename = a['value']
1212
1213        if not ename:
1214            ename = ""
1215            for i in range(0,5):
1216                ename += random.choice(string.ascii_letters)
1217            self.log.warn("No experiment name: picked one randomly: %s" \
1218                    % ename)
1219
1220        self.state_lock.acquire()
1221        if self.allocation.has_key(aid):
1222            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1223            self.allocation[aid]['experiment'] = ename
1224            self.allocation[aid]['log'] = [ ]
1225            # Create a logger that logs to the experiment's state object as
1226            # well as to the main log file.
1227            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1228            h = logging.StreamHandler(
1229                    list_log.list_log(self.allocation[aid]['log']))
1230            # XXX: there should be a global one of these rather than
1231            # repeating the code.
1232            h.setFormatter(logging.Formatter(
1233                "%(asctime)s %(name)s %(message)s",
1234                        '%d %b %y %H:%M:%S'))
1235            alloc_log.addHandler(h)
1236            self.write_state()
1237        else:
1238            self.log.error("No allocation for %s!?" % aid)
1239        self.state_lock.release()
1240
1241        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1242                cpw, alloc_log)
1243
1244    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1245        # Copy the assigned names into the return topology
1246        rvtopo = topo.clone()
1247        embedding = [ ]
1248        for k, n in nodes.items():
1249            embedding.append({ 
1250                'toponame': k,
1251                'physname': [n ],
1252                })
1253        # Grab the log (this is some anal locking, but better safe than
1254        # sorry)
1255        self.state_lock.acquire()
1256        logv = "".join(self.allocation[aid]['log'])
1257        # It's possible that the StartSegment call gets retried (!).
1258        # if the 'started' key is in the allocation, we'll return it rather
1259        # than redo the setup.
1260        self.allocation[aid]['started'] = { 
1261                'allocID': alloc_id,
1262                'allocationLog': logv,
1263                'segmentdescription': { 
1264                    'topdldescription': rvtopo.to_dict() },
1265                'embedding': embedding,
1266                }
1267        retval = copy.deepcopy(self.allocation[aid]['started'])
1268        self.write_state()
1269        self.state_lock.release()
1270
1271        return retval
1272
1273    def StartSegment(self, req, fid):
1274        err = None  # Any service_error generated after tmpdir is created
1275        rv = None   # Return value from segment creation
1276
1277        try:
1278            req = req['StartSegmentRequestBody']
1279            topref = req['segmentdescription']['topdldescription']
1280        except KeyError:
1281            raise service_error(service_error.req, "Badly formed request")
1282
1283        connInfo = req.get('connection', [])
1284        services = req.get('service', [])
1285        auth_attr = req['allocID']['fedid']
1286        aid = "%s" % auth_attr
1287        attrs = req.get('fedAttr', [])
1288        if not self.auth.check_attribute(fid, auth_attr):
1289            raise service_error(service_error.access, "Access denied")
1290        else:
1291            # See if this is a replay of an earlier succeeded StartSegment -
1292            # sometimes SSL kills 'em.  If so, replay the response rather than
1293            # redoing the allocation.
1294            self.state_lock.acquire()
1295            retval = self.allocation[aid].get('started', None)
1296            self.state_lock.release()
1297            if retval:
1298                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1299                        "replaying response")
1300                return retval
1301
1302        if topref:
1303            topo = topdl.Topology(**topref)
1304        else:
1305            raise service_error(service_error.req, 
1306                    "Request missing segmentdescription'")
1307
1308        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1309        try:
1310            tmpdir = tempfile.mkdtemp(prefix="access-")
1311            softdir = "%s/software" % tmpdir
1312        except EnvironmentError:
1313            raise service_error(service_error.internal, "Cannot create tmp dir")
1314
1315        # Try block alllows us to clean up temporary files.
1316        try:
1317            self.retrieve_software(topo, certfile, softdir)
1318            self.configure_userconf(services, tmpdir)
1319            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1320                cpw, alloc_log = self.initialize_experiment_info(attrs,
1321                        aid, certfile, tmpdir)
1322            self.import_store_info(certfile, connInfo)
1323            rspec = self.generate_rspec(topo, "%s/%s/" \
1324                    % (self.staging_dir, ename), connInfo)
1325
1326            segment_commands = protogeni_proxy(keyfile=ssh_key,
1327                    debug=self.create_debug, log=alloc_log,
1328                    ch_url = self.ch_url, sa_url=self.sa_url,
1329                    cm_url=self.cm_url)
1330            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1331                    pubkey_base, 
1332                    secretkey_base, ename,
1333                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1334                    certfile, topo, connInfo, services)
1335        except EnvironmentError, e:
1336            err = service_error(service_error.internal, "%s" % e)
1337        except service_error, e:
1338            err = e
1339        except:
1340            t, v, st = sys.exc_info()
1341            err = service_error(service_error.internal, "%s: %s" % \
1342                    (v, traceback.extract_tb(st)))
1343
1344        # Walk up tmpdir, deleting as we go
1345        if self.cleanup: self.remove_dirs(tmpdir)
1346        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1347
1348        if rv:
1349            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1350        elif err:
1351            raise service_error(service_error.federant,
1352                    "Swapin failed: %s" % err)
1353        else:
1354            raise service_error(service_error.federant, "Swapin failed")
1355
1356    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1357            certfile, certpw):
1358        """
1359        Stop a sub experiment by calling swapexp on the federant
1360        """
1361        host = self.staging_host
1362        rv = False
1363        try:
1364            # Clean out tar files: we've gone over quota in the past
1365            if stagingdir:
1366                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1367            if slice_cred:
1368                self.log.error('Removing Sliver on ProtoGENI')
1369                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1370                self.delete_slice(segment_commands, slice_cred, ctxt)
1371            return True
1372        except self.ssh_cmd_timeout:
1373            rv = False
1374        return rv
1375
1376    def TerminateSegment(self, req, fid):
1377        try:
1378            req = req['TerminateSegmentRequestBody']
1379        except KeyError:
1380            raise service_error(service_error.req, "Badly formed request")
1381
1382        auth_attr = req['allocID']['fedid']
1383        aid = "%s" % auth_attr
1384        attrs = req.get('fedAttr', [])
1385        if not self.auth.check_attribute(fid, auth_attr):
1386            raise service_error(service_error.access, "Access denied")
1387
1388        self.state_lock.acquire()
1389        if self.allocation.has_key(aid):
1390            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1391            slice_cred = self.allocation[aid].get('slice_credential', None)
1392            ename = self.allocation[aid].get('experiment', None)
1393        else:
1394            cf, user, ssh_key, cpw = (None, None, None, None)
1395            slice_cred = None
1396            ename = None
1397        self.state_lock.release()
1398
1399        if ename:
1400            staging = "%s/%s" % ( self.staging_dir, ename)
1401        else:
1402            self.log.warn("Can't find experiment name for %s" % aid)
1403            staging = None
1404
1405        segment_commands = protogeni_proxy(keyfile=ssh_key,
1406                debug=self.create_debug, ch_url = self.ch_url,
1407                sa_url=self.sa_url, cm_url=self.cm_url)
1408        self.stop_segment(segment_commands, user, staging, slice_cred, cf, cpw)
1409        return { 'allocID': req['allocID'] }
1410
1411    def renew_segment(self, segment_commands, name, scred, interval, 
1412            certfile, certpw):
1413        """
1414        Linear code through the segment renewal calls.
1415        """
1416        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1417        try:
1418            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1419                    time.gmtime(time.time() + interval))
1420            cred = segment_commands.pg_call(self.sa_url, 'GetCredential', 
1421                    {}, ctxt)
1422
1423            param = {
1424                    'credential': scred,
1425                    'expiration': expiration
1426                    }
1427            r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt)
1428            param = {
1429                    'credential': cred,
1430                    'hrn': name,
1431                    'type': 'Slice',
1432                    }
1433            slice = segment_commands.pg_call(self.sa_url, 'Resolve', 
1434                    param, ctxt)
1435            uuid = slice.get('uuid', None)
1436            if uuid == None:
1437                sys.exit('No uuid for %s' % slicename)
1438
1439            print 'Calling GetCredential (uuid)'
1440            param = {
1441                    'credential': cred,
1442                    'uuid': uuid,
1443                    'type': 'Slice',
1444                    }
1445            new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential',
1446                    param, ctxt)
1447            #f = open('./new_slice_cred', 'w')
1448            #print >>f, new_scred
1449            #f.close()
1450
1451        except segment_commands.ProtoGENIError, e:
1452            self.log.error("Failed to extend slice %s: %s" % (name, e))
1453            return None
1454        try:
1455            print 'Calling RenewSlice (CM)'
1456            param = {
1457                    'credential': new_scred,
1458                    }
1459            r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt)
1460        except segment_commands.ProtoGENIError, e:
1461            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1462
1463        return new_scred
1464   
1465
1466    def RenewSlices(self):
1467        self.log.info("Scanning for slices to renew")
1468        self.state_lock.acquire()
1469        aids = self.allocation.keys()
1470        self.state_lock.release()
1471
1472        for aid in aids:
1473            self.state_lock.acquire()
1474            if self.allocation.has_key(aid):
1475                name = self.allocation[aid].get('slice_name', None)
1476                scred = self.allocation[aid].get('slice_credential', None)
1477                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1478            else:
1479                name = None
1480                scred = None
1481            self.state_lock.release()
1482
1483            if not os.access(cf, os.R_OK):
1484                self.log.error(
1485                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1486                continue
1487
1488            # There's a ProtoGENI slice associated with the segment; renew it.
1489            if name and scred:
1490                segment_commands = protogeni_proxy(log=self.log, 
1491                        debug=self.create_debug, keyfile=ssh_key,
1492                        cm_url = self.cm_url, sa_url = self.sa_url,
1493                        ch_url = self.ch_url)
1494                new_scred = self.renew_segment(segment_commands, name, scred, 
1495                        self.renewal_interval, cf, cpw)
1496                if new_scred:
1497                    self.log.info("Slice %s renewed until %s GMT" % \
1498                            (name, time.asctime(time.gmtime(\
1499                                time.time()+self.renewal_interval))))
1500                    self.state_lock.acquire()
1501                    if self.allocation.has_key(aid):
1502                        self.allocation[aid]['slice_credential'] = new_scred
1503                    self.state_lock.release()
1504                else:
1505                    self.log.info("Failed to renew slice %s " % name)
1506
1507        # Let's do this all again soon.  (4 tries before the slices time out)   
1508        t = Timer(self.renewal_interval/4, self.RenewSlices)
1509        t.start()
Note: See TracBrowser for help on using the repository browser.