source: fedd/federation/protogeni_access.py @ c2f92c5

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since c2f92c5 was c2f92c5, checked in by Ted Faber <faber@…>, 14 years ago

The protoGENI OS version lurched forward. This code removes the hacky stuff
used to support the old version and attempts to insure that a useful OSversion
is always used - it is possible to get the old OS randomly at this time as
well.

The hacks to support an ancient OS version don't work on a modern version and
vice versa, so we moved forward to teh modern version.

fedkit updates are also required.

  • Property mode set to 100644
File size: 47.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17from M2Crypto.SSL import SSLError
18
19from util import *
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30from access import access_base
31from proxy_segment import proxy_segment
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class protogeni_proxy(proxy_segment):
45    class ProtoGENIError(Exception): 
46        def __init__(self, op, code, output):
47            Exception.__init__(self, output)
48            self.op = op
49            self.code = code
50            self.output = output
51
52    def __init__(self, log=None, keyfile=None, debug=False, 
53            ch_url=None, sa_url=None, cm_url=None):
54        proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
55
56        self.ProtoGENIError = protogeni_proxy.ProtoGENIError
57        self.ch_url = ch_url
58        self.sa_url = sa_url
59        self.cm_url = cm_url
60
61        self.call_SetValue = service_caller('SetValue')
62
63        self.debug_fail = ['Resolve']
64        self.debug_response = {
65                'RedeemTicket': ("XML blob1", "XML blob2"),
66                'SliceStatus': { 'status': 'ready' },
67            }
68
69
70    def pg_call(self, url, method, params, context):
71        max_retries = 5
72        retries = 0
73
74        s = service_caller(method, request_body_name="", strict=False)
75        self.log.debug("Calling %s %s" % (url, method))
76        if not self.debug:
77            while retries < max_retries:
78                r = s.call_xmlrpc_service(url, params, context=context)
79                code = r.get('code', -1)
80                if code == 0:
81                    # Success leaves the loop here
82                    return r.get('value', None)
83                elif code == 14 and retries +1 < max_retries:
84                    # Busy resource
85                    retries+= 1
86                    self.log.info("Resource busy, retrying in 30 secs")
87                    time.sleep(30)
88                else:
89                    # NB: out of retries falls through to here
90                    raise self.ProtoGENIError(op=method, 
91                            code=r.get('code', 'unknown'), 
92                            output=r.get('output', 'No output'))
93        else:
94            if method in self.debug_fail:
95                raise self.ProtoGENIError(op=method, code='unknown',
96                        output='No output')
97            elif self.debug_response.has_key(method):
98                return self.debug_response[method]
99            else:
100                return "%s XML blob" % method
101
102
103
104class access(access_base):
105    """
106    The implementation of access control based on mapping users to projects.
107
108    Users can be mapped to existing projects or have projects created
109    dynamically.  This implements both direct requests and proxies.
110    """
111
112    def __init__(self, config=None, auth=None):
113        """
114        Initializer.  Pulls parameters out of the ConfigParser's access section.
115        """
116
117        access_base.__init__(self, config, auth)
118
119        self.domain = config.get("access", "domain")
120        self.userconfdir = config.get("access","userconfdir")
121        self.userconfcmd = config.get("access","userconfcmd")
122        self.userconfurl = config.get("access","userconfurl")
123        self.federation_software = config.get("access", "federation_software")
124        self.portal_software = config.get("access", "portal_software")
125        self.ssh_port = config.get("access","ssh_port") or "22"
126        self.sshd = config.get("access","sshd")
127        self.sshd_config = config.get("access", "sshd_config")
128        self.access_type = config.get("access", "type")
129        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
130        self.staging_host = config.get("access", "staging_host") \
131                or "ops.emulab.net"
132        self.local_seer_software = config.get("access", "local_seer_software")
133        self.local_seer_image = config.get("access", "local_seer_image")
134        self.local_seer_start = config.get("access", "local_seer_start")
135   
136        self.dragon_endpoint = config.get("access", "dragon")
137        self.dragon_vlans = config.get("access", "dragon_vlans")
138        self.deter_internal = config.get("access", "deter_internal")
139
140        self.tunnel_config = config.getboolean("access", "tunnel_config")
141        self.portal_command = config.get("access", "portal_command")
142        self.portal_image = config.get("access", "portal_image")
143        self.portal_type = config.get("access", "portal_type") or "pc"
144        self.portal_startcommand = config.get("access", "portal_startcommand")
145        self.node_startcommand = config.get("access", "node_startcommand")
146
147        self.federation_software = self.software_list(self.federation_software)
148        self.portal_software = self.software_list(self.portal_software)
149        self.local_seer_software = self.software_list(self.local_seer_software)
150
151        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
152        self.renewal_interval = int(self.renewal_interval) * 60
153
154        self.ch_url = config.get("access", "ch_url")
155        self.sa_url = config.get("access", "sa_url")
156        self.cm_url = config.get("access", "cm_url")
157
158        self.restricted = [ ]
159
160        # read_state in the base_class
161        self.state_lock.acquire()
162        for a  in ('allocation', 'projects', 'keys', 'types'):
163            if a not in self.state:
164                self.state[a] = { }
165        self.allocation = self.state['allocation']
166        self.projects = self.state['projects']
167        self.keys = self.state['keys']
168        self.types = self.state['types']
169        # Add the ownership attributes to the authorizer.  Note that the
170        # indices of the allocation dict are strings, but the attributes are
171        # fedids, so there is a conversion.
172        for k in self.state.get('allocation', {}).keys():
173            for o in self.state['allocation'][k].get('owners', []):
174                self.auth.set_attribute(o, fedid(hexstr=k))
175            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
176
177        self.state_lock.release()
178
179
180        self.log = logging.getLogger("fedd.access")
181        set_log_level(config, "access", self.log)
182
183        self.access = { }
184        if config.has_option("access", "accessdb"):
185            self.read_access(config.get("access", "accessdb"), 
186                    access_obj=self.make_access_info)
187
188        self.lookup_access = self.lookup_access_base
189
190        self.call_SetValue = service_caller('SetValue')
191        self.call_GetValue = service_caller('GetValue')
192        self.exports = {
193                'local_seer_control': self.export_local_seer,
194                'seer_master': self.export_seer_master,
195                'hide_hosts': self.export_hide_hosts,
196                }
197
198        if not self.local_seer_image or not self.local_seer_software or \
199                not self.local_seer_start:
200            if 'local_seer_control' in self.exports:
201                del self.exports['local_seer_control']
202
203        if not self.local_seer_image or not self.local_seer_software or \
204                not self.seer_master_start:
205            if 'seer_master' in self.exports:
206                del self.exports['seer_master']
207
208        self.RenewSlices()
209
210        self.soap_services = {\
211            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
212            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
213            'StartSegment': soap_handler("StartSegment", self.StartSegment),
214            'TerminateSegment': soap_handler("TerminateSegment", 
215                self.TerminateSegment),
216            }
217        self.xmlrpc_services =  {\
218            'RequestAccess': xmlrpc_handler('RequestAccess',
219                self.RequestAccess),
220            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
221                self.ReleaseAccess),
222            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
223            'TerminateSegment': xmlrpc_handler('TerminateSegment',
224                self.TerminateSegment),
225            }
226
227    @staticmethod
228    def make_access_info(s):
229        """
230        Split a string of the form (id, id, id, id) ito its constituent tuples
231        and return them as a tuple.  Use to import access info from the
232        access_db.
233        """
234
235        ss = s.strip()
236        if ss.startswith('(') and ss.endswith(')'):
237            l = [ s.strip() for s  in ss[1:-1].split(",")]
238            if len(l) == 4:
239                return tuple(l)
240            else:
241                raise self.parse_error(
242                        "Exactly 4 elements in access info required")
243        else:
244            raise self.parse_error("Expecting parenthezied values")
245
246
247    def get_handler(self, path, fid):
248        self.log.info("Get handler %s %s" % (path, fid))
249        if self.auth.check_attribute(fid, path) and self.userconfdir:
250            return ("%s/%s" % (self.userconfdir, path), "application/binary")
251        else:
252            return (None, None)
253
254    def build_access_response(self, alloc_id, services):
255        """
256        Create the SOAP response.
257
258        Build the dictionary description of the response and use
259        fedd_utils.pack_soap to create the soap message.  ap is the allocate
260        project message returned from a remote project allocation (even if that
261        allocation was done locally).
262        """
263        # Because alloc_id is already a fedd_services_types.IDType_Holder,
264        # there's no need to repack it
265        msg = { 
266                'allocID': alloc_id,
267                'fedAttr': [
268                    { 'attribute': 'domain', 'value': self.domain } , 
269                ]
270            }
271        if self.dragon_endpoint:
272            msg['fedAttr'].append({'attribute': 'dragon',
273                'value': self.dragon_endpoint})
274        if self.deter_internal:
275            msg['fedAttr'].append({'attribute': 'deter_internal',
276                'value': self.deter_internal})
277        #XXX: ??
278        if self.dragon_vlans:
279            msg['fedAttr'].append({'attribute': 'vlans',
280                'value': self.dragon_vlans})
281
282        if services:
283            msg['service'] = services
284        return msg
285
286    def RequestAccess(self, req, fid):
287        """
288        Handle the access request.
289        """
290
291        # The dance to get into the request body
292        if req.has_key('RequestAccessRequestBody'):
293            req = req['RequestAccessRequestBody']
294        else:
295            raise service_error(service_error.req, "No request!?")
296
297        if req.has_key('destinationTestbed'):
298            dt = unpack_id(req['destinationTestbed'])
299
300        # Request for this fedd
301        found, match = self.lookup_access(req, fid)
302        services, svc_state = self.export_services(req.get('service',[]),
303                None, None)
304        # keep track of what's been added
305        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
306        aid = unicode(allocID)
307
308        self.state_lock.acquire()
309        self.allocation[aid] = { }
310        # The protoGENI certificate
311        self.allocation[aid]['credentials'] = found
312        # The list of owner FIDs
313        self.allocation[aid]['owners'] = [ fid ]
314        self.write_state()
315        self.state_lock.release()
316        self.auth.set_attribute(fid, allocID)
317        self.auth.set_attribute(allocID, allocID)
318
319        try:
320            f = open("%s/%s.pem" % (self.certdir, aid), "w")
321            print >>f, alloc_cert
322            f.close()
323        except EnvironmentError, e:
324            raise service_error(service_error.internal, 
325                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
326        return self.build_access_response({ 'fedid': allocID }, None)
327
328
329    def ReleaseAccess(self, req, fid):
330        # The dance to get into the request body
331        if req.has_key('ReleaseAccessRequestBody'):
332            req = req['ReleaseAccessRequestBody']
333        else:
334            raise service_error(service_error.req, "No request!?")
335
336        # Local request
337        try:
338            if req['allocID'].has_key('localname'):
339                auth_attr = aid = req['allocID']['localname']
340            elif req['allocID'].has_key('fedid'):
341                aid = unicode(req['allocID']['fedid'])
342                auth_attr = req['allocID']['fedid']
343            else:
344                raise service_error(service_error.req,
345                        "Only localnames and fedids are understood")
346        except KeyError:
347            raise service_error(service_error.req, "Badly formed request")
348
349        self.log.debug("[access] deallocation requested for %s", aid)
350        if not self.auth.check_attribute(fid, auth_attr):
351            self.log.debug("[access] deallocation denied for %s", aid)
352            raise service_error(service_error.access, "Access Denied")
353
354        self.state_lock.acquire()
355        if self.allocation.has_key(aid):
356            self.log.debug("Found allocation for %s" %aid)
357            del self.allocation[aid]
358            self.write_state()
359            self.state_lock.release()
360            # And remove the access cert
361            cf = "%s/%s.pem" % (self.certdir, aid)
362            self.log.debug("Removing %s" % cf)
363            os.remove(cf)
364            return { 'allocID': req['allocID'] } 
365        else:
366            self.state_lock.release()
367            raise service_error(service_error.req, "No such allocation")
368
369    def manifest_to_dict(self, manifest, ignore_debug=False):
370        """
371        Turn the manifest into a dict were each virtual nodename (i.e. the
372        topdl name) has an entry with the allocated machine in hostname and the
373        interfaces in 'interfaces'.  I love having XML parser code lying
374        around.
375        """
376        if self.create_debug and not ignore_debug: 
377            self.log.debug("Returning null manifest dict")
378            return { }
379
380        # The class allows us to keep a little state - the dict under
381        # consteruction and the current entry in that dict for the interface
382        # element code.
383        class manifest_parser:
384            def __init__(self):
385                self.d = { }
386                self.current_key=None
387
388            # If the element is a node, create a dict entry for it.  If it's an
389            # interface inside a node, add an entry in the interfaces list with
390            # the virtual name and component id.
391            def start_element(self, name, attrs):
392                if name == 'node':
393                    self.current_key = attrs.get('virtual_id',"")
394                    if self.current_key:
395                        self.d[self.current_key] = {
396                                'hostname': attrs.get('hostname', None),
397                                'interfaces': { }
398                                }
399                elif name == 'interface' and self.current_key:
400                    self.d[self.current_key]['interfaces']\
401                            [attrs.get('virtual_id','')] = \
402                            attrs.get('component_id', None)
403            #  When a node is finished, clear current_key
404            def end_element(self, name):
405                if name == 'node': self.current_key = None
406
407        node = { }
408
409        mp = manifest_parser()
410        p = xml.parsers.expat.ParserCreate()
411        # These are bound to the class we just created
412        p.StartElementHandler = mp.start_element
413        p.EndElementHandler = mp.end_element
414
415        p.Parse(manifest)
416        # Make the node dict that the callers expect
417        for k in mp.d:
418            node[k] = mp.d.get('hostname', '')
419        return mp.d
420
421    def fake_manifest(self, topo):
422        """
423        Fake the output of manifest_to_dict with a bunch of generic node an
424        interface names, for debugging.
425        """
426        node = { }
427        for i, e in enumerate([ e for e in topo.elements \
428                if isinstance(e, topdl.Computer)]):
429            node[e.name] = { 
430                    'hostname': "node%03d" % i,
431                    'interfaces': { }
432                    }
433            for j, inf in enumerate(e.interface):
434                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
435
436        return node
437
438
439    def generate_portal_configs(self, topo, pubkey_base, 
440            secretkey_base, tmpdir, leid, connInfo, services, nodes):
441
442        def conninfo_to_dict(key, info):
443            """
444            Make a cpoy of the connection information about key, and flatten it
445            into a single dict by parsing out any feddAttrs.
446            """
447
448            rv = None
449            for i in info:
450                if key == i.get('portal', "") or \
451                        key in [e.get('element', "") \
452                        for e in i.get('member', [])]:
453                    rv = i.copy()
454                    break
455
456            else:
457                return rv
458
459            if 'fedAttr' in rv:
460                for a in rv['fedAttr']:
461                    attr = a.get('attribute', "")
462                    val = a.get('value', "")
463                    if attr and attr not in rv:
464                        rv[attr] = val
465                del rv['fedAttr']
466            return rv
467
468        # XXX: un hardcode this
469        def client_null(f, s):
470            print >>f, "Service: %s" % s['name']
471       
472        def client_seer_master(f, s):
473            print >>f, 'PortalAlias: seer-master'
474
475        def client_smb(f, s):
476            print >>f, "Service: %s" % s['name']
477            smbshare = None
478            smbuser = None
479            smbproj = None
480            for a in s.get('fedAttr', []):
481                if a.get('attribute', '') == 'SMBSHARE':
482                    smbshare = a.get('value', None)
483                elif a.get('attribute', '') == 'SMBUSER':
484                    smbuser = a.get('value', None)
485                elif a.get('attribute', '') == 'SMBPROJ':
486                    smbproj = a.get('value', None)
487
488            if all((smbshare, smbuser, smbproj)):
489                print >>f, "SMBshare: %s" % smbshare
490                print >>f, "ProjectUser: %s" % smbuser
491                print >>f, "ProjectName: %s" % smbproj
492
493        def client_hide_hosts(f, s):
494            for a in s.get('fedAttr', [ ]):
495                if a.get('attribute', "") == 'hosts':
496                    print >>f, 'Hide: %s' % a.get('value', "")
497
498        client_service_out = {
499                'SMB': client_smb,
500                'tmcd': client_null,
501                'seer': client_null,
502                'userconfig': client_null,
503                'project_export': client_null,
504                'seer_master': client_seer_master,
505                'hide_hosts': client_hide_hosts,
506            }
507
508        def client_seer_master_export(f, s):
509            print >>f, "AddedNode: seer-master"
510
511        def client_seer_local_export(f, s):
512            print >>f, "AddedNode: control"
513
514        client_export_service_out = {
515                'seer_master': client_seer_master_export,
516                'local_seer_control': client_seer_local_export,
517            }
518
519        def server_port(f, s):
520            p = urlparse(s.get('server', 'http://localhost'))
521            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
522
523        def server_null(f,s): pass
524
525        def server_seer(f, s):
526            print >>f, 'seer: true'
527
528        server_service_out = {
529                'SMB': server_port,
530                'tmcd': server_port,
531                'userconfig': server_null,
532                'project_export': server_null,
533                'seer': server_seer,
534                'seer_master': server_port,
535                'hide_hosts': server_null,
536            }
537        # XXX: end un hardcode this
538
539
540        seer_out = False
541        client_out = False
542        for e in [ e for e in topo.elements \
543                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
544            myname = e.name
545            type = e.get_attribute('portal_type')
546
547            info = conninfo_to_dict(myname, connInfo)
548
549            if not info:
550                raise service_error(service_error.req,
551                        "No connectivity info for %s" % myname)
552
553            # Translate to physical name (ProtoGENI doesn't have DNS)
554            physname = nodes.get(myname, { }).get('hostname', None)
555            peer = info.get('peer', "")
556            ldomain = self.domain
557            ssh_port = info.get('ssh_port', 22)
558
559            # Collect this for the client.conf file
560            if 'masterexperiment' in info:
561                mproj, meid = info['masterexperiment'].split("/", 1)
562
563            active = info.get('active', 'False')
564
565            if type in ('control', 'both'):
566                testbed = e.get_attribute('testbed')
567                control_gw = myname
568
569            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
570            tunnelconfig = self.tunnel_config
571            try:
572                f = open(cfn, "w")
573                if active == 'True':
574                    print >>f, "active: True"
575                    print >>f, "ssh_port: %s" % ssh_port
576                    if type in ('control', 'both'):
577                        for s in [s for s in services \
578                                if s.get('name', "") in self.imports]:
579                            server_service_out[s['name']](f, s)
580
581                if tunnelconfig:
582                    print >>f, "tunnelip: %s" % tunnelconfig
583                print >>f, "peer: %s" % peer.lower()
584                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
585                        pubkey_base
586                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
587                        secretkey_base
588                f.close()
589            except EnvironmentError, e:
590                raise service_error(service_error.internal,
591                        "Can't write protal config %s: %s" % (cfn, e))
592
593        # Done with portals, write the client config file.
594        try:
595            f = open("%s/client.conf" % tmpdir, "w")
596            if control_gw:
597                print >>f, "ControlGateway: %s" % physname.lower()
598            for s in services:
599                if s.get('name',"") in self.imports and \
600                        s.get('visibility','') == 'import':
601                    client_service_out[s['name']](f, s)
602                if s.get('name', '') in self.exports and \
603                        s.get('visibility', '') == 'export' and \
604                        s['name'] in client_export_service_out:
605                    client_export_service_out[s['name']](f, s)
606            # Seer uses this.
607            if mproj and meid:
608                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
609            f.close()
610        except EnvironmentError, e:
611            raise service_error(service_error.internal,
612                    "Cannot write client.conf: %s" %s)
613
614
615
616    def export_store_info(self, cf, nodes, ssh_port, connInfo):
617        """
618        For the export requests in the connection info, install the peer names
619        at the experiment controller via SetValue calls.
620        """
621
622        for c in connInfo:
623            for p in [ p for p in c.get('parameter', []) \
624                    if p.get('type', '') == 'output']:
625
626                if p.get('name', '') == 'peer':
627                    k = p.get('key', None)
628                    surl = p.get('store', None)
629                    if surl and k and k.index('/') != -1:
630                        if self.create_debug:
631                            req = { 'name': k, 'value': 'debug' }
632                            self.call_SetValue(surl, req, cf)
633                        else:
634                            n = nodes.get(k[k.index('/')+1:], { })
635                            value = n.get('hostname', None)
636                            if value:
637                                req = { 'name': k, 'value': value }
638                                self.call_SetValue(surl, req, cf)
639                            else:
640                                self.log.error("No hostname for %s" % \
641                                        k[k.index('/'):])
642                    else:
643                        self.log.error("Bad export request: %s" % p)
644                elif p.get('name', '') == 'ssh_port':
645                    k = p.get('key', None)
646                    surl = p.get('store', None)
647                    if surl and k:
648                        req = { 'name': k, 'value': ssh_port }
649                        self.call_SetValue(surl, req, cf)
650                    else:
651                        self.log.error("Bad export request: %s" % p)
652                else:
653
654                    self.log.error("Unknown export parameter: %s" % \
655                            p.get('name'))
656                    continue
657
658    def write_node_config_script(self, elem, node, user, pubkey,
659            secretkey, stagingdir, tmpdir):
660        """
661        Write out the configuration script that is to run on the node
662        represented by elem in the topology.  This is called
663        once per node to configure.
664        """
665        # These little functions/functors just make things more readable.  Each
666        # one encapsulates a small task of copying software files or installing
667        # them.
668        class stage_file_type:
669            """
670            Write code copying file sfrom the staging host to the host on which
671            this will run.
672            """
673            def __init__(self, user, host, stagingdir):
674                self.user = user
675                self.host = host
676                self.stagingdir = stagingdir
677                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
678                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
679
680            def __call__(self, script, file, dest="."):
681                # If the file is a full pathname, do not use stagingdir
682                if file.find('/') == -1:
683                    file = "%s/%s" % (self.stagingdir, file)
684                print >>script, "%s %s@%s:%s %s" % \
685                        (self.scp, self.user, self.host, file, dest)
686
687        def install_tar(script, loc, base):
688            """
689            Print code to script to install a tarfile in loc.
690            """
691            tar = "/bin/tar"
692            mkdir="/bin/mkdir"
693
694            print >>script, "%s -p %s" % (mkdir, loc)
695            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
696
697        def install_rpm(script, base):
698            """
699            Print code to script to install an rpm
700            """
701            rpm = "/bin/rpm"
702            print >>script, "%s --install %s" % (rpm, base)
703
704        ifconfig = "/sbin/ifconfig"
705        stage_file = stage_file_type(user, self.staging_host, stagingdir)
706        pname = node.get('hostname', None)
707        fed_dir = "/usr/local/federation"
708        fed_etc_dir = "%s/etc" % fed_dir
709        fed_bin_dir = "%s/bin" % fed_dir
710        fed_lib_dir = "%s/lib" % fed_dir
711
712        if pname:
713            sfile = "%s/%s.startup" % (tmpdir, pname)
714            script = open(sfile, "w")
715            # Reset the interfaces to the ones in the topo file
716            for i in [ i for i in elem.interface \
717                    if not i.get_attribute('portal')]:
718                pinf = node['interfaces'].get(i.name, None)
719                addr = i.get_attribute('ip4_address') 
720                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
721                if pinf and addr:
722                    print >>script, \
723                            "%s %s %s netmask %s"  % \
724                            (ifconfig, pinf, addr, netmask)
725                else:
726                    self.log.error("Missing interface or address for %s" \
727                            % i.name)
728               
729            for l, f in self.federation_software:
730                base = os.path.basename(f)
731                stage_file(script, base)
732                if l: install_tar(script, l, base)
733                else: install_rpm(script, base)
734
735            for s in elem.software:
736                s_base = s.location.rpartition('/')[2]
737                stage_file(script, s_base)
738                if s.install: install_tar(script, s.install, s_base)
739                else: install_rpm(script, s_base)
740
741            for f in ('hosts', pubkey, secretkey, 'client.conf', 
742                    'userconf'):
743                stage_file(script, f, fed_etc_dir)
744            if self.sshd:
745                stage_file(script, self.sshd, fed_bin_dir)
746            if self.sshd_config:
747                stage_file(script, self.sshd_config, fed_etc_dir)
748
749            # Look in tmpdir to get the names.  They've all been copied
750            # into the (remote) staging dir
751            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
752                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
753
754            # Done with staging, remove the identity used to stage
755            print >>script, "#/bin/rm .ssh/id_rsa"
756
757            # Start commands
758            if elem.get_attribute('portal') and self.portal_startcommand:
759                # Install portal software
760                for l, f in self.portal_software:
761                    base = os.path.basename(f)
762                    stage_file(script, base)
763                    if l: install_tar(script, l, base)
764                    else: install_rpm(script, base)
765
766                # Portals never have a user-specified start command
767                print >>script, self.portal_startcommand
768            elif self.node_startcommand:
769                # XXX: debug
770                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
771                # XXX: debug
772                if elem.get_attribute('startup'):
773                    print >>script, "%s \\$USER '%s'" % \
774                            (self.node_startcommand,
775                                    elem.get_attribute('startup'))
776                else:
777                    print >>script, self.node_startcommand
778            script.close()
779            return sfile, pname
780        else:
781            return None, None
782
783
784    def configure_nodes(self, segment_commands, topo, nodes, user, 
785            pubkey, secretkey, stagingdir, tmpdir):
786        """
787        For each node in the topology, generate a script file that copies
788        software onto it and installs it in the proper places and then runs the
789        startup command (including the federation commands.
790        """
791
792
793
794        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
795            vname = e.name
796            sfile, pname = self.write_node_config_script(e,
797                    nodes.get(vname, { }),
798                    user, pubkey, secretkey, stagingdir, tmpdir)
799            if sfile:
800                if not segment_commands.scp_file(sfile, user, pname):
801                    self.log.error("Could not copy script to %s" % pname)
802            else:
803                self.log.error("Unmapped node: %s" % vname)
804
805    def start_node(self, user, host, node, segment_commands):
806        """
807        Copy an identity to a node for the configuration script to be able to
808        import data and then run the startup script remotely.
809        """
810        # Place an identity on the node so that the copying can succeed
811        segment_commands.scp_file( segment_commands.ssh_privkey_file,
812                user, node, ".ssh/id_rsa")
813        segment_commands.ssh_cmd(user, node, 
814                "sudo /bin/sh ./%s.startup &" % node)
815
816    def start_nodes(self, user, host, nodes, segment_commands):
817        """
818        Start a thread to initialize each node and wait for them to complete.
819        Each thread runs start_node.
820        """
821        threads = [ ]
822        for n in nodes:
823            t = Thread(target=self.start_node, args=(user, host, n, 
824                segment_commands))
825            t.start()
826            threads.append(t)
827
828        done = [not t.isAlive() for t in threads]
829        while not all(done):
830            self.log.info("Waiting for threads %s" % done)
831            time.sleep(10)
832            done = [not t.isAlive() for t in threads]
833
834    def set_up_staging_filespace(self, segment_commands, user, host,
835            stagingdir):
836        """
837        Set up teh staging area on the staging machine.  To reduce the number
838        of ssh commands, we compose a script and execute it remotely.
839        """
840
841        self.log.info("[start_segment]: creating script file")
842        try:
843            sf, scriptname = tempfile.mkstemp()
844            scriptfile = os.fdopen(sf, 'w')
845        except EnvironmentError:
846            return False
847
848        scriptbase = os.path.basename(scriptname)
849
850        # Script the filesystem changes
851        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
852        print >>scriptfile, 'mkdir -p %s' % stagingdir
853        print >>scriptfile, "rm -f %s" % scriptbase
854        scriptfile.close()
855
856        # Move the script to the remote machine
857        # XXX: could collide tempfile names on the remote host
858        if segment_commands.scp_file(scriptname, user, host, scriptbase):
859            os.remove(scriptname)
860        else:
861            return False
862
863        # Execute the script (and the script's last line deletes it)
864        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
865            return False
866
867    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
868        """
869        Protogeni interactions take a context and a protogeni certificate.
870        This establishes both for later calls and returns them.
871        """
872        if os.access(certfile, os.R_OK):
873            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
874        else:
875            self.log.error("[start_segment]: Cannot read certfile: %s" % \
876                    certfile)
877            return None, None
878
879        try:
880            gcred = segment_commands.pg_call(self.sa_url, 
881                    'GetCredential', {}, ctxt)
882        except self.ProtoGENIError, e:
883            raise service_error(service_error.federant,
884                    "ProtoGENI: %s" % e)
885
886        return ctxt, gcred
887
888    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
889        """
890        Find a usable slice name by trying random ones until there's no
891        collision.
892        """
893
894        def random_slicename(user):
895            """
896            Return a random slicename by appending 5 letters to the username.
897            """
898            slicename = user
899            for i in range(0,5):
900                slicename += random.choice(string.ascii_letters)
901            return slicename
902
903        while True:
904            slicename = random_slicename(user)
905            try:
906                param = {
907                        'credential': gcred, 
908                        'hrn': slicename,
909                        'type': 'Slice'
910                        }
911                segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt)
912            except segment_commands.ProtoGENIError, e:
913                print e
914                break
915
916        return slicename
917
918    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
919        """
920        Create the slice and allocate resources.  If any of this stuff fails,
921        the allocations will time out on PG in short order, so we just raise
922        the service_error.  Return the slice and sliver credentials as well as
923        the manifest.
924        """
925        try:
926            param = {
927                    'credential': gcred, 
928                    'hrn': slicename,
929                    'type': 'Slice'
930                    }
931            slice_cred = segment_commands.pg_call(self.sa_url, 'Register',
932                    param, ctxt)
933            #f = open("./slice_cred", "w")
934            #print >>f, slice_cred
935            #f.close()
936            # Populate the ssh keys (let PG format them)
937            param = {
938                    'credential': gcred, 
939                    }
940            keys =  segment_commands.pg_call(self.sa_url, 'GetKeys', param, 
941                    ctxt)
942            # Grab and redeem a ticket
943            param = {
944                    'credential': slice_cred, 
945                    'rspec': rspec,
946                    }
947            ticket = segment_commands.pg_call(self.cm_url, 'GetTicket', param,
948                    ctxt)
949            #f = open("./ticket", "w")
950            #print >>f, ticket
951            #f.close()
952            param = { 
953                    'credential': slice_cred, 
954                    'keys': keys,
955                    'ticket': ticket,
956                    }
957            sliver_cred, manifest = segment_commands.pg_call(self.cm_url, 
958                    'RedeemTicket', param, ctxt)
959            #f = open("./sliver_cred", "w")
960            #print >>f, sliver_cred
961            #f.close()
962            #f = open("./manifest", "w")
963            #print >>f, manifest
964            #f.close()
965            # start 'em up
966            param = { 
967                    'credential': sliver_cred,
968                    }
969            segment_commands.pg_call(self.cm_url, 'StartSliver', param, ctxt)
970        except segment_commands.ProtoGENIError, e:
971            raise service_error(service_error.federant,
972                    "ProtoGENI: %s %s" % (e.code, e))
973
974        return (slice_cred, sliver_cred, manifest)
975
976    def wait_for_slice(self, segment_commands, slice_cred, ctxt, timeout=None):
977        """
978        Wait for the given slice to finish its startup.  Return the final
979        status.
980        """
981        status = 'notready'
982        if timeout is not None:
983            end = time.time() + timeout
984        try:
985            while status == 'notready':
986                param = { 
987                        'credential': slice_cred
988                        }
989                r = segment_commands.pg_call(self.cm_url,
990                        'SliceStatus', param, ctxt)
991                status = r.get('status', 'notready')
992                if status == 'notready':
993                    if timeout is not None and time.time() > end:
994                        return 'timeout'
995                    time.sleep(30)
996        except segment_commands.ProtoGENIError, e:
997            raise service_error(service_error.federant,
998                    "ProtoGENI: %s %s" % (e.code, e))
999
1000        return status
1001
1002    def delete_slice(self, segment_commands, slice_cred, ctxt):
1003        """
1004        Delete the slice resources.  An error from the service is ignores,
1005        because the soft state will go away anyway.
1006        """
1007        try:
1008            param = { 'credential': slice_cred }
1009            segment_commands.pg_call(self.cm_url, 'DeleteSlice',
1010                    param, ctxt)
1011        except segment_commands.ProtoGENIError, e:
1012            self.log.warn("ProtoGENI: %s" % e)
1013
1014
1015
1016    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1017            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1018            export_certfile, topo, connInfo, services, timeout=0):
1019        """
1020        Start a sub-experiment on a federant.
1021
1022        Get the current state, modify or create as appropriate, ship data
1023        and configs and start the experiment.  There are small ordering
1024        differences based on the initial state of the sub-experiment.
1025        """
1026
1027        # Local software dir
1028        lsoftdir = "%s/software" % tmpdir
1029        host = self.staging_host
1030
1031        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1032                certfile, certpw)
1033
1034        if not ctxt: return False
1035
1036        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1037        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1038        self.log.info("Creating %s" % slicename)
1039        slice_cred, sliver_cred, manifest = self.allocate_slice(
1040                segment_commands, slicename, rspec, gcred, ctxt)
1041
1042        # With manifest in hand, we can export the portal node names.
1043        if self.create_debug: nodes = self.fake_manifest(topo)
1044        else: nodes = self.manifest_to_dict(manifest)
1045
1046        self.export_store_info(export_certfile, nodes, self.ssh_port,
1047                connInfo)
1048        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1049                ename, connInfo, services, nodes)
1050
1051        # Copy software to the staging machine (done after generation to copy
1052        # those, too)
1053        for d in (tmpdir, lsoftdir):
1054            if os.path.isdir(d):
1055                for f in os.listdir(d):
1056                    if not os.path.isdir("%s/%s" % (d, f)):
1057                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1058                                user, host, "%s/%s" % (stagingdir, f)):
1059                            self.log.error("Scp failed")
1060                            return False
1061
1062
1063        # Now we wait for the nodes to start on PG
1064        status = self.wait_for_slice(segment_commands, slice_cred, ctxt,
1065                timeout=300)
1066        if status == 'failed':
1067            self.log.error('Sliver failed to start on ProtoGENI')
1068            self.delete_slice(segment_commands, slice_cred, ctxt)
1069            return False
1070        elif status == 'timeout':
1071            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1072            self.delete_slice(segment_commands, slice_cred, ctxt)
1073            return False
1074        else:
1075            # All good: save ProtoGENI info in shared state
1076            self.state_lock.acquire()
1077            self.allocation[aid]['slice_name'] = slicename
1078            self.allocation[aid]['slice_credential'] = slice_cred
1079            self.allocation[aid]['sliver_credential'] = sliver_cred
1080            self.allocation[aid]['manifest'] = manifest
1081            self.allocation[aid]['certfile'] = certfile
1082            self.allocation[aid]['certpw'] = certpw
1083            self.write_state()
1084            self.state_lock.release()
1085
1086        # Now we have configuration to do for ProtoGENI
1087        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1088                secretkey, stagingdir, tmpdir)
1089
1090        self.start_nodes(user, self.staging_host, 
1091                [ n.get('hostname', None) for n in nodes.values()],
1092                segment_commands)
1093
1094        # Everything has gone OK.
1095        return True, dict([(k, n.get('hostname', None)) \
1096                for k, n in nodes.items()])
1097
1098    def generate_rspec(self, topo, softdir, connInfo):
1099
1100        # Force a useful image.  Without picking this the nodes can get
1101        # different images and there is great pain.
1102        def image_filter(e):
1103            if isinstance(e, topdl.Computer):
1104                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1105                        'image+emulab-ops//FEDORA10-STD" />'
1106            else:
1107                return ""
1108        # Main line of generate
1109        t = topo.clone()
1110
1111        starts = { }
1112        # Weed out the things we aren't going to instantiate: Segments, portal
1113        # substrates, and portal interfaces.  (The copy in the for loop allows
1114        # us to delete from e.elements in side the for loop).  While we're
1115        # touching all the elements, we also adjust paths from the original
1116        # testbed to local testbed paths and put the federation commands and
1117        # startcommands into a dict so we can start them manually later.
1118        # ProtoGENI requires setup before the federation commands run, so we
1119        # run them by hand after we've seeded configurations.
1120        for e in [e for e in t.elements]:
1121            if isinstance(e, topdl.Segment):
1122                t.elements.remove(e)
1123            # Fix software paths
1124            for s in getattr(e, 'software', []):
1125                s.location = re.sub("^.*/", softdir, s.location)
1126            if isinstance(e, topdl.Computer):
1127                if e.get_attribute('portal') and self.portal_startcommand:
1128                    # Portals never have a user-specified start command
1129                    starts[e.name] = self.portal_startcommand
1130                elif self.node_startcommand:
1131                    if e.get_attribute('startup'):
1132                        starts[e.name] = "%s \\$USER '%s'" % \
1133                                (self.node_startcommand, 
1134                                        e.get_attribute('startup'))
1135                        e.remove_attribute('startup')
1136                    else:
1137                        starts[e.name] = self.node_startcommand
1138
1139                # Remove portal interfaces
1140                e.interface = [i for i in e.interface \
1141                        if not i.get_attribute('portal')]
1142
1143        t.substrates = [ s.clone() for s in t.substrates ]
1144        t.incorporate_elements()
1145
1146        # Customize the rspec output to use the image we like
1147        filters = [ image_filter ]
1148
1149        # Convert to rspec and return it
1150        exp_rspec = topdl.topology_to_rspec(t, filters)
1151        print exp_rspec
1152
1153        return exp_rspec
1154
1155    def retrieve_software(self, topo, certfile, softdir):
1156        """
1157        Collect the software that nodes in the topology need loaded and stage
1158        it locally.  This implies retrieving it from the experiment_controller
1159        and placing it into softdir.  Certfile is used to prove that this node
1160        has access to that data (it's the allocation/segment fedid).  Finally
1161        local portal and federation software is also copied to the same staging
1162        directory for simplicity - all software needed for experiment creation
1163        is in softdir.
1164        """
1165        sw = set()
1166        for e in topo.elements:
1167            for s in getattr(e, 'software', []):
1168                sw.add(s.location)
1169        os.mkdir(softdir)
1170        for s in sw:
1171            self.log.debug("Retrieving %s" % s)
1172            try:
1173                get_url(s, certfile, softdir)
1174            except:
1175                t, v, st = sys.exc_info()
1176                raise service_error(service_error.internal,
1177                        "Error retrieving %s: %s" % (s, v))
1178
1179        # Copy local portal node software to the tempdir
1180        for s in (self.portal_software, self.federation_software):
1181            for l, f in s:
1182                base = os.path.basename(f)
1183                copy_file(f, "%s/%s" % (softdir, base))
1184
1185
1186    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1187        """
1188        Gather common configuration files, retrieve or create an experiment
1189        name and project name, and return the ssh_key filenames.  Create an
1190        allocation log bound to the state log variable as well.
1191        """
1192        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1193        ename = None
1194        pubkey_base = None
1195        secretkey_base = None
1196        alloc_log = None
1197
1198        for a in attrs:
1199            if a['attribute'] in configs:
1200                try:
1201                    self.log.debug("Retrieving %s" % a['value'])
1202                    get_url(a['value'], certfile, tmpdir)
1203                except:
1204                    t, v, st = sys.exc_info()
1205                    raise service_error(service_error.internal,
1206                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1207            if a['attribute'] == 'ssh_pubkey':
1208                pubkey_base = a['value'].rpartition('/')[2]
1209            if a['attribute'] == 'ssh_secretkey':
1210                secretkey_base = a['value'].rpartition('/')[2]
1211            if a['attribute'] == 'experiment_name':
1212                ename = a['value']
1213
1214        if not ename:
1215            ename = ""
1216            for i in range(0,5):
1217                ename += random.choice(string.ascii_letters)
1218            self.log.warn("No experiment name: picked one randomly: %s" \
1219                    % ename)
1220
1221        self.state_lock.acquire()
1222        if self.allocation.has_key(aid):
1223            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1224            self.allocation[aid]['experiment'] = ename
1225            self.allocation[aid]['log'] = [ ]
1226            # Create a logger that logs to the experiment's state object as
1227            # well as to the main log file.
1228            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1229            h = logging.StreamHandler(
1230                    list_log.list_log(self.allocation[aid]['log']))
1231            # XXX: there should be a global one of these rather than
1232            # repeating the code.
1233            h.setFormatter(logging.Formatter(
1234                "%(asctime)s %(name)s %(message)s",
1235                        '%d %b %y %H:%M:%S'))
1236            alloc_log.addHandler(h)
1237            self.write_state()
1238        else:
1239            self.log.error("No allocation for %s!?" % aid)
1240        self.state_lock.release()
1241
1242        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1243                cpw, alloc_log)
1244
1245    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1246        # Copy the assigned names into the return topology
1247        rvtopo = topo.clone()
1248        embedding = [ ]
1249        for k, n in nodes.items():
1250            embedding.append({ 
1251                'toponame': k,
1252                'physname': [n ],
1253                })
1254        # Grab the log (this is some anal locking, but better safe than
1255        # sorry)
1256        self.state_lock.acquire()
1257        logv = "".join(self.allocation[aid]['log'])
1258        # It's possible that the StartSegment call gets retried (!).
1259        # if the 'started' key is in the allocation, we'll return it rather
1260        # than redo the setup.
1261        self.allocation[aid]['started'] = { 
1262                'allocID': alloc_id,
1263                'allocationLog': logv,
1264                'segmentdescription': { 
1265                    'topdldescription': rvtopo.to_dict() },
1266                'embedding': embedding,
1267                }
1268        retval = copy.deepcopy(self.allocation[aid]['started'])
1269        self.write_state()
1270        self.state_lock.release()
1271
1272        return retval
1273
1274    def StartSegment(self, req, fid):
1275        err = None  # Any service_error generated after tmpdir is created
1276        rv = None   # Return value from segment creation
1277
1278        try:
1279            req = req['StartSegmentRequestBody']
1280            topref = req['segmentdescription']['topdldescription']
1281        except KeyError:
1282            raise service_error(service_error.req, "Badly formed request")
1283
1284        connInfo = req.get('connection', [])
1285        services = req.get('service', [])
1286        auth_attr = req['allocID']['fedid']
1287        aid = "%s" % auth_attr
1288        attrs = req.get('fedAttr', [])
1289        if not self.auth.check_attribute(fid, auth_attr):
1290            raise service_error(service_error.access, "Access denied")
1291        else:
1292            # See if this is a replay of an earlier succeeded StartSegment -
1293            # sometimes SSL kills 'em.  If so, replay the response rather than
1294            # redoing the allocation.
1295            self.state_lock.acquire()
1296            retval = self.allocation[aid].get('started', None)
1297            self.state_lock.release()
1298            if retval:
1299                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1300                        "replaying response")
1301                return retval
1302
1303        if topref:
1304            topo = topdl.Topology(**topref)
1305        else:
1306            raise service_error(service_error.req, 
1307                    "Request missing segmentdescription'")
1308
1309        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1310        try:
1311            tmpdir = tempfile.mkdtemp(prefix="access-")
1312            softdir = "%s/software" % tmpdir
1313        except EnvironmentError:
1314            raise service_error(service_error.internal, "Cannot create tmp dir")
1315
1316        # Try block alllows us to clean up temporary files.
1317        try:
1318            self.retrieve_software(topo, certfile, softdir)
1319            self.configure_userconf(services, tmpdir)
1320            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1321                cpw, alloc_log = self.initialize_experiment_info(attrs,
1322                        aid, certfile, tmpdir)
1323            self.import_store_info(certfile, connInfo)
1324            rspec = self.generate_rspec(topo, "%s/%s/" \
1325                    % (self.staging_dir, ename), connInfo)
1326
1327            segment_commands = protogeni_proxy(keyfile=ssh_key,
1328                    debug=self.create_debug, log=alloc_log,
1329                    ch_url = self.ch_url, sa_url=self.sa_url,
1330                    cm_url=self.cm_url)
1331            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1332                    pubkey_base, 
1333                    secretkey_base, ename,
1334                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1335                    certfile, topo, connInfo, services)
1336        except EnvironmentError, e:
1337            err = service_error(service_error.internal, "%s" % e)
1338        except service_error, e:
1339            err = e
1340        except:
1341            t, v, st = sys.exc_info()
1342            err = service_error(service_error.internal, "%s: %s" % \
1343                    (v, traceback.extract_tb(st)))
1344
1345        # Walk up tmpdir, deleting as we go
1346        if self.cleanup: self.remove_dirs(tmpdir)
1347        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1348
1349        if rv:
1350            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1351        elif err:
1352            raise service_error(service_error.federant,
1353                    "Swapin failed: %s" % err)
1354        else:
1355            raise service_error(service_error.federant, "Swapin failed")
1356
1357    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1358            certfile, certpw):
1359        """
1360        Stop a sub experiment by calling swapexp on the federant
1361        """
1362        host = self.staging_host
1363        rv = False
1364        try:
1365            # Clean out tar files: we've gone over quota in the past
1366            if stagingdir:
1367                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1368            if slice_cred:
1369                self.log.error('Removing Sliver on ProtoGENI')
1370                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1371                self.delete_slice(segment_commands, slice_cred, ctxt)
1372            return True
1373        except self.ssh_cmd_timeout:
1374            rv = False
1375        return rv
1376
1377    def TerminateSegment(self, req, fid):
1378        try:
1379            req = req['TerminateSegmentRequestBody']
1380        except KeyError:
1381            raise service_error(service_error.req, "Badly formed request")
1382
1383        auth_attr = req['allocID']['fedid']
1384        aid = "%s" % auth_attr
1385        attrs = req.get('fedAttr', [])
1386        if not self.auth.check_attribute(fid, auth_attr):
1387            raise service_error(service_error.access, "Access denied")
1388
1389        self.state_lock.acquire()
1390        if self.allocation.has_key(aid):
1391            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1392            slice_cred = self.allocation[aid].get('slice_credential', None)
1393            ename = self.allocation[aid].get('experiment', None)
1394        else:
1395            cf, user, ssh_key, cpw = (None, None, None, None)
1396            slice_cred = None
1397            ename = None
1398        self.state_lock.release()
1399
1400        if ename:
1401            staging = "%s/%s" % ( self.staging_dir, ename)
1402        else:
1403            self.log.warn("Can't find experiment name for %s" % aid)
1404            staging = None
1405
1406        segment_commands = protogeni_proxy(keyfile=ssh_key,
1407                debug=self.create_debug, ch_url = self.ch_url,
1408                sa_url=self.sa_url, cm_url=self.cm_url)
1409        self.stop_segment(segment_commands, user, staging, slice_cred, cf, cpw)
1410        return { 'allocID': req['allocID'] }
1411
1412    def renew_segment(self, segment_commands, name, scred, interval, 
1413            certfile, certpw):
1414        """
1415        Linear code through the segment renewal calls.
1416        """
1417        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1418        try:
1419            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1420                    time.gmtime(time.time() + interval))
1421            cred = segment_commands.pg_call(self.sa_url, 'GetCredential', 
1422                    {}, ctxt)
1423
1424            param = {
1425                    'credential': scred,
1426                    'expiration': expiration
1427                    }
1428            r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt)
1429            param = {
1430                    'credential': cred,
1431                    'hrn': name,
1432                    'type': 'Slice',
1433                    }
1434            slice = segment_commands.pg_call(self.sa_url, 'Resolve', 
1435                    param, ctxt)
1436            uuid = slice.get('uuid', None)
1437            if uuid == None:
1438                sys.exit('No uuid for %s' % slicename)
1439
1440            print 'Calling GetCredential (uuid)'
1441            param = {
1442                    'credential': cred,
1443                    'uuid': uuid,
1444                    'type': 'Slice',
1445                    }
1446            new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential',
1447                    param, ctxt)
1448            #f = open('./new_slice_cred', 'w')
1449            #print >>f, new_scred
1450            #f.close()
1451
1452        except segment_commands.ProtoGENIError, e:
1453            self.log.error("Failed to extend slice %s: %s" % (name, e))
1454            return None
1455        try:
1456            print 'Calling RenewSlice (CM)'
1457            param = {
1458                    'credential': new_scred,
1459                    }
1460            r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt)
1461        except segment_commands.ProtoGENIError, e:
1462            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1463
1464        return new_scred
1465   
1466
1467    def RenewSlices(self):
1468        self.log.info("Scanning for slices to renew")
1469        self.state_lock.acquire()
1470        aids = self.allocation.keys()
1471        self.state_lock.release()
1472
1473        for aid in aids:
1474            self.state_lock.acquire()
1475            if self.allocation.has_key(aid):
1476                name = self.allocation[aid].get('slice_name', None)
1477                scred = self.allocation[aid].get('slice_credential', None)
1478                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1479            else:
1480                name = None
1481                scred = None
1482            self.state_lock.release()
1483
1484            if not os.access(cf, os.R_OK):
1485                self.log.error(
1486                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1487                continue
1488
1489            # There's a ProtoGENI slice associated with the segment; renew it.
1490            if name and scred:
1491                segment_commands = protogeni_proxy(log=self.log, 
1492                        debug=self.create_debug, keyfile=ssh_key,
1493                        cm_url = self.cm_url, sa_url = self.sa_url,
1494                        ch_url = self.ch_url)
1495                new_scred = self.renew_segment(segment_commands, name, scred, 
1496                        self.renewal_interval, cf, cpw)
1497                if new_scred:
1498                    self.log.info("Slice %s renewed until %s GMT" % \
1499                            (name, time.asctime(time.gmtime(\
1500                                time.time()+self.renewal_interval))))
1501                    self.state_lock.acquire()
1502                    if self.allocation.has_key(aid):
1503                        self.allocation[aid]['slice_credential'] = new_scred
1504                    self.state_lock.release()
1505                else:
1506                    self.log.info("Failed to renew slice %s " % name)
1507
1508        # Let's do this all again soon.  (4 tries before the slices time out)   
1509        t = Timer(self.renewal_interval/4, self.RenewSlices)
1510        t.start()
Note: See TracBrowser for help on using the repository browser.