source: fedd/federation/protogeni_access.py @ 37ed9a5

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 37ed9a5 was 37ed9a5, checked in by Ted Faber <faber@…>, 14 years ago

Much more nicely factored now.

  • Property mode set to 100644
File size: 47.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14
15from threading import Thread, Timer, Lock
16from M2Crypto.SSL import SSLError
17
18from util import *
19from access_project import access_project
20from fedid import fedid, generate_fedid
21from authorizer import authorizer
22from service_error import service_error
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24
25import httplib
26import tempfile
27from urlparse import urlparse
28
29from access import access_base
30from proxy_segment import proxy_segment
31
32import topdl
33import list_log
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class protogeni_proxy(proxy_segment):
44    class ProtoGENIError(Exception): 
45        def __init__(self, op, code, output):
46            Exception.__init__(self, output)
47            self.op = op
48            self.code = code
49            self.output = output
50
51    def __init__(self, log=None, keyfile=None, debug=False, 
52            ch_url=None, sa_url=None, cm_url=None):
53        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
54
55        self.ProtoGENIError = protogeni_proxy.ProtoGENIError
56        self.ch_url = ch_url
57        self.sa_url = sa_url
58        self.cm_url = cm_url
59
60        self.call_SetValue = service_caller('SetValue')
61
62        self.debug_fail = ['Resolve']
63        self.debug_response = {
64                'RedeemTicket': ("XML blob1", "XML blob2"),
65                'SliceStatus': { 'status': 'ready' },
66            }
67
68
69    def pg_call(self, url, method, params, context):
70        max_retries = 5
71        retries = 0
72
73        s = service_caller(method, request_body_name="", strict=False)
74        self.log.debug("Calling %s %s" % (url, method))
75        if not self.debug:
76            while retries < max_retries:
77                r = s.call_xmlrpc_service(url, params, context=context)
78                code = r.get('code', -1)
79                if code == 0:
80                    # Success leaves the loop here
81                    return r.get('value', None)
82                elif code == 14 and retries +1 < max_retries:
83                    # Busy resource
84                    retries+= 1
85                    self.log.info("Resource busy, retrying in 30 secs")
86                    time.sleep(30)
87                else:
88                    # NB: out of retries falls through to here
89                    raise self.ProtoGENIError(op=method, 
90                            code=r.get('code', 'unknown'), 
91                            output=r.get('output', 'No output'))
92        else:
93            if method in self.debug_fail:
94                raise self.ProtoGENIError(op=method, code='unknown',
95                        output='No output')
96            elif self.debug_response.has_key(method):
97                return self.debug_response[method]
98            else:
99                return "%s XML blob" % method
100
101
102
103class access(access_base):
104    """
105    The implementation of access control based on mapping users to projects.
106
107    Users can be mapped to existing projects or have projects created
108    dynamically.  This implements both direct requests and proxies.
109    """
110
111    def __init__(self, config=None, auth=None):
112        """
113        Initializer.  Pulls parameters out of the ConfigParser's access section.
114        """
115
116        access_base.__init__(self, config, auth)
117
118        self.domain = config.get("access", "domain")
119        self.userconfdir = config.get("access","userconfdir")
120        self.userconfcmd = config.get("access","userconfcmd")
121        self.userconfurl = config.get("access","userconfurl")
122        self.federation_software = config.get("access", "federation_software")
123        self.portal_software = config.get("access", "portal_software")
124        self.ssh_port = config.get("access","ssh_port") or "22"
125        self.sshd = config.get("access","sshd")
126        self.sshd_config = config.get("access", "sshd_config")
127        self.access_type = config.get("access", "type")
128        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
129        self.staging_host = config.get("access", "staging_host") \
130                or "ops.emulab.net"
131        self.local_seer_software = config.get("access", "local_seer_software")
132        self.local_seer_image = config.get("access", "local_seer_image")
133        self.local_seer_start = config.get("access", "local_seer_start")
134   
135        self.dragon_endpoint = config.get("access", "dragon")
136        self.dragon_vlans = config.get("access", "dragon_vlans")
137        self.deter_internal = config.get("access", "deter_internal")
138
139        self.tunnel_config = config.getboolean("access", "tunnel_config")
140        self.portal_command = config.get("access", "portal_command")
141        self.portal_image = config.get("access", "portal_image")
142        self.portal_type = config.get("access", "portal_type") or "pc"
143        self.portal_startcommand = config.get("access", "portal_startcommand")
144        self.node_startcommand = config.get("access", "node_startcommand")
145
146        self.federation_software = self.software_list(self.federation_software)
147        self.portal_software = self.software_list(self.portal_software)
148        self.local_seer_software = self.software_list(self.local_seer_software)
149
150        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
151        self.renewal_interval = int(self.renewal_interval) * 60
152
153        self.ch_url = config.get("access", "ch_url")
154        self.sa_url = config.get("access", "sa_url")
155        self.cm_url = config.get("access", "cm_url")
156
157        self.restricted = [ ]
158
159        # read_state in the base_class
160        self.state_lock.acquire()
161        for a  in ('allocation', 'projects', 'keys', 'types'):
162            if a not in self.state:
163                self.state[a] = { }
164        self.allocation = self.state['allocation']
165        self.projects = self.state['projects']
166        self.keys = self.state['keys']
167        self.types = self.state['types']
168        # Add the ownership attributes to the authorizer.  Note that the
169        # indices of the allocation dict are strings, but the attributes are
170        # fedids, so there is a conversion.
171        for k in self.state.get('allocation', {}).keys():
172            for o in self.state['allocation'][k].get('owners', []):
173                self.auth.set_attribute(o, fedid(hexstr=k))
174            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
175
176        self.state_lock.release()
177
178
179        self.log = logging.getLogger("fedd.access")
180        set_log_level(config, "access", self.log)
181
182        self.access = { }
183        if config.has_option("access", "accessdb"):
184            self.read_access(config.get("access", "accessdb"), 
185                    access_obj=self.make_access_info)
186
187        self.lookup_access = self.lookup_access_base
188
189        self.call_SetValue = service_caller('SetValue')
190        self.call_GetValue = service_caller('GetValue')
191        self.exports = {
192                'local_seer_control': self.export_local_seer,
193                'seer_master': self.export_seer_master,
194                'hide_hosts': self.export_hide_hosts,
195                }
196
197        if not self.local_seer_image or not self.local_seer_software or \
198                not self.local_seer_start:
199            if 'local_seer_control' in self.exports:
200                del self.exports['local_seer_control']
201
202        if not self.local_seer_image or not self.local_seer_software or \
203                not self.seer_master_start:
204            if 'seer_master' in self.exports:
205                del self.exports['seer_master']
206
207        self.RenewSlices()
208
209        self.soap_services = {\
210            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
211            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
212            'StartSegment': soap_handler("StartSegment", self.StartSegment),
213            'TerminateSegment': soap_handler("TerminateSegment", 
214                self.TerminateSegment),
215            }
216        self.xmlrpc_services =  {\
217            'RequestAccess': xmlrpc_handler('RequestAccess',
218                self.RequestAccess),
219            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
220                self.ReleaseAccess),
221            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
222            'TerminateSegment': xmlrpc_handler('TerminateSegment',
223                self.TerminateSegment),
224            }
225
226    @staticmethod
227    def make_access_info(s):
228        """
229        Split a string of the form (id, id, id, id) ito its constituent tuples
230        and return them as a tuple.  Use to import access info from the
231        access_db.
232        """
233
234        ss = s.strip()
235        if ss.startswith('(') and ss.endswith(')'):
236            l = [ s.strip() for s  in ss[1:-1].split(",")]
237            if len(l) == 4:
238                return tuple(l)
239            else:
240                raise self.parse_error(
241                        "Exactly 4 elements in access info required")
242        else:
243            raise self.parse_error("Expecting parenthezied values")
244
245
246    def get_handler(self, path, fid):
247        self.log.info("Get handler %s %s" % (path, fid))
248        if self.auth.check_attribute(fid, path) and self.userconfdir:
249            return ("%s/%s" % (self.userconfdir, path), "application/binary")
250        else:
251            return (None, None)
252
253    def build_access_response(self, alloc_id, services):
254        """
255        Create the SOAP response.
256
257        Build the dictionary description of the response and use
258        fedd_utils.pack_soap to create the soap message.  ap is the allocate
259        project message returned from a remote project allocation (even if that
260        allocation was done locally).
261        """
262        # Because alloc_id is already a fedd_services_types.IDType_Holder,
263        # there's no need to repack it
264        msg = { 
265                'allocID': alloc_id,
266                'fedAttr': [
267                    { 'attribute': 'domain', 'value': self.domain } , 
268                ]
269            }
270        if self.dragon_endpoint:
271            msg['fedAttr'].append({'attribute': 'dragon',
272                'value': self.dragon_endpoint})
273        if self.deter_internal:
274            msg['fedAttr'].append({'attribute': 'deter_internal',
275                'value': self.deter_internal})
276        #XXX: ??
277        if self.dragon_vlans:
278            msg['fedAttr'].append({'attribute': 'vlans',
279                'value': self.dragon_vlans})
280
281        if services:
282            msg['service'] = services
283        return msg
284
285    def RequestAccess(self, req, fid):
286        """
287        Handle the access request.
288        """
289
290        # The dance to get into the request body
291        if req.has_key('RequestAccessRequestBody'):
292            req = req['RequestAccessRequestBody']
293        else:
294            raise service_error(service_error.req, "No request!?")
295
296        if req.has_key('destinationTestbed'):
297            dt = unpack_id(req['destinationTestbed'])
298
299        # Request for this fedd
300        found, match = self.lookup_access(req, fid)
301        services, svc_state = self.export_services(req.get('service',[]),
302                None, None)
303        # keep track of what's been added
304        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
305        aid = unicode(allocID)
306
307        self.state_lock.acquire()
308        self.allocation[aid] = { }
309        # The protoGENI certificate
310        self.allocation[aid]['credentials'] = found
311        # The list of owner FIDs
312        self.allocation[aid]['owners'] = [ fid ]
313        self.write_state()
314        self.state_lock.release()
315        self.auth.set_attribute(fid, allocID)
316        self.auth.set_attribute(allocID, allocID)
317
318        try:
319            f = open("%s/%s.pem" % (self.certdir, aid), "w")
320            print >>f, alloc_cert
321            f.close()
322        except EnvironmentError, e:
323            raise service_error(service_error.internal, 
324                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
325        return self.build_access_response({ 'fedid': allocID }, None)
326
327
328    def ReleaseAccess(self, req, fid):
329        # The dance to get into the request body
330        if req.has_key('ReleaseAccessRequestBody'):
331            req = req['ReleaseAccessRequestBody']
332        else:
333            raise service_error(service_error.req, "No request!?")
334
335        # Local request
336        try:
337            if req['allocID'].has_key('localname'):
338                auth_attr = aid = req['allocID']['localname']
339            elif req['allocID'].has_key('fedid'):
340                aid = unicode(req['allocID']['fedid'])
341                auth_attr = req['allocID']['fedid']
342            else:
343                raise service_error(service_error.req,
344                        "Only localnames and fedids are understood")
345        except KeyError:
346            raise service_error(service_error.req, "Badly formed request")
347
348        self.log.debug("[access] deallocation requested for %s", aid)
349        if not self.auth.check_attribute(fid, auth_attr):
350            self.log.debug("[access] deallocation denied for %s", aid)
351            raise service_error(service_error.access, "Access Denied")
352
353        self.state_lock.acquire()
354        if self.allocation.has_key(aid):
355            self.log.debug("Found allocation for %s" %aid)
356            del self.allocation[aid]
357            self.write_state()
358            self.state_lock.release()
359            # And remove the access cert
360            cf = "%s/%s.pem" % (self.certdir, aid)
361            self.log.debug("Removing %s" % cf)
362            os.remove(cf)
363            return { 'allocID': req['allocID'] } 
364        else:
365            self.state_lock.release()
366            raise service_error(service_error.req, "No such allocation")
367
368    def manifest_to_dict(self, manifest, ignore_debug=False):
369        """
370        Turn the manifest into a dict were each virtual nodename (i.e. the
371        topdl name) has an entry with the allocated machine in hostname and the
372        interfaces in 'interfaces'.  I love having XML parser code lying
373        around.
374        """
375        if self.create_debug and not ignore_debug: 
376            self.log.debug("Returning null manifest dict")
377            return { }
378
379        # The class allows us to keep a little state - the dict under
380        # consteruction and the current entry in that dict for the interface
381        # element code.
382        class manifest_parser:
383            def __init__(self):
384                self.d = { }
385                self.current_key=None
386
387            # If the element is a node, create a dict entry for it.  If it's an
388            # interface inside a node, add an entry in the interfaces list with
389            # the virtual name and component id.
390            def start_element(self, name, attrs):
391                if name == 'node':
392                    self.current_key = attrs.get('virtual_id',"")
393                    if self.current_key:
394                        self.d[self.current_key] = {
395                                'hostname': attrs.get('hostname', None),
396                                'interfaces': { }
397                                }
398                elif name == 'interface' and self.current_key:
399                    self.d[self.current_key]['interfaces']\
400                            [attrs.get('virtual_id','')] = \
401                            attrs.get('component_id', None)
402            #  When a node is finished, clear current_key
403            def end_element(self, name):
404                if name == 'node': self.current_key = None
405
406        node = { }
407
408        mp = manifest_parser()
409        p = xml.parsers.expat.ParserCreate()
410        # These are bound to the class we just created
411        p.StartElementHandler = mp.start_element
412        p.EndElementHandler = mp.end_element
413
414        p.Parse(manifest)
415        # Make the node dict that the callers expect
416        for k in mp.d:
417            node[k] = mp.d.get('hostname', '')
418        return mp.d
419
420    def fake_manifest(self, topo):
421        """
422        Fake the output of manifest_to_dict with a bunch of generic node an
423        interface names, for debugging.
424        """
425        node = { }
426        for i, e in enumerate([ e for e in topo.elements \
427                if isinstance(e, topdl.Computer)]):
428            node[e.name] = { 
429                    'hostname': "node%03d" % i,
430                    'interfaces': { }
431                    }
432            for j, inf in enumerate(e.interface):
433                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
434
435        return node
436
437
438    def generate_portal_configs(self, topo, pubkey_base, 
439            secretkey_base, tmpdir, leid, connInfo, services, nodes):
440
441        def conninfo_to_dict(key, info):
442            """
443            Make a cpoy of the connection information about key, and flatten it
444            into a single dict by parsing out any feddAttrs.
445            """
446
447            rv = None
448            for i in info:
449                if key == i.get('portal', "") or \
450                        key in [e.get('element', "") \
451                        for e in i.get('member', [])]:
452                    rv = i.copy()
453                    break
454
455            else:
456                return rv
457
458            if 'fedAttr' in rv:
459                for a in rv['fedAttr']:
460                    attr = a.get('attribute', "")
461                    val = a.get('value', "")
462                    if attr and attr not in rv:
463                        rv[attr] = val
464                del rv['fedAttr']
465            return rv
466
467        # XXX: un hardcode this
468        def client_null(f, s):
469            print >>f, "Service: %s" % s['name']
470       
471        def client_seer_master(f, s):
472            print >>f, 'PortalAlias: seer-master'
473
474        def client_smb(f, s):
475            print >>f, "Service: %s" % s['name']
476            smbshare = None
477            smbuser = None
478            smbproj = None
479            for a in s.get('fedAttr', []):
480                if a.get('attribute', '') == 'SMBSHARE':
481                    smbshare = a.get('value', None)
482                elif a.get('attribute', '') == 'SMBUSER':
483                    smbuser = a.get('value', None)
484                elif a.get('attribute', '') == 'SMBPROJ':
485                    smbproj = a.get('value', None)
486
487            if all((smbshare, smbuser, smbproj)):
488                print >>f, "SMBshare: %s" % smbshare
489                print >>f, "ProjectUser: %s" % smbuser
490                print >>f, "ProjectName: %s" % smbproj
491
492        def client_hide_hosts(f, s):
493            for a in s.get('fedAttr', [ ]):
494                if a.get('attribute', "") == 'hosts':
495                    print >>f, 'Hide: %s' % a.get('value', "")
496
497        client_service_out = {
498                'SMB': client_smb,
499                'tmcd': client_null,
500                'seer': client_null,
501                'userconfig': client_null,
502                'project_export': client_null,
503                'seer_master': client_seer_master,
504                'hide_hosts': client_hide_hosts,
505            }
506
507        def client_seer_master_export(f, s):
508            print >>f, "AddedNode: seer-master"
509
510        def client_seer_local_export(f, s):
511            print >>f, "AddedNode: control"
512
513        client_export_service_out = {
514                'seer_master': client_seer_master_export,
515                'local_seer_control': client_seer_local_export,
516            }
517
518        def server_port(f, s):
519            p = urlparse(s.get('server', 'http://localhost'))
520            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
521
522        def server_null(f,s): pass
523
524        def server_seer(f, s):
525            print >>f, 'seer: true'
526
527        server_service_out = {
528                'SMB': server_port,
529                'tmcd': server_port,
530                'userconfig': server_null,
531                'project_export': server_null,
532                'seer': server_seer,
533                'seer_master': server_port,
534                'hide_hosts': server_null,
535            }
536        # XXX: end un hardcode this
537
538
539        seer_out = False
540        client_out = False
541        for e in [ e for e in topo.elements \
542                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
543            myname = e.name
544            type = e.get_attribute('portal_type')
545
546            info = conninfo_to_dict(myname, connInfo)
547
548            if not info:
549                raise service_error(service_error.req,
550                        "No connectivity info for %s" % myname)
551
552            # Translate to physical name (ProtoGENI doesn't have DNS)
553            physname = nodes.get(myname, { }).get('hostname', None)
554            peer = info.get('peer', "")
555            ldomain = self.domain
556            ssh_port = info.get('ssh_port', 22)
557
558            # Collect this for the client.conf file
559            if 'masterexperiment' in info:
560                mproj, meid = info['masterexperiment'].split("/", 1)
561
562            active = info.get('active', 'False')
563
564            if type in ('control', 'both'):
565                testbed = e.get_attribute('testbed')
566                control_gw = myname
567
568            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
569            tunnelconfig = self.tunnel_config
570            try:
571                f = open(cfn, "w")
572                if active == 'True':
573                    print >>f, "active: True"
574                    print >>f, "ssh_port: %s" % ssh_port
575                    if type in ('control', 'both'):
576                        for s in [s for s in services \
577                                if s.get('name', "") in self.imports]:
578                            server_service_out[s['name']](f, s)
579
580                if tunnelconfig:
581                    print >>f, "tunnelip: %s" % tunnelconfig
582                print >>f, "peer: %s" % peer.lower()
583                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
584                        pubkey_base
585                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
586                        secretkey_base
587                f.close()
588            except EnvironmentError, e:
589                raise service_error(service_error.internal,
590                        "Can't write protal config %s: %s" % (cfn, e))
591
592        # Done with portals, write the client config file.
593        try:
594            f = open("%s/client.conf" % tmpdir, "w")
595            if control_gw:
596                print >>f, "ControlGateway: %s" % physname.lower()
597            for s in services:
598                if s.get('name',"") in self.imports and \
599                        s.get('visibility','') == 'import':
600                    client_service_out[s['name']](f, s)
601                if s.get('name', '') in self.exports and \
602                        s.get('visibility', '') == 'export' and \
603                        s['name'] in client_export_service_out:
604                    client_export_service_out[s['name']](f, s)
605            # Seer uses this.
606            if mproj and meid:
607                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
608            f.close()
609        except EnvironmentError, e:
610            raise service_error(service_error.internal,
611                    "Cannot write client.conf: %s" %s)
612
613
614
615    def export_store_info(self, cf, nodes, ssh_port, connInfo):
616        """
617        For the export requests in the connection info, install the peer names
618        at the experiment controller via SetValue calls.
619        """
620
621        for c in connInfo:
622            for p in [ p for p in c.get('parameter', []) \
623                    if p.get('type', '') == 'output']:
624
625                if p.get('name', '') == 'peer':
626                    k = p.get('key', None)
627                    surl = p.get('store', None)
628                    if surl and k and k.index('/') != -1:
629                        if self.create_debug:
630                            req = { 'name': k, 'value': 'debug' }
631                            self.call_SetValue(surl, req, cf)
632                        else:
633                            n = nodes.get(k[k.index('/')+1:], { })
634                            value = n.get('hostname', None)
635                            if value:
636                                req = { 'name': k, 'value': value }
637                                self.call_SetValue(surl, req, cf)
638                            else:
639                                self.log.error("No hostname for %s" % \
640                                        k[k.index('/'):])
641                    else:
642                        self.log.error("Bad export request: %s" % p)
643                elif p.get('name', '') == 'ssh_port':
644                    k = p.get('key', None)
645                    surl = p.get('store', None)
646                    if surl and k:
647                        req = { 'name': k, 'value': ssh_port }
648                        self.call_SetValue(surl, req, cf)
649                    else:
650                        self.log.error("Bad export request: %s" % p)
651                else:
652
653                    self.log.error("Unknown export parameter: %s" % \
654                            p.get('name'))
655                    continue
656
657    def write_node_config_script(self, elem, node, user, pubkey,
658            secretkey, stagingdir, tmpdir):
659        """
660        Write out the configuration script that is to run on the node
661        represented by elem in the topology.  This is called
662        once per node to configure.
663        """
664        # These little functions/functors just make things more readable.  Each
665        # one encapsulates a small task of copying software files or installing
666        # them.
667        class stage_file_type:
668            """
669            Write code copying file sfrom the staging host to the host on which
670            this will run.
671            """
672            def __init__(self, user, host, stagingdir):
673                self.user = user
674                self.host = host
675                self.stagingdir = stagingdir
676                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
677                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
678
679            def __call__(self, script, file, dest="."):
680                # If the file is a full pathname, do not use stagingdir
681                if file.find('/') == -1:
682                    file = "%s/%s" % (self.stagingdir, file)
683                print >>script, "%s %s@%s:%s %s" % \
684                        (self.scp, self.user, self.host, file, dest)
685
686        def install_tar(script, loc, base):
687            """
688            Print code to script to install a tarfile in loc.
689            """
690            tar = "/bin/tar"
691            mkdir="/bin/mkdir"
692
693            print >>script, "%s -p %s" % (mkdir, loc)
694            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
695
696        def install_rpm(script, base):
697            """
698            Print code to script to install an rpm
699            """
700            rpm = "/bin/rpm"
701            print >>script, "%s --install %s" % (rpm, base)
702
703        ifconfig = "/sbin/ifconfig"
704        stage_file = stage_file_type(user, self.staging_host, stagingdir)
705        pname = node.get('hostname', None)
706        fed_dir = "/usr/local/federation"
707        fed_etc_dir = "%s/etc" % fed_dir
708        fed_bin_dir = "%s/bin" % fed_dir
709        fed_lib_dir = "%s/lib" % fed_dir
710
711        if pname:
712            sfile = "%s/%s.startup" % (tmpdir, pname)
713            script = open(sfile, "w")
714            # Reset the interfaces to the ones in the topo file
715            for i in [ i for i in elem.interface \
716                    if not i.get_attribute('portal')]:
717                pinf = node['interfaces'].get(i.name, None)
718                addr = i.get_attribute('ip4_address') 
719                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
720                if pinf and addr:
721                    print >>script, \
722                            "%s %s %s netmask %s"  % \
723                            (ifconfig, pinf, addr, netmask)
724                else:
725                    self.log.error("Missing interface or address for %s" \
726                            % i.name)
727               
728            for l, f in self.federation_software:
729                base = os.path.basename(f)
730                stage_file(script, base)
731                if l: install_tar(script, l, base)
732                else: install_rpm(script, base)
733
734            for s in elem.software:
735                s_base = s.location.rpartition('/')[2]
736                stage_file(script, s_base)
737                if s.install: install_tar(script, s.install, s_base)
738                else: install_rpm(script, s_base)
739
740            for f in ('hosts', pubkey, secretkey, 'client.conf', 
741                    'userconf'):
742                stage_file(script, f, fed_etc_dir)
743            if self.sshd:
744                stage_file(script, self.sshd, fed_bin_dir)
745            if self.sshd_config:
746                stage_file(script, self.sshd_config, fed_etc_dir)
747
748            # Look in tmpdir to get the names.  They've all been copied
749            # into the (remote) staging dir
750            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
751                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
752
753            # Hackery dackery dock: the ProtoGENI python is really ancient.
754            # A modern version (though packaged for Mandrake (remember
755            # Mandrake?  good times, good times)) should be in the
756            # federation_software list, but we need to move rename is for
757            # SEER.
758            print >>script, "rm /usr/bin/python"
759            print >>script, "ln /usr/bin/python2.4 /usr/bin/python"
760            # Back to less hacky stuff
761
762            # Start commands
763            if elem.get_attribute('portal') and self.portal_startcommand:
764                # Install portal software
765                for l, f in self.portal_software:
766                    base = os.path.basename(f)
767                    stage_file(script, base)
768                    if l: install_tar(script, l, base)
769                    else: install_rpm(script, base)
770
771                # Portals never have a user-specified start command
772                print >>script, self.portal_startcommand
773            elif self.node_startcommand:
774                # XXX: debug
775                print >>script, "sudo perl -I%s %simport_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
776                # XXX: debug
777                if elem.get_attribute('startup'):
778                    print >>script, "%s \\$USER '%s'" % \
779                            (self.node_startcommand,
780                                    elem.get_attribute('startup'))
781                else:
782                    print >>script, self.node_startcommand
783            script.close()
784            return sfile, pname
785        else:
786            return None, None
787
788
789    def configure_nodes(self, segment_commands, topo, nodes, user, 
790            pubkey, secretkey, stagingdir, tmpdir):
791        """
792        For each node in the topology, generate a script file that copies
793        software onto it and installs it in the proper places and then runs the
794        startup command (including the federation commands.
795        """
796
797
798
799        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
800            vname = e.name
801            sfile, pname = self.write_node_config_script(e,
802                    nodes.get(vname, { }),
803                    user, pubkey, secretkey, stagingdir, tmpdir)
804            if sfile:
805                if not segment_commands.scp_file(sfile, user, pname):
806                    self.log.error("Could not copy script to %s" % pname)
807            else:
808                self.log.error("Unmapped node: %s" % vname)
809
810    def start_node(self, user, host, node, segment_commands):
811        """
812        Copy an identity to a node for the configuration script to be able to
813        import data and then run the startup script remotely.
814        """
815        # Place an identity on the node so that the copying can succeed
816        segment_commands.ssh_cmd(user, host, "scp .ssh/id_rsa %s:.ssh" % node)
817        segment_commands.ssh_cmd(user, node, 
818                "sudo /bin/sh ./%s.startup &" % node)
819
820    def start_nodes(self, user, host, nodes, segment_commands):
821        """
822        Start a thread to initialize each node and wait for them to complete.
823        Each thread runs start_node.
824        """
825        threads = [ ]
826        for n in nodes:
827            t = Thread(target=self.start_node, args=(user, host, n, 
828                segment_commands))
829            t.start()
830            threads.append(t)
831
832        done = [not t.isAlive() for t in threads]
833        while not all(done):
834            self.log.info("Waiting for threads %s" % done)
835            time.sleep(10)
836            done = [not t.isAlive() for t in threads]
837
838    def set_up_staging_filespace(self, segment_commands, user, host,
839            stagingdir):
840        """
841        Set up teh staging area on the staging machine.  To reduce the number
842        of ssh commands, we compose a script and execute it remotely.
843        """
844
845        self.log.info("[start_segment]: creating script file")
846        try:
847            sf, scriptname = tempfile.mkstemp()
848            scriptfile = os.fdopen(sf, 'w')
849        except EnvironmentError:
850            return False
851
852        scriptbase = os.path.basename(scriptname)
853
854        # Script the filesystem changes
855        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
856        print >>scriptfile, 'mkdir -p %s' % stagingdir
857        print >>scriptfile, "rm -f %s" % scriptbase
858        scriptfile.close()
859
860        # Move the script to the remote machine
861        # XXX: could collide tempfile names on the remote host
862        if segment_commands.scp_file(scriptname, user, host, scriptbase):
863            os.remove(scriptname)
864        else:
865            return False
866
867        # Execute the script (and the script's last line deletes it)
868        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
869            return False
870
871    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
872        """
873        Protogeni interactions take a context and a protogeni certificate.
874        This establishes both for later calls and returns them.
875        """
876        if os.access(certfile, os.R_OK):
877            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
878        else:
879            self.log.error("[start_segment]: Cannot read certfile: %s" % \
880                    certfile)
881            return None, None
882
883        try:
884            gcred = segment_commands.pg_call(self.sa_url, 
885                    'GetCredential', {}, ctxt)
886        except self.ProtoGENIError, e:
887            raise service_error(service_error.federant,
888                    "ProtoGENI: %s" % e)
889
890        return ctxt, gcred
891
892    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
893        """
894        Find a usable slice name by trying random ones until there's no
895        collision.
896        """
897
898        def random_slicename(user):
899            """
900            Return a random slicename by appending 5 letters to the username.
901            """
902            slicename = user
903            for i in range(0,5):
904                slicename += random.choice(string.ascii_letters)
905            return slicename
906
907        while True:
908            slicename = random_slicename(user)
909            try:
910                param = {
911                        'credential': gcred, 
912                        'hrn': slicename,
913                        'type': 'Slice'
914                        }
915                segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt)
916            except segment_commands.ProtoGENIError, e:
917                print e
918                break
919
920        return slicename
921
922    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
923        """
924        Create the slice and allocate resources.  If any of this stuff fails,
925        the allocations will time out on PG in short order, so we just raise
926        the service_error.  Return the slice and sliver credentials as well as
927        the manifest.
928        """
929        try:
930            param = {
931                    'credential': gcred, 
932                    'hrn': slicename,
933                    'type': 'Slice'
934                    }
935            slice_cred = segment_commands.pg_call(self.sa_url, 'Register',
936                    param, ctxt)
937            f = open("./slice_cred", "w")
938            print >>f, slice_cred
939            f.close()
940            # Populate the ssh keys (let PG format them)
941            param = {
942                    'credential': gcred, 
943                    }
944            keys =  segment_commands.pg_call(self.sa_url, 'GetKeys', param, 
945                    ctxt)
946            # Grab and redeem a ticket
947            param = {
948                    'credential': slice_cred, 
949                    'rspec': rspec,
950                    }
951            ticket = segment_commands.pg_call(self.cm_url, 'GetTicket', param,
952                    ctxt)
953            f = open("./ticket", "w")
954            print >>f, ticket
955            f.close()
956            param = { 
957                    'credential': slice_cred, 
958                    'keys': keys,
959                    'ticket': ticket,
960                    }
961            sliver_cred, manifest = segment_commands.pg_call(self.cm_url, 
962                    'RedeemTicket', param, ctxt)
963            f = open("./sliver_cred", "w")
964            print >>f, sliver_cred
965            f.close()
966            f = open("./manifest", "w")
967            print >>f, manifest
968            f.close()
969            # start 'em up
970            param = { 
971                    'credential': sliver_cred,
972                    }
973            segment_commands.pg_call(self.cm_url, 'StartSliver', param, ctxt)
974        except segment_commands.ProtoGENIError, e:
975            raise service_error(service_error.federant,
976                    "ProtoGENI: %s %s" % (e.code, e))
977
978        return (slice_cred, sliver_cred, manifest)
979
980    def wait_for_slice(self, segment_commands, slice_cred, ctxt):
981        """
982        Wait for the given slice to finish its startup.  Return the final
983        status.
984        """
985        status = 'notready'
986        try:
987            while status == 'notready':
988                param = { 
989                        'credential': slice_cred
990                        }
991                r = segment_commands.pg_call(self.cm_url,
992                        'SliceStatus', param, ctxt)
993                status = r.get('status', 'notready')
994                if status == 'notready':
995                    time.sleep(30)
996        except segment_commands.ProtoGENIError, e:
997            raise service_error(service_error.federant,
998                    "ProtoGENI: %s %s" % (e.code, e))
999
1000        return status
1001
1002    def delete_slice(self, segment_commands, slice_cred, ctxt):
1003        """
1004        Delete the slice resources.  An error from the service is ignores,
1005        because the soft state will go away anyway.
1006        """
1007        try:
1008            param = { 'credential': slice_cred }
1009            segment_commands.pg_call(self.cm_url, 'DeleteSliver',
1010                    param, ctxt)
1011        except segment_commands.ProtoGENIError, e:
1012            self.log.warn("ProtoGENI: %s" % e)
1013
1014
1015
1016    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1017            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1018            export_certfile, topo, connInfo, services, timeout=0):
1019        """
1020        Start a sub-experiment on a federant.
1021
1022        Get the current state, modify or create as appropriate, ship data
1023        and configs and start the experiment.  There are small ordering
1024        differences based on the initial state of the sub-experiment.
1025        """
1026
1027        # Local software dir
1028        lsoftdir = "%s/software" % tmpdir
1029        host = self.staging_host
1030
1031        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1032                certfile, certpw)
1033
1034        if not ctxt: return False
1035
1036        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1037        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1038        self.log.info("Creating %s" % slicename)
1039        slice_cred, sliver_cred, manifest = self.allocate_slice(
1040                segment_commands, slicename, rspec, gcred, ctxt)
1041
1042        # With manifest in hand, we can export the portal node names.
1043        if self.create_debug: nodes = self.fake_manifest(topo)
1044        else: nodes = self.manifest_to_dict(manifest)
1045
1046        self.export_store_info(export_certfile, nodes, self.ssh_port,
1047                connInfo)
1048        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1049                ename, connInfo, services, nodes)
1050
1051        # Copy software to the staging machine (done after generation to copy
1052        # those, too)
1053        for d in (tmpdir, lsoftdir):
1054            if os.path.isdir(d):
1055                for f in os.listdir(d):
1056                    if not os.path.isdir("%s/%s" % (d, f)):
1057                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1058                                user, host, "%s/%s" % (stagingdir, f)):
1059                            self.log.error("Scp failed")
1060                            return False
1061
1062
1063        # Now we wait for the nodes to start on PG
1064        status = self.wait_for_slice(segment_commands, slice_cred, ctxt)
1065        if status == 'failed':
1066            self.log.error('Sliver failed to start on ProtoGENI')
1067            self.delete_slice(segment_commands, slice_cred, ctxt)
1068            return False
1069        else:
1070            # All good: save ProtoGENI info in shared state
1071            self.state_lock.acquire()
1072            self.allocation[aid]['slice_name'] = slicename
1073            self.allocation[aid]['slice_credential'] = slice_cred
1074            self.allocation[aid]['sliver_credential'] = sliver_cred
1075            self.allocation[aid]['manifest'] = manifest
1076            self.allocation[aid]['certfile'] = certfile
1077            self.allocation[aid]['certpw'] = certpw
1078            self.write_state()
1079            self.state_lock.release()
1080
1081        # Now we have configuration to do for ProtoGENI
1082        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1083                secretkey, stagingdir, tmpdir)
1084
1085        self.start_nodes(user, self.staging_host, 
1086                [ n.get('hostname', None) for n in nodes.values()],
1087                segment_commands)
1088
1089        # Everything has gone OK.
1090        return True, dict([(k, n.get('hostname', None)) \
1091                for k, n in nodes.items()])
1092
1093    def generate_rspec(self, topo, softdir, connInfo):
1094        t = topo.clone()
1095
1096        starts = { }
1097        # Weed out the things we aren't going to instantiate: Segments, portal
1098        # substrates, and portal interfaces.  (The copy in the for loop allows
1099        # us to delete from e.elements in side the for loop).  While we're
1100        # touching all the elements, we also adjust paths from the original
1101        # testbed to local testbed paths and put the federation commands and
1102        # startcommands into a dict so we can start them manually later.
1103        # ProtoGENI requires setup before the federation commands run, so we
1104        # run them by hand after we've seeded configurations.
1105        for e in [e for e in t.elements]:
1106            if isinstance(e, topdl.Segment):
1107                t.elements.remove(e)
1108            # Fix software paths
1109            for s in getattr(e, 'software', []):
1110                s.location = re.sub("^.*/", softdir, s.location)
1111            if isinstance(e, topdl.Computer):
1112                if e.get_attribute('portal') and self.portal_startcommand:
1113                    # Portals never have a user-specified start command
1114                    starts[e.name] = self.portal_startcommand
1115                elif self.node_startcommand:
1116                    if e.get_attribute('startup'):
1117                        starts[e.name] = "%s \\$USER '%s'" % \
1118                                (self.node_startcommand, 
1119                                        e.get_attribute('startup'))
1120                        e.remove_attribute('startup')
1121                    else:
1122                        starts[e.name] = self.node_startcommand
1123
1124                # Remove portal interfaces
1125                e.interface = [i for i in e.interface \
1126                        if not i.get_attribute('portal')]
1127
1128        t.substrates = [ s.clone() for s in t.substrates ]
1129        t.incorporate_elements()
1130
1131        # Customize the ns2 output for local portal commands and images
1132        filters = []
1133
1134        # NB: these are extra commands issued for the node, not the startcmds
1135        if self.portal_command:
1136            filters.append(topdl.generate_portal_command_filter(
1137                self.portal_command))
1138
1139        # Convert to rspec and return it
1140        exp_rspec = topdl.topology_to_rspec(t, filters)
1141
1142        return exp_rspec
1143
1144    def retrieve_software(self, topo, certfile, softdir):
1145        """
1146        Collect the software that nodes in the topology need loaded and stage
1147        it locally.  This implies retrieving it from the experiment_controller
1148        and placing it into softdir.  Certfile is used to prove that this node
1149        has access to that data (it's the allocation/segment fedid).  Finally
1150        local portal and federation software is also copied to the same staging
1151        directory for simplicity - all software needed for experiment creation
1152        is in softdir.
1153        """
1154        sw = set()
1155        for e in topo.elements:
1156            for s in getattr(e, 'software', []):
1157                sw.add(s.location)
1158        os.mkdir(softdir)
1159        for s in sw:
1160            self.log.debug("Retrieving %s" % s)
1161            try:
1162                get_url(s, certfile, softdir)
1163            except:
1164                t, v, st = sys.exc_info()
1165                raise service_error(service_error.internal,
1166                        "Error retrieving %s: %s" % (s, v))
1167
1168        # Copy local portal node software to the tempdir
1169        for s in (self.portal_software, self.federation_software):
1170            for l, f in s:
1171                base = os.path.basename(f)
1172                copy_file(f, "%s/%s" % (softdir, base))
1173
1174        # Ick.  Put this python rpm in a place that it will get moved into
1175        # the staging area.  It's a hack to install a modern (in a Roman
1176        # sense of modern) python on ProtoGENI
1177        python_rpm ="python2.4-2.4-1pydotorg.i586.rpm"
1178        if os.access("./%s" % python_rpm, os.R_OK):
1179            copy_file("./%s" % python_rpm, "%s/%s" % (softdir, python_rpm))
1180
1181
1182    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1183        """
1184        Gather common configuration files, retrieve or create an experiment
1185        name and project name, and return the ssh_key filenames.  Create an
1186        allocation log bound to the state log variable as well.
1187        """
1188        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1189        ename = None
1190        pubkey_base = None
1191        secretkey_base = None
1192        alloc_log = None
1193
1194        for a in attrs:
1195            if a['attribute'] in configs:
1196                try:
1197                    self.log.debug("Retrieving %s" % a['value'])
1198                    get_url(a['value'], certfile, tmpdir)
1199                except:
1200                    t, v, st = sys.exc_info()
1201                    raise service_error(service_error.internal,
1202                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1203            if a['attribute'] == 'ssh_pubkey':
1204                pubkey_base = a['value'].rpartition('/')[2]
1205            if a['attribute'] == 'ssh_secretkey':
1206                secretkey_base = a['value'].rpartition('/')[2]
1207            if a['attribute'] == 'experiment_name':
1208                ename = a['value']
1209
1210        if not ename:
1211            ename = ""
1212            for i in range(0,5):
1213                ename += random.choice(string.ascii_letters)
1214            self.log.warn("No experiment name: picked one randomly: %s" \
1215                    % ename)
1216
1217        self.state_lock.acquire()
1218        if self.allocation.has_key(aid):
1219            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1220            self.allocation[aid]['experiment'] = ename
1221            self.allocation[aid]['log'] = [ ]
1222            # Create a logger that logs to the experiment's state object as
1223            # well as to the main log file.
1224            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1225            h = logging.StreamHandler(
1226                    list_log.list_log(self.allocation[aid]['log']))
1227            # XXX: there should be a global one of these rather than
1228            # repeating the code.
1229            h.setFormatter(logging.Formatter(
1230                "%(asctime)s %(name)s %(message)s",
1231                        '%d %b %y %H:%M:%S'))
1232            alloc_log.addHandler(h)
1233            self.write_state()
1234        else:
1235            self.log.error("No allocation for %s!?" % aid)
1236        self.state_lock.release()
1237
1238        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1239                cpw, alloc_log)
1240
1241    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1242        # Copy the assigned names into the return topology
1243        rvtopo = topo.clone()
1244        embedding = [ ]
1245        for k, n in nodes.items():
1246            embedding.append({ 
1247                'toponame': k,
1248                'physname': ["%s%s" %  (n, self.domain)],
1249                })
1250        # Grab the log (this is some anal locking, but better safe than
1251        # sorry)
1252        self.state_lock.acquire()
1253        logv = "".join(self.allocation[aid]['log'])
1254        # It's possible that the StartSegment call gets retried (!).
1255        # if the 'started' key is in the allocation, we'll return it rather
1256        # than redo the setup.
1257        self.allocation[aid]['started'] = { 
1258                'allocID': alloc_id,
1259                'allocationLog': logv,
1260                'segmentdescription': { 
1261                    'topdldescription': rvtopo.to_dict() },
1262                'embedding': embedding,
1263                }
1264        retval = copy.deepcopy(self.allocation[aid]['started'])
1265        self.write_state()
1266        self.state_lock.release()
1267
1268        return retval
1269
1270    def StartSegment(self, req, fid):
1271        err = None  # Any service_error generated after tmpdir is created
1272        rv = None   # Return value from segment creation
1273
1274        try:
1275            req = req['StartSegmentRequestBody']
1276            topref = req['segmentdescription']['topdldescription']
1277        except KeyError:
1278            raise service_error(service_error.req, "Badly formed request")
1279
1280        connInfo = req.get('connection', [])
1281        services = req.get('service', [])
1282        auth_attr = req['allocID']['fedid']
1283        aid = "%s" % auth_attr
1284        attrs = req.get('fedAttr', [])
1285        if not self.auth.check_attribute(fid, auth_attr):
1286            raise service_error(service_error.access, "Access denied")
1287        else:
1288            # See if this is a replay of an earlier succeeded StartSegment -
1289            # sometimes SSL kills 'em.  If so, replay the response rather than
1290            # redoing the allocation.
1291            self.state_lock.acquire()
1292            retval = self.allocation[aid].get('started', None)
1293            self.state_lock.release()
1294            if retval:
1295                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1296                        "replaying response")
1297                return retval
1298
1299        if topref:
1300            topo = topdl.Topology(**topref)
1301        else:
1302            raise service_error(service_error.req, 
1303                    "Request missing segmentdescription'")
1304
1305        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1306        try:
1307            tmpdir = tempfile.mkdtemp(prefix="access-")
1308            softdir = "%s/software" % tmpdir
1309        except EnvironmentError:
1310            raise service_error(service_error.internal, "Cannot create tmp dir")
1311
1312        # Try block alllows us to clean up temporary files.
1313        try:
1314            self.retrieve_software(topo, certfile, softdir)
1315            self.configure_userconf(services, tmpdir)
1316            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1317                cpw, alloc_log = self.initialize_experiment_info(attrs,
1318                        aid, certfile, tmpdir)
1319            self.import_store_info(certfile, connInfo)
1320            rspec = self.generate_rspec(topo, "%s/%s/" \
1321                    % (self.staging_dir, ename), connInfo)
1322
1323            segment_commands = protogeni_proxy(keyfile=ssh_key,
1324                    debug=self.create_debug, log=alloc_log,
1325                    ch_url = self.ch_url, sa_url=self.sa_url,
1326                    cm_url=self.cm_url)
1327            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1328                    pubkey_base, 
1329                    secretkey_base, ename,
1330                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1331                    certfile, topo, connInfo, services)
1332        except EnvironmentError, e:
1333            err = service_error(service_error.internal, "%s" % e)
1334        except service_error, e:
1335            err = e
1336        except:
1337            t, v, st = sys.exc_info()
1338            err = service_error(service_error.internal, "%s: %s" % \
1339                    (v, traceback.extract_tb(st)))
1340
1341        # Walk up tmpdir, deleting as we go
1342        if self.cleanup: self.remove_dirs(tmpdir)
1343        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1344
1345        if rv:
1346            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1347        elif err:
1348            raise service_error(service_error.federant,
1349                    "Swapin failed: %s" % err)
1350        else:
1351            raise service_error(service_error.federant, "Swapin failed")
1352
1353    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1354            certfile, certpw):
1355        """
1356        Stop a sub experiment by calling swapexp on the federant
1357        """
1358        host = self.staging_host
1359        rv = False
1360        try:
1361            # Clean out tar files: we've gone over quota in the past
1362            if stagingdir:
1363                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1364            if slice_cred:
1365                self.log.error('Removing Sliver on ProtoGENI')
1366                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1367                self.delete_slice(segment_commands, slice_cred, ctxt)
1368            return True
1369        except self.ssh_cmd_timeout:
1370            rv = False
1371        return rv
1372
1373    def TerminateSegment(self, req, fid):
1374        try:
1375            req = req['TerminateSegmentRequestBody']
1376        except KeyError:
1377            raise service_error(service_error.req, "Badly formed request")
1378
1379        auth_attr = req['allocID']['fedid']
1380        aid = "%s" % auth_attr
1381        attrs = req.get('fedAttr', [])
1382        if not self.auth.check_attribute(fid, auth_attr):
1383            raise service_error(service_error.access, "Access denied")
1384
1385        self.state_lock.acquire()
1386        if self.allocation.has_key(aid):
1387            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1388            slice_cred = self.allocation[aid].get('slice_credential', None)
1389            ename = self.allocation[aid].get('experiment', None)
1390        else:
1391            cf, user, ssh_key, cpw = (None, None, None, None)
1392            slice_cred = None
1393            ename = None
1394        self.state_lock.release()
1395
1396        if ename:
1397            staging = "%s/%s" % ( self.staging_dir, ename)
1398        else:
1399            self.log.warn("Can't find experiment name for %s" % aid)
1400            staging = None
1401
1402        segment_commands = protogeni_proxy(keyfile=ssh_key,
1403                debug=self.create_debug, ch_url = self.ch_url,
1404                sa_url=self.sa_url, cm_url=self.cm_url)
1405        self.stop_segment(segment_commands, user, staging, slice_cred, cf, cpw)
1406        return { 'allocID': req['allocID'] }
1407
1408    def renew_segment(self, segment_commands, name, scred, interval, 
1409            certfile, certpw):
1410        """
1411        Linear code through the segment renewal calls.
1412        """
1413        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1414        try:
1415            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1416                    time.gmtime(time.time() + interval))
1417            cred = segment_commands.pg_call(self.sa_url, 'GetCredential', 
1418                    {}, ctxt)
1419
1420            param = {
1421                    'credential': scred,
1422                    'expiration': expiration
1423                    }
1424            r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt)
1425            param = {
1426                    'credential': cred,
1427                    'hrn': name,
1428                    'type': 'Slice',
1429                    }
1430            slice = segment_commands.pg_call(self.sa_url, 'Resolve', 
1431                    param, ctxt)
1432            uuid = slice.get('uuid', None)
1433            if uuid == None:
1434                sys.exit('No uuid for %s' % slicename)
1435
1436            print 'Calling GetCredential (uuid)'
1437            param = {
1438                    'credential': cred,
1439                    'uuid': uuid,
1440                    'type': 'Slice',
1441                    }
1442            new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential',
1443                    param, ctxt)
1444            f = open('./new_slice_cred', 'w')
1445            print >>f, new_scred
1446            f.close()
1447
1448        except segment_commands.ProtoGENIError, e:
1449            self.log.error("Failed to extend slice %s: %s" % (name, e))
1450            return None
1451        try:
1452            print 'Calling RenewSlice (CM)'
1453            param = {
1454                    'credential': new_scred,
1455                    }
1456            r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt)
1457        except segment_commands.ProtoGENIError, e:
1458            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1459
1460        return new_scred
1461   
1462
1463    def RenewSlices(self):
1464        self.log.info("Scanning for slices to renew")
1465        self.state_lock.acquire()
1466        aids = self.allocation.keys()
1467        self.state_lock.release()
1468
1469        for aid in aids:
1470            self.state_lock.acquire()
1471            if self.allocation.has_key(aid):
1472                name = self.allocation[aid].get('slice_name', None)
1473                scred = self.allocation[aid].get('slice_credential', None)
1474                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1475            else:
1476                name = None
1477                scred = None
1478            self.state_lock.release()
1479
1480            if not os.access(cf, os.R_OK):
1481                self.log.error(
1482                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1483                continue
1484
1485            # There's a ProtoGENI slice associated with the segment; renew it.
1486            if name and scred:
1487                segment_commands = protogeni_proxy(log=self.log, 
1488                        debug=self.create_debug, keyfile=ssh_key,
1489                        cm_url = self.cm_url, sa_url = self.sa_url,
1490                        ch_url = self.ch_url)
1491                new_scred = self.renew_segment(segment_commands, name, scred, 
1492                        self.renewal_interval, cf, cpw)
1493                if new_scred:
1494                    self.log.info("Slice %s renewed until %s GMT" % \
1495                            (name, time.asctime(time.gmtime(\
1496                                time.time()+self.renewal_interval))))
1497                    self.state_lock.acquire()
1498                    if self.allocation.has_key(aid):
1499                        self.allocation[aid]['slice_credential'] = new_scred
1500                    self.state_lock.release()
1501                else:
1502                    self.log.info("Failed to renew slice %s " % name)
1503
1504        # Let's do this all again soon.  (4 tries before the slices time out)   
1505        t = Timer(self.renewal_interval/4, self.RenewSlices)
1506        t.start()
Note: See TracBrowser for help on using the repository browser.