source: fedd/federation/protogeni_access.py @ c65b7e4

axis_examplecompt_changesinfo-ops
Last change on this file since c65b7e4 was c65b7e4, checked in by Ted Faber <faber@…>, 13 years ago

Access controllers delete (some) unused ABAC attrs.

  • Property mode set to 100644
File size: 47.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import time
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import random
13import traceback
14import xml.parsers.expat
15
16from threading import Thread, Timer, Lock
17
18from util import *
19from fedid import fedid, generate_fedid
20from authorizer import authorizer, abac_authorizer
21from service_error import service_error
22from remote_service import xmlrpc_handler, soap_handler, service_caller
23
24import httplib
25import tempfile
26from urlparse import urlparse
27
28from access import access_base
29from legacy_access import legacy_access
30from protogeni_proxy import protogeni_proxy
31from geniapi_proxy import geniapi_proxy
32
33import topdl
34import list_log
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.domain = config.get("access", "domain")
60        self.userconfdir = config.get("access","userconfdir")
61        self.userconfcmd = config.get("access","userconfcmd")
62        self.userconfurl = config.get("access","userconfurl")
63        self.federation_software = config.get("access", "federation_software")
64        self.portal_software = config.get("access", "portal_software")
65        self.ssh_port = config.get("access","ssh_port") or "22"
66        self.sshd = config.get("access","sshd")
67        self.sshd_config = config.get("access", "sshd_config")
68        self.access_type = config.get("access", "type")
69        self.staging_dir = config.get("access", "staging_dir") or "/tmp"
70        self.staging_host = config.get("access", "staging_host") \
71                or "ops.emulab.net"
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75   
76        self.dragon_endpoint = config.get("access", "dragon")
77        self.dragon_vlans = config.get("access", "dragon_vlans")
78        self.deter_internal = config.get("access", "deter_internal")
79
80        self.tunnel_config = config.getboolean("access", "tunnel_config")
81        self.portal_command = config.get("access", "portal_command")
82        self.portal_image = config.get("access", "portal_image")
83        self.portal_type = config.get("access", "portal_type") or "pc"
84        self.portal_startcommand = config.get("access", "portal_startcommand")
85        self.node_startcommand = config.get("access", "node_startcommand")
86
87        self.federation_software = self.software_list(self.federation_software)
88        self.portal_software = self.software_list(self.portal_software)
89        self.local_seer_software = self.software_list(self.local_seer_software)
90
91        self.renewal_interval = config.get("access", "renewal") or (3 * 60 )
92        self.renewal_interval = int(self.renewal_interval) * 60
93
94        self.ch_url = config.get("access", "ch_url")
95        self.sa_url = config.get("access", "sa_url")
96        self.cm_url = config.get("access", "cm_url")
97
98        self.restricted = [ ]
99
100        # read_state in the base_class
101        self.state_lock.acquire()
102        for a  in ('allocation', 'projects', 'keys', 'types'):
103            if a not in self.state:
104                self.state[a] = { }
105        self.allocation = self.state['allocation']
106        self.projects = self.state['projects']
107        self.keys = self.state['keys']
108        self.types = self.state['types']
109        self.state_lock.release()
110
111
112        self.log = logging.getLogger("fedd.access")
113        set_log_level(config, "access", self.log)
114
115        # authorization information
116        self.auth_type = config.get('access', 'auth_type') \
117                or 'legacy'
118        self.auth_dir = config.get('access', 'auth_dir')
119        accessdb = config.get("access", "accessdb")
120        # initialize the authorization system
121        if self.auth_type == 'legacy':
122            self.access = { }
123            if accessdb:
124                self.legacy_read_access(accessdb, self.make_access_info)
125            # Add the ownership attributes to the authorizer.  Note that the
126            # indices of the allocation dict are strings, but the attributes are
127            # fedids, so there is a conversion.
128            self.state_lock.acquire()
129            for k in self.state.get('allocation', {}).keys():
130                for o in self.state['allocation'][k].get('owners', []):
131                    self.auth.set_attribute(o, fedid(hexstr=k))
132                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
133
134            self.state_lock.release()
135            self.lookup_access = self.legacy_lookup_access_base
136        elif self.auth_type == 'abac':
137            self.auth = abac_authorizer(load=self.auth_dir)
138            self.access = [ ]
139            if accessdb:
140                self.read_access(accessdb, self.make_access_info)
141        else:
142            raise service_error(service_error.internal, 
143                    "Unknown auth_type: %s" % self.auth_type)
144        api = config.get("access", "api") or "protogeni"
145        if api == "protogeni":
146            self.api_proxy = protogeni_proxy
147        elif api == "geniapi":
148            self.api_proxy = geniapi_proxy
149        else:
150            self.log.debug("Unknown interface, using protogeni")
151            self.api_proxy = protogeni_proxy
152
153        self.call_SetValue = service_caller('SetValue')
154        self.call_GetValue = service_caller('GetValue')
155        self.exports = {
156                'local_seer_control': self.export_local_seer,
157                'seer_master': self.export_seer_master,
158                'hide_hosts': self.export_hide_hosts,
159                }
160
161        if not self.local_seer_image or not self.local_seer_software or \
162                not self.local_seer_start:
163            if 'local_seer_control' in self.exports:
164                del self.exports['local_seer_control']
165
166        if not self.local_seer_image or not self.local_seer_software or \
167                not self.seer_master_start:
168            if 'seer_master' in self.exports:
169                del self.exports['seer_master']
170
171        self.RenewSlices()
172
173        self.soap_services = {\
174            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
175            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
176            'StartSegment': soap_handler("StartSegment", self.StartSegment),
177            'TerminateSegment': soap_handler("TerminateSegment", 
178                self.TerminateSegment),
179            }
180        self.xmlrpc_services =  {\
181            'RequestAccess': xmlrpc_handler('RequestAccess',
182                self.RequestAccess),
183            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
184                self.ReleaseAccess),
185            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
186            'TerminateSegment': xmlrpc_handler('TerminateSegment',
187                self.TerminateSegment),
188            }
189
190    @staticmethod
191    def make_access_info(s):
192        """
193        Split a string of the form (id, id, id, id) ito its constituent tuples
194        and return them as a tuple.  Use to import access info from the
195        access_db.
196        """
197
198        ss = s.strip()
199        if ss.startswith('(') and ss.endswith(')'):
200            l = [ s.strip() for s  in ss[1:-1].split(",")]
201            if len(l) == 4:
202                return tuple(l)
203            else:
204                raise self.parse_error(
205                        "Exactly 4 elements in access info required")
206        else:
207            raise self.parse_error("Expecting parenthezied values")
208
209
210    def get_handler(self, path, fid):
211        self.log.info("Get handler %s %s" % (path, fid))
212        if self.auth.check_attribute(fid, path) and self.userconfdir:
213            return ("%s/%s" % (self.userconfdir, path), "application/binary")
214        else:
215            return (None, None)
216
217    def build_access_response(self, alloc_id, services):
218        """
219        Create the SOAP response.
220
221        Build the dictionary description of the response and use
222        fedd_utils.pack_soap to create the soap message.  ap is the allocate
223        project message returned from a remote project allocation (even if that
224        allocation was done locally).
225        """
226        # Because alloc_id is already a fedd_services_types.IDType_Holder,
227        # there's no need to repack it
228        msg = { 
229                'allocID': alloc_id,
230                'fedAttr': [
231                    { 'attribute': 'domain', 'value': self.domain } , 
232                ]
233            }
234        if self.dragon_endpoint:
235            msg['fedAttr'].append({'attribute': 'dragon',
236                'value': self.dragon_endpoint})
237        if self.deter_internal:
238            msg['fedAttr'].append({'attribute': 'deter_internal',
239                'value': self.deter_internal})
240        #XXX: ??
241        if self.dragon_vlans:
242            msg['fedAttr'].append({'attribute': 'vlans',
243                'value': self.dragon_vlans})
244
245        if services:
246            msg['service'] = services
247        return msg
248
249    def RequestAccess(self, req, fid):
250        """
251        Handle the access request.
252        """
253
254        # The dance to get into the request body
255        if req.has_key('RequestAccessRequestBody'):
256            req = req['RequestAccessRequestBody']
257        else:
258            raise service_error(service_error.req, "No request!?")
259
260        if req.has_key('destinationTestbed'):
261            dt = unpack_id(req['destinationTestbed'])
262
263        # Request for this fedd
264        found, match, owners = self.lookup_access(req, fid)
265        services, svc_state = self.export_services(req.get('service',[]),
266                None, None)
267        # keep track of what's been added
268        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
269        aid = unicode(allocID)
270
271        self.state_lock.acquire()
272        self.allocation[aid] = { }
273        # The protoGENI certificate
274        self.allocation[aid]['credentials'] = found
275        # The list of owner FIDs
276        self.allocation[aid]['owners'] = owners
277        self.allocation[aid]['auth'] = set()
278        self.append_allocation_authorization(aid, 
279                ((fid, allocID), (allocID, allocID)), state_attr='allocation')
280        self.write_state()
281        self.state_lock.release()
282
283        try:
284            f = open("%s/%s.pem" % (self.certdir, aid), "w")
285            print >>f, alloc_cert
286            f.close()
287        except EnvironmentError, e:
288            raise service_error(service_error.internal, 
289                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
290        return self.build_access_response({ 'fedid': allocID }, None)
291
292
293    def ReleaseAccess(self, req, fid):
294        # The dance to get into the request body
295        if req.has_key('ReleaseAccessRequestBody'):
296            req = req['ReleaseAccessRequestBody']
297        else:
298            raise service_error(service_error.req, "No request!?")
299
300        # Local request
301        try:
302            if req['allocID'].has_key('localname'):
303                auth_attr = aid = req['allocID']['localname']
304            elif req['allocID'].has_key('fedid'):
305                aid = unicode(req['allocID']['fedid'])
306                auth_attr = req['allocID']['fedid']
307            else:
308                raise service_error(service_error.req,
309                        "Only localnames and fedids are understood")
310        except KeyError:
311            raise service_error(service_error.req, "Badly formed request")
312
313        self.log.debug("[access] deallocation requested for %s", aid)
314        if not self.auth.check_attribute(fid, auth_attr):
315            self.log.debug("[access] deallocation denied for %s", aid)
316            raise service_error(service_error.access, "Access Denied")
317
318        self.state_lock.acquire()
319        if self.allocation.has_key(aid):
320            self.log.debug("Found allocation for %s" %aid)
321            self.clear_allocation_authorization(aid, state_attr='allocation')
322            del self.allocation[aid]
323            self.write_state()
324            self.state_lock.release()
325            # And remove the access cert
326            cf = "%s/%s.pem" % (self.certdir, aid)
327            self.log.debug("Removing %s" % cf)
328            os.remove(cf)
329            return { 'allocID': req['allocID'] } 
330        else:
331            self.state_lock.release()
332            raise service_error(service_error.req, "No such allocation")
333
334    def manifest_to_dict(self, manifest, ignore_debug=False):
335        """
336        Turn the manifest into a dict were each virtual nodename (i.e. the
337        topdl name) has an entry with the allocated machine in hostname and the
338        interfaces in 'interfaces'.  I love having XML parser code lying
339        around.
340        """
341        if self.create_debug and not ignore_debug: 
342            self.log.debug("Returning null manifest dict")
343            return { }
344
345        # The class allows us to keep a little state - the dict under
346        # consteruction and the current entry in that dict for the interface
347        # element code.
348        class manifest_parser:
349            def __init__(self):
350                self.d = { }
351                self.current_key=None
352
353            # If the element is a node, create a dict entry for it.  If it's an
354            # interface inside a node, add an entry in the interfaces list with
355            # the virtual name and component id.
356            def start_element(self, name, attrs):
357                if name == 'node':
358                    self.current_key = attrs.get('virtual_id',"")
359                    if self.current_key:
360                        self.d[self.current_key] = {
361                                'hostname': attrs.get('hostname', None),
362                                'interfaces': { },
363                                'mac': { }
364                                }
365                elif name == 'interface' and self.current_key:
366                    self.d[self.current_key]['interfaces']\
367                            [attrs.get('virtual_id','')] = \
368                            attrs.get('component_id', None)
369                elif name == 'interface_ref':
370                    # Collect mac address information from an interface_ref.
371                    # These appear after the node info has been parsed.
372                    nid = attrs.get('virtual_node_id', None)
373                    ifid = attrs.get('virtual_interface_id', None)
374                    mac = attrs.get('MAC', None)
375                    self.d[nid]['mac'][ifid] = mac
376            #  When a node is finished, clear current_key
377            def end_element(self, name):
378                if name == 'node': self.current_key = None
379
380        node = { }
381
382        mp = manifest_parser()
383        p = xml.parsers.expat.ParserCreate()
384        # These are bound to the class we just created
385        p.StartElementHandler = mp.start_element
386        p.EndElementHandler = mp.end_element
387
388        p.Parse(manifest)
389        # Make the node dict that the callers expect
390        for k in mp.d:
391            node[k] = mp.d.get('hostname', '')
392        return mp.d
393
394    def fake_manifest(self, topo):
395        """
396        Fake the output of manifest_to_dict with a bunch of generic node an
397        interface names, for debugging.
398        """
399        node = { }
400        for i, e in enumerate([ e for e in topo.elements \
401                if isinstance(e, topdl.Computer)]):
402            node[e.name] = { 
403                    'hostname': "node%03d" % i,
404                    'interfaces': { }
405                    }
406            for j, inf in enumerate(e.interface):
407                node[e.name]['interfaces'][inf.name] = 'eth%d' % j
408
409        return node
410
411
412    def generate_portal_configs(self, topo, pubkey_base, 
413            secretkey_base, tmpdir, leid, connInfo, services, nodes):
414
415        def conninfo_to_dict(key, info):
416            """
417            Make a cpoy of the connection information about key, and flatten it
418            into a single dict by parsing out any feddAttrs.
419            """
420
421            rv = None
422            for i in info:
423                if key == i.get('portal', "") or \
424                        key in [e.get('element', "") \
425                        for e in i.get('member', [])]:
426                    rv = i.copy()
427                    break
428
429            else:
430                return rv
431
432            if 'fedAttr' in rv:
433                for a in rv['fedAttr']:
434                    attr = a.get('attribute', "")
435                    val = a.get('value', "")
436                    if attr and attr not in rv:
437                        rv[attr] = val
438                del rv['fedAttr']
439            return rv
440
441        # XXX: un hardcode this
442        def client_null(f, s):
443            print >>f, "Service: %s" % s['name']
444       
445        def client_seer_master(f, s):
446            print >>f, 'PortalAlias: seer-master'
447
448        def client_smb(f, s):
449            print >>f, "Service: %s" % s['name']
450            smbshare = None
451            smbuser = None
452            smbproj = None
453            for a in s.get('fedAttr', []):
454                if a.get('attribute', '') == 'SMBSHARE':
455                    smbshare = a.get('value', None)
456                elif a.get('attribute', '') == 'SMBUSER':
457                    smbuser = a.get('value', None)
458                elif a.get('attribute', '') == 'SMBPROJ':
459                    smbproj = a.get('value', None)
460
461            if all((smbshare, smbuser, smbproj)):
462                print >>f, "SMBshare: %s" % smbshare
463                print >>f, "ProjectUser: %s" % smbuser
464                print >>f, "ProjectName: %s" % smbproj
465
466        def client_hide_hosts(f, s):
467            for a in s.get('fedAttr', [ ]):
468                if a.get('attribute', "") == 'hosts':
469                    print >>f, 'Hide: %s' % a.get('value', "")
470
471        client_service_out = {
472                'SMB': client_smb,
473                'tmcd': client_null,
474                'seer': client_null,
475                'userconfig': client_null,
476                'project_export': client_null,
477                'seer_master': client_seer_master,
478                'hide_hosts': client_hide_hosts,
479            }
480
481        def client_seer_master_export(f, s):
482            print >>f, "AddedNode: seer-master"
483
484        def client_seer_local_export(f, s):
485            print >>f, "AddedNode: control"
486
487        client_export_service_out = {
488                'seer_master': client_seer_master_export,
489                'local_seer_control': client_seer_local_export,
490            }
491
492        def server_port(f, s):
493            p = urlparse(s.get('server', 'http://localhost'))
494            print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 
495
496        def server_null(f,s): pass
497
498        def server_seer(f, s):
499            print >>f, 'seer: true'
500
501        server_service_out = {
502                'SMB': server_port,
503                'tmcd': server_port,
504                'userconfig': server_null,
505                'project_export': server_null,
506                'seer': server_seer,
507                'seer_master': server_port,
508                'hide_hosts': server_null,
509            }
510        # XXX: end un hardcode this
511
512
513        seer_out = False
514        client_out = False
515        control_gw = None
516        for e in [ e for e in topo.elements \
517                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
518            myname = e.name
519            type = e.get_attribute('portal_type')
520
521            info = conninfo_to_dict(myname, connInfo)
522
523            if not info:
524                raise service_error(service_error.req,
525                        "No connectivity info for %s" % myname)
526
527            # Translate to physical name (ProtoGENI doesn't have DNS)
528            physname = nodes.get(myname, { }).get('hostname', None)
529            peer = info.get('peer', "")
530            ldomain = self.domain
531            ssh_port = info.get('ssh_port', 22)
532
533            # Collect this for the client.conf file
534            if 'masterexperiment' in info:
535                mproj, meid = info['masterexperiment'].split("/", 1)
536
537            active = info.get('active', 'False')
538
539            if type in ('control', 'both'):
540                testbed = e.get_attribute('testbed')
541                control_gw = myname
542
543            cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower())
544            tunnelconfig = self.tunnel_config
545            try:
546                f = open(cfn, "w")
547                if active == 'True':
548                    print >>f, "active: True"
549                    print >>f, "ssh_port: %s" % ssh_port
550                    if type in ('control', 'both'):
551                        for s in [s for s in services \
552                                if s.get('name', "") in self.imports]:
553                            server_service_out[s['name']](f, s)
554
555                if tunnelconfig:
556                    print >>f, "tunnelip: %s" % tunnelconfig
557                print >>f, "peer: %s" % peer.lower()
558                print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \
559                        pubkey_base
560                print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \
561                        secretkey_base
562                f.close()
563            except EnvironmentError, e:
564                raise service_error(service_error.internal,
565                        "Can't write protal config %s: %s" % (cfn, e))
566
567        # Done with portals, write the client config file.
568        try:
569            f = open("%s/client.conf" % tmpdir, "w")
570            if control_gw:
571                print >>f, "ControlGateway: %s" % physname.lower()
572            for s in services:
573                if s.get('name',"") in self.imports and \
574                        s.get('visibility','') == 'import':
575                    client_service_out[s['name']](f, s)
576                if s.get('name', '') in self.exports and \
577                        s.get('visibility', '') == 'export' and \
578                        s['name'] in client_export_service_out:
579                    client_export_service_out[s['name']](f, s)
580            # Seer uses this.
581            if mproj and meid:
582                print >>f, "ExperimentID: %s/%s" % (mproj, meid)
583            f.close()
584        except EnvironmentError, e:
585            raise service_error(service_error.internal,
586                    "Cannot write client.conf: %s" %s)
587
588
589
590    def export_store_info(self, cf, nodes, ssh_port, connInfo):
591        """
592        For the export requests in the connection info, install the peer names
593        at the experiment controller via SetValue calls.
594        """
595
596        for c in connInfo:
597            for p in [ p for p in c.get('parameter', []) \
598                    if p.get('type', '') == 'output']:
599
600                if p.get('name', '') == 'peer':
601                    k = p.get('key', None)
602                    surl = p.get('store', None)
603                    if surl and k and k.index('/') != -1:
604                        if self.create_debug:
605                            req = { 'name': k, 'value': 'debug' }
606                            self.call_SetValue(surl, req, cf)
607                        else:
608                            n = nodes.get(k[k.index('/')+1:], { })
609                            value = n.get('hostname', None)
610                            if value:
611                                req = { 'name': k, 'value': value }
612                                self.call_SetValue(surl, req, cf)
613                            else:
614                                self.log.error("No hostname for %s" % \
615                                        k[k.index('/'):])
616                    else:
617                        self.log.error("Bad export request: %s" % p)
618                elif p.get('name', '') == 'ssh_port':
619                    k = p.get('key', None)
620                    surl = p.get('store', None)
621                    if surl and k:
622                        req = { 'name': k, 'value': ssh_port }
623                        self.call_SetValue(surl, req, cf)
624                    else:
625                        self.log.error("Bad export request: %s" % p)
626                else:
627
628                    self.log.error("Unknown export parameter: %s" % \
629                            p.get('name'))
630                    continue
631
632    def write_node_config_script(self, elem, node, user, pubkey,
633            secretkey, stagingdir, tmpdir):
634        """
635        Write out the configuration script that is to run on the node
636        represented by elem in the topology.  This is called
637        once per node to configure.
638        """
639        # These little functions/functors just make things more readable.  Each
640        # one encapsulates a small task of copying software files or installing
641        # them.
642        class stage_file_type:
643            """
644            Write code copying file sfrom the staging host to the host on which
645            this will run.
646            """
647            def __init__(self, user, host, stagingdir):
648                self.user = user
649                self.host = host
650                self.stagingdir = stagingdir
651                self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \
652                        "'ForwardX11 no' -o 'StrictHostKeyChecking no' "
653
654            def __call__(self, script, file, dest="."):
655                # If the file is a full pathname, do not use stagingdir
656                if file.find('/') == -1:
657                    file = "%s/%s" % (self.stagingdir, file)
658                print >>script, "%s %s@%s:%s %s" % \
659                        (self.scp, self.user, self.host, file, dest)
660
661        def install_tar(script, loc, base):
662            """
663            Print code to script to install a tarfile in loc.
664            """
665            tar = "/bin/tar"
666            mkdir="/bin/mkdir"
667
668            print >>script, "%s -p %s" % (mkdir, loc)
669            print >>script, "%s -C %s -xzf %s" % (tar, loc, base)
670
671        def install_rpm(script, base):
672            """
673            Print code to script to install an rpm
674            """
675            rpm = "/bin/rpm"
676            print >>script, "%s --install %s" % (rpm, base)
677
678        ifconfig = "/sbin/ifconfig"
679        findif = '/usr/local/etc/emulab/findif'
680        stage_file = stage_file_type(user, self.staging_host, stagingdir)
681        pname = node.get('hostname', None)
682        fed_dir = "/usr/local/federation"
683        fed_etc_dir = "%s/etc" % fed_dir
684        fed_bin_dir = "%s/bin" % fed_dir
685        fed_lib_dir = "%s/lib" % fed_dir
686
687        if pname:
688            sfile = "%s/%s.startup" % (tmpdir, pname)
689            script = open(sfile, "w")
690            # Reset the interfaces to the ones in the topo file
691            for i in [ i for i in elem.interface \
692                    if not i.get_attribute('portal')]:
693                if 'interfaces' in node:
694                    pinf = node['interfaces'].get(i.name, None)
695                else:
696                    pinf = None
697
698                if 'mac' in node:
699                    pmac = node['mac'].get(i.name, None)
700                else:
701                    pmac = None
702                addr = i.get_attribute('ip4_address') 
703                netmask = i.get_attribute('ip4_netmask') or '255.255.255.0'
704                # The interface names in manifests are not to be trusted, so we
705                # find the interface to configure using the local node's script
706                # to match mac address to interface name.
707                if pinf and addr and pmac:
708                    print >>script, '# %s' % pinf
709                    print >>script, \
710                            "%s `%s %s` %s netmask %s"  % \
711                            (ifconfig, findif, pmac, addr, netmask)
712                else:
713                    self.log.error("Missing interface or address for %s" \
714                            % i.name)
715               
716            for l, f in self.federation_software:
717                base = os.path.basename(f)
718                stage_file(script, base)
719                if l: install_tar(script, l, base)
720                else: install_rpm(script, base)
721
722            for s in elem.software:
723                s_base = s.location.rpartition('/')[2]
724                stage_file(script, s_base)
725                if s.install: install_tar(script, s.install, s_base)
726                else: install_rpm(script, s_base)
727
728            for f in ('hosts', pubkey, secretkey, 'client.conf', 
729                    'userconf'):
730                stage_file(script, f, fed_etc_dir)
731            if self.sshd:
732                stage_file(script, self.sshd, fed_bin_dir)
733            if self.sshd_config:
734                stage_file(script, self.sshd_config, fed_etc_dir)
735
736            # Look in tmpdir to get the names.  They've all been copied
737            # into the (remote) staging dir
738            if os.access("%s/%s.gw.conf" % (tmpdir, elem.name), os.R_OK):
739                stage_file(script, "%s.gw.conf" % elem.name, fed_etc_dir)
740
741            # Done with staging, remove the identity used to stage
742            print >>script, "#/bin/rm .ssh/id_rsa"
743
744            # Start commands
745            if elem.get_attribute('portal') and self.portal_startcommand:
746                # Install portal software
747                for l, f in self.portal_software:
748                    base = os.path.basename(f)
749                    stage_file(script, base)
750                    if l: install_tar(script, l, base)
751                    else: install_rpm(script, base)
752
753                # Portals never have a user-specified start command
754                print >>script, self.portal_startcommand
755            elif self.node_startcommand:
756                # XXX: debug
757                print >>script, "sudo perl -I%s %s/import_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user)
758                # XXX: debug
759                if elem.get_attribute('startup'):
760                    print >>script, "%s \\$USER '%s'" % \
761                            (self.node_startcommand,
762                                    elem.get_attribute('startup'))
763                else:
764                    print >>script, self.node_startcommand
765            script.close()
766            return sfile, pname
767        else:
768            return None, None
769
770
771    def configure_nodes(self, segment_commands, topo, nodes, user, 
772            pubkey, secretkey, stagingdir, tmpdir):
773        """
774        For each node in the topology, generate a script file that copies
775        software onto it and installs it in the proper places and then runs the
776        startup command (including the federation commands.
777        """
778
779
780
781        for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]:
782            vname = e.name
783            sfile, pname = self.write_node_config_script(e,
784                    nodes.get(vname, { }),
785                    user, pubkey, secretkey, stagingdir, tmpdir)
786            if sfile:
787                if not segment_commands.scp_file(sfile, user, pname):
788                    self.log.error("Could not copy script to %s" % pname)
789            else:
790                self.log.error("Unmapped node: %s" % vname)
791
792    def start_node(self, user, host, node, segment_commands):
793        """
794        Copy an identity to a node for the configuration script to be able to
795        import data and then run the startup script remotely.
796        """
797        # Place an identity on the node so that the copying can succeed
798        segment_commands.scp_file( segment_commands.ssh_privkey_file,
799                user, node, ".ssh/id_rsa")
800        segment_commands.ssh_cmd(user, node, 
801                "sudo /bin/sh ./%s.startup &" % node)
802
803    def start_nodes(self, user, host, nodes, segment_commands):
804        """
805        Start a thread to initialize each node and wait for them to complete.
806        Each thread runs start_node.
807        """
808        threads = [ ]
809        for n in nodes:
810            t = Thread(target=self.start_node, args=(user, host, n, 
811                segment_commands))
812            t.start()
813            threads.append(t)
814
815        done = [not t.isAlive() for t in threads]
816        while not all(done):
817            self.log.info("Waiting for threads %s" % done)
818            time.sleep(10)
819            done = [not t.isAlive() for t in threads]
820
821    def set_up_staging_filespace(self, segment_commands, user, host,
822            stagingdir):
823        """
824        Set up teh staging area on the staging machine.  To reduce the number
825        of ssh commands, we compose a script and execute it remotely.
826        """
827
828        self.log.info("[start_segment]: creating script file")
829        try:
830            sf, scriptname = tempfile.mkstemp()
831            scriptfile = os.fdopen(sf, 'w')
832        except EnvironmentError:
833            return False
834
835        scriptbase = os.path.basename(scriptname)
836
837        # Script the filesystem changes
838        print >>scriptfile, "/bin/rm -rf %s" % stagingdir
839        print >>scriptfile, 'mkdir -p %s' % stagingdir
840        print >>scriptfile, "rm -f %s" % scriptbase
841        scriptfile.close()
842
843        # Move the script to the remote machine
844        # XXX: could collide tempfile names on the remote host
845        if segment_commands.scp_file(scriptname, user, host, scriptbase):
846            os.remove(scriptname)
847        else:
848            return False
849
850        # Execute the script (and the script's last line deletes it)
851        if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase):
852            return False
853
854    def initialize_protogeni_context(self, segment_commands, certfile, certpw):
855        """
856        Protogeni interactions take a context and a protogeni certificate.
857        This establishes both for later calls and returns them.
858        """
859        if os.access(certfile, os.R_OK):
860            ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
861        else:
862            self.log.error("[start_segment]: Cannot read certfile: %s" % \
863                    certfile)
864            return None, None
865
866        try:
867            gcred = segment_commands.slice_authority_call('GetCredential', 
868                    {}, ctxt)
869        except segment_commands.ProtoGENIError, e:
870            raise service_error(service_error.federant,
871                    "ProtoGENI: %s" % e)
872
873        return ctxt, gcred
874
875    def get_free_slicename(self, segment_commands, user, gcred, ctxt):
876        """
877        Find a usable slice name by trying random ones until there's no
878        collision.
879        """
880
881        def random_slicename(user):
882            """
883            Return a random slicename by appending 5 letters to the username.
884            """
885            slicename = user
886            for i in range(0,5):
887                slicename += random.choice(string.ascii_letters)
888            return slicename
889
890        while True:
891            slicename = random_slicename(user)
892            try:
893                param = {
894                        'credential': gcred, 
895                        'hrn': slicename,
896                        'type': 'Slice'
897                        }
898
899                if not self.create_debug:
900                    segment_commands.slice_authority_call('Resolve', param, 
901                            ctxt)
902                else:
903                    raise segment_commands.ProtoGENIError(0,0,'Debug')
904            except segment_commands.ProtoGENIError, e:
905                print e
906                break
907
908        return slicename
909
910    def allocate_slice(self, segment_commands, slicename, rspec, gcred, ctxt):
911        """
912        Create the slice and allocate resources.  If any of this stuff fails,
913        the allocations will time out on PG in short order, so we just raise
914        the service_error.  Return the slice and sliver credentials as well as
915        the manifest.
916        """
917        try:
918            param = {
919                    'credential': gcred, 
920                    'hrn': slicename,
921                    'type': 'Slice'
922                    }
923            slice_cred = segment_commands.slice_authority_call('Register',
924                    param, ctxt)
925            # Resolve the slice to get the URN that PG has assigned it.
926            param = {
927                    'credential': gcred,
928                    'type': 'Slice',
929                    'hrn': slicename
930                    }
931            data = segment_commands.slice_authority_call('Resolve', param,
932                    ctxt)
933            if 'urn' in data:
934                slice_urn = data['urn']
935            else:
936                raise service_error(service_error.federant, 
937                        "No URN returned for slice %s" % slicename)
938
939            if 'creator_urn' in data:
940                creator_urn = data['creator_urn']
941            else:
942                raise service_error(service_error.federant, 
943                        "No creator URN returned for slice %s" % slicename)
944            # Populate the ssh keys (let PG format them)
945            param = {
946                    'credential': gcred, 
947                    }
948            keys =  segment_commands.slice_authority_call('GetKeys', param, 
949                    ctxt)
950            # Create a Sliver
951            param = {
952                    'credentials': [ slice_cred ],
953                    'rspec': rspec, 
954                    'users': [ {
955                        'urn': creator_urn,
956                        'keys': keys, 
957                        },
958                    ],
959                    'slice_urn': slice_urn,
960                    }
961            rv = segment_commands.component_manager_call(
962                    'CreateSliver', param, ctxt)
963
964            # the GENIAPI AM just hands back the manifest, bit the ProtoGENI
965            # API hands back a sliver credential.  This just finds the format
966            # of choice.
967            if isinstance(rv, tuple):
968                manifest = rv[1]
969            else:
970                manifest = rv
971        except segment_commands.ProtoGENIError, e:
972            raise service_error(service_error.federant,
973                    "ProtoGENI: %s %s" % (e.code, e))
974
975        return (slice_urn, slice_cred, manifest, rspec)
976
977    def wait_for_slice(self, segment_commands, slice_cred, slice_urn, ctxt, 
978            timeout=None):
979        """
980        Wait for the given slice to finish its startup.  Return the final
981        status.
982        """
983        completed_states = ('failed', 'ready')
984        status = 'changing'
985        if timeout is not None:
986            end = time.time() + timeout
987        try:
988            while status not in completed_states:
989                param = { 
990                        'credentials': [ slice_cred ],
991                        'slice_urn': slice_urn,
992                        }
993                r = segment_commands.component_manager_call(
994                        'SliverStatus', param, ctxt)
995                # GENIAPI uses geni_status as the key, so check for both
996                status = r.get('status', r.get('geni_status','changing'))
997                if status not in completed_states:
998                    if timeout is not None and time.time() > end:
999                        return 'timeout'
1000                    time.sleep(30)
1001        except segment_commands.ProtoGENIError, e:
1002            raise service_error(service_error.federant,
1003                    "ProtoGENI: %s %s" % (e.code, e))
1004
1005        return status
1006
1007    def delete_slice(self, segment_commands, slice_cred, slice_urn, ctxt):
1008        """
1009        Delete the slice resources.  An error from the service is ignores,
1010        because the soft state will go away anyway.
1011        """
1012        try:
1013            param = { 
1014                    'credentials': [ slice_cred, ],
1015                    'slice_urn': slice_urn,
1016                    }
1017            segment_commands.component_manager_call('DeleteSlice',
1018                    param, ctxt)
1019        except segment_commands.ProtoGENIError, e:
1020            self.log.warn("ProtoGENI: %s" % e)
1021
1022
1023
1024    def start_segment(self, segment_commands, aid, user, rspec, pubkey,
1025            secretkey, ename, stagingdir, tmpdir, certfile, certpw,
1026            export_certfile, topo, connInfo, services, timeout=0):
1027        """
1028        Start a sub-experiment on a federant.
1029
1030        Get the current state, modify or create as appropriate, ship data
1031        and configs and start the experiment.  There are small ordering
1032        differences based on the initial state of the sub-experiment.
1033        """
1034
1035        # Local software dir
1036        lsoftdir = "%s/software" % tmpdir
1037        host = self.staging_host
1038
1039        ctxt, gcred = self.initialize_protogeni_context(segment_commands, 
1040                certfile, certpw)
1041
1042        if not ctxt: return False, {}
1043
1044        self.set_up_staging_filespace(segment_commands, user, host, stagingdir)
1045        slicename = self.get_free_slicename(segment_commands, user, gcred, ctxt)
1046        self.log.info("Creating %s" % slicename)
1047        slice_urn, slice_cred, manifest, rpsec = self.allocate_slice(
1048                segment_commands, slicename, rspec, gcred, ctxt)
1049
1050        # With manifest in hand, we can export the portal node names.
1051        if self.create_debug: nodes = self.fake_manifest(topo)
1052        else: nodes = self.manifest_to_dict(manifest)
1053
1054        self.export_store_info(export_certfile, nodes, self.ssh_port,
1055                connInfo)
1056        self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 
1057                ename, connInfo, services, nodes)
1058
1059        # Copy software to the staging machine (done after generation to copy
1060        # those, too)
1061        for d in (tmpdir, lsoftdir):
1062            if os.path.isdir(d):
1063                for f in os.listdir(d):
1064                    if not os.path.isdir("%s/%s" % (d, f)):
1065                        if not segment_commands.scp_file("%s/%s" % (d, f), 
1066                                user, host, "%s/%s" % (stagingdir, f)):
1067                            self.log.error("Scp failed")
1068                            return False, {}
1069
1070
1071        # Now we wait for the nodes to start on PG
1072        status = self.wait_for_slice(segment_commands, slice_cred, slice_urn, 
1073                ctxt, timeout=300)
1074        if status == 'failed':
1075            self.log.error('Sliver failed to start on ProtoGENI')
1076            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1077            return False, {}
1078        elif status == 'timeout':
1079            self.log.error('Sliver failed to start on ProtoGENI (timeout)')
1080            self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1081            return False, {}
1082        else:
1083            # All good: save ProtoGENI info in shared state
1084            self.state_lock.acquire()
1085            self.allocation[aid]['slice_urn'] = slice_urn
1086            self.allocation[aid]['slice_name'] = slicename
1087            self.allocation[aid]['slice_credential'] = slice_cred
1088            self.allocation[aid]['manifest'] = manifest
1089            self.allocation[aid]['rspec'] = rspec
1090            self.allocation[aid]['certfile'] = certfile
1091            self.allocation[aid]['certpw'] = certpw
1092            self.write_state()
1093            self.state_lock.release()
1094
1095        # Now we have configuration to do for ProtoGENI
1096        self.configure_nodes(segment_commands, topo, nodes, user, pubkey,
1097                secretkey, stagingdir, tmpdir)
1098
1099        self.start_nodes(user, self.staging_host, 
1100                [ n.get('hostname', None) for n in nodes.values()],
1101                segment_commands)
1102
1103        # Everything has gone OK.
1104        return True, dict([(k, n.get('hostname', None)) \
1105                for k, n in nodes.items()])
1106
1107    def generate_rspec(self, topo, softdir, connInfo):
1108
1109        # Force a useful image.  Without picking this the nodes can get
1110        # different images and there is great pain.
1111        def image_filter(e):
1112            if isinstance(e, topdl.Computer):
1113                return '<disk_image name="urn:publicid:IDN+emulab.net+' + \
1114                        'image+emulab-ops//FEDORA10-STD" />'
1115            else:
1116                return ""
1117        # Main line of generate
1118        t = topo.clone()
1119
1120        starts = { }
1121        # Weed out the things we aren't going to instantiate: Segments, portal
1122        # substrates, and portal interfaces.  (The copy in the for loop allows
1123        # us to delete from e.elements in side the for loop).  While we're
1124        # touching all the elements, we also adjust paths from the original
1125        # testbed to local testbed paths and put the federation commands and
1126        # startcommands into a dict so we can start them manually later.
1127        # ProtoGENI requires setup before the federation commands run, so we
1128        # run them by hand after we've seeded configurations.
1129        for e in [e for e in t.elements]:
1130            if isinstance(e, topdl.Segment):
1131                t.elements.remove(e)
1132            # Fix software paths
1133            for s in getattr(e, 'software', []):
1134                s.location = re.sub("^.*/", softdir, s.location)
1135            if isinstance(e, topdl.Computer):
1136                if e.get_attribute('portal') and self.portal_startcommand:
1137                    # Portals never have a user-specified start command
1138                    starts[e.name] = self.portal_startcommand
1139                elif self.node_startcommand:
1140                    if e.get_attribute('startup'):
1141                        starts[e.name] = "%s \\$USER '%s'" % \
1142                                (self.node_startcommand, 
1143                                        e.get_attribute('startup'))
1144                        e.remove_attribute('startup')
1145                    else:
1146                        starts[e.name] = self.node_startcommand
1147
1148                # Remove portal interfaces
1149                e.interface = [i for i in e.interface \
1150                        if not i.get_attribute('portal')]
1151
1152        t.substrates = [ s.clone() for s in t.substrates ]
1153        t.incorporate_elements()
1154
1155        # Customize the rspec output to use the image we like
1156        filters = [ image_filter ]
1157
1158        # Convert to rspec and return it
1159        exp_rspec = topdl.topology_to_rspec(t, filters)
1160
1161        return exp_rspec
1162
1163    def retrieve_software(self, topo, certfile, softdir):
1164        """
1165        Collect the software that nodes in the topology need loaded and stage
1166        it locally.  This implies retrieving it from the experiment_controller
1167        and placing it into softdir.  Certfile is used to prove that this node
1168        has access to that data (it's the allocation/segment fedid).  Finally
1169        local portal and federation software is also copied to the same staging
1170        directory for simplicity - all software needed for experiment creation
1171        is in softdir.
1172        """
1173        sw = set()
1174        for e in topo.elements:
1175            for s in getattr(e, 'software', []):
1176                sw.add(s.location)
1177        os.mkdir(softdir)
1178        for s in sw:
1179            self.log.debug("Retrieving %s" % s)
1180            try:
1181                get_url(s, certfile, softdir)
1182            except:
1183                t, v, st = sys.exc_info()
1184                raise service_error(service_error.internal,
1185                        "Error retrieving %s: %s" % (s, v))
1186
1187        # Copy local portal node software to the tempdir
1188        for s in (self.portal_software, self.federation_software):
1189            for l, f in s:
1190                base = os.path.basename(f)
1191                copy_file(f, "%s/%s" % (softdir, base))
1192
1193
1194    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
1195        """
1196        Gather common configuration files, retrieve or create an experiment
1197        name and project name, and return the ssh_key filenames.  Create an
1198        allocation log bound to the state log variable as well.
1199        """
1200        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1201        ename = None
1202        pubkey_base = None
1203        secretkey_base = None
1204        alloc_log = None
1205
1206        for a in attrs:
1207            if a['attribute'] in configs:
1208                try:
1209                    self.log.debug("Retrieving %s" % a['value'])
1210                    get_url(a['value'], certfile, tmpdir)
1211                except:
1212                    t, v, st = sys.exc_info()
1213                    raise service_error(service_error.internal,
1214                            "Error retrieving %s: %s" % (a.get('value', ""), v))
1215            if a['attribute'] == 'ssh_pubkey':
1216                pubkey_base = a['value'].rpartition('/')[2]
1217            if a['attribute'] == 'ssh_secretkey':
1218                secretkey_base = a['value'].rpartition('/')[2]
1219            if a['attribute'] == 'experiment_name':
1220                ename = a['value']
1221
1222        if not ename:
1223            ename = ""
1224            for i in range(0,5):
1225                ename += random.choice(string.ascii_letters)
1226            self.log.warn("No experiment name: picked one randomly: %s" \
1227                    % ename)
1228
1229        self.state_lock.acquire()
1230        if self.allocation.has_key(aid):
1231            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1232            self.allocation[aid]['experiment'] = ename
1233            self.allocation[aid]['log'] = [ ]
1234            # Create a logger that logs to the experiment's state object as
1235            # well as to the main log file.
1236            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1237            h = logging.StreamHandler(
1238                    list_log.list_log(self.allocation[aid]['log']))
1239            # XXX: there should be a global one of these rather than
1240            # repeating the code.
1241            h.setFormatter(logging.Formatter(
1242                "%(asctime)s %(name)s %(message)s",
1243                        '%d %b %y %H:%M:%S'))
1244            alloc_log.addHandler(h)
1245            self.write_state()
1246        else:
1247            self.log.error("No allocation for %s!?" % aid)
1248        self.state_lock.release()
1249
1250        return (ename, pubkey_base, secretkey_base, cf, user, ssh_key, 
1251                cpw, alloc_log)
1252
1253    def finalize_experiment(self, topo, nodes, aid, alloc_id):
1254        # Copy the assigned names into the return topology
1255        rvtopo = topo.clone()
1256        embedding = [ ]
1257        for k, n in nodes.items():
1258            embedding.append({ 
1259                'toponame': k,
1260                'physname': [n ],
1261                })
1262        # Grab the log (this is some anal locking, but better safe than
1263        # sorry)
1264        self.state_lock.acquire()
1265        logv = "".join(self.allocation[aid]['log'])
1266        # It's possible that the StartSegment call gets retried (!).
1267        # if the 'started' key is in the allocation, we'll return it rather
1268        # than redo the setup.
1269        self.allocation[aid]['started'] = { 
1270                'allocID': alloc_id,
1271                'allocationLog': logv,
1272                'segmentdescription': { 
1273                    'topdldescription': rvtopo.to_dict() },
1274                'embedding': embedding,
1275                }
1276        retval = copy.deepcopy(self.allocation[aid]['started'])
1277        self.write_state()
1278        self.state_lock.release()
1279
1280        return retval
1281
1282    def StartSegment(self, req, fid):
1283        err = None  # Any service_error generated after tmpdir is created
1284        rv = None   # Return value from segment creation
1285
1286        try:
1287            req = req['StartSegmentRequestBody']
1288            topref = req['segmentdescription']['topdldescription']
1289        except KeyError:
1290            raise service_error(service_error.req, "Badly formed request")
1291
1292        connInfo = req.get('connection', [])
1293        services = req.get('service', [])
1294        auth_attr = req['allocID']['fedid']
1295        aid = "%s" % auth_attr
1296        attrs = req.get('fedAttr', [])
1297        if not self.auth.check_attribute(fid, auth_attr):
1298            raise service_error(service_error.access, "Access denied")
1299        else:
1300            # See if this is a replay of an earlier succeeded StartSegment -
1301            # sometimes SSL kills 'em.  If so, replay the response rather than
1302            # redoing the allocation.
1303            self.state_lock.acquire()
1304            retval = self.allocation[aid].get('started', None)
1305            self.state_lock.release()
1306            if retval:
1307                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1308                        "replaying response")
1309                return retval
1310
1311        if topref:
1312            topo = topdl.Topology(**topref)
1313        else:
1314            raise service_error(service_error.req, 
1315                    "Request missing segmentdescription'")
1316
1317        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1318        try:
1319            tmpdir = tempfile.mkdtemp(prefix="access-")
1320            softdir = "%s/software" % tmpdir
1321        except EnvironmentError:
1322            raise service_error(service_error.internal, "Cannot create tmp dir")
1323
1324        # Try block alllows us to clean up temporary files.
1325        try:
1326            self.retrieve_software(topo, certfile, softdir)
1327            self.configure_userconf(services, tmpdir)
1328            ename, pubkey_base, secretkey_base, cf, user, ssh_key, \
1329                cpw, alloc_log = self.initialize_experiment_info(attrs,
1330                        aid, certfile, tmpdir)
1331            self.import_store_info(certfile, connInfo)
1332            rspec = self.generate_rspec(topo, "%s/%s/" \
1333                    % (self.staging_dir, ename), connInfo)
1334
1335            segment_commands = self.api_proxy(keyfile=ssh_key,
1336                    debug=self.create_debug, log=alloc_log,
1337                    ch_url = self.ch_url, sa_url=self.sa_url,
1338                    cm_url=self.cm_url)
1339            rv, nodes = self.start_segment(segment_commands, aid, user, rspec,
1340                    pubkey_base, 
1341                    secretkey_base, ename,
1342                    "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw,
1343                    certfile, topo, connInfo, services)
1344        except EnvironmentError, e:
1345            err = service_error(service_error.internal, "%s" % e)
1346        except service_error, e:
1347            err = e
1348        except:
1349            t, v, st = sys.exc_info()
1350            err = service_error(service_error.internal, "%s: %s" % \
1351                    (v, traceback.extract_tb(st)))
1352
1353        # Walk up tmpdir, deleting as we go
1354        if self.cleanup: self.remove_dirs(tmpdir)
1355        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1356
1357        if rv:
1358            return self.finalize_experiment(topo, nodes, aid, req['allocID'])
1359        elif err:
1360            raise service_error(service_error.federant,
1361                    "Swapin failed: %s" % err)
1362        else:
1363            raise service_error(service_error.federant, "Swapin failed")
1364
1365    def stop_segment(self, segment_commands, user, stagingdir, slice_cred,
1366            slice_urn, certfile, certpw):
1367        """
1368        Stop a sub experiment by calling swapexp on the federant
1369        """
1370        host = self.staging_host
1371        rv = False
1372        try:
1373            # Clean out tar files: we've gone over quota in the past
1374            if stagingdir:
1375                segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir)
1376            if slice_cred:
1377                self.log.error('Removing Sliver on ProtoGENI')
1378                ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1379                self.delete_slice(segment_commands, slice_cred, slice_urn, ctxt)
1380            return True
1381        except self.ssh_cmd_timeout:
1382            rv = False
1383        return rv
1384
1385    def TerminateSegment(self, req, fid):
1386        try:
1387            req = req['TerminateSegmentRequestBody']
1388        except KeyError:
1389            raise service_error(service_error.req, "Badly formed request")
1390
1391        auth_attr = req['allocID']['fedid']
1392        aid = "%s" % auth_attr
1393        attrs = req.get('fedAttr', [])
1394        if not self.auth.check_attribute(fid, auth_attr):
1395            raise service_error(service_error.access, "Access denied")
1396
1397        self.state_lock.acquire()
1398        if self.allocation.has_key(aid):
1399            cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1400            slice_cred = self.allocation[aid].get('slice_credential', None)
1401            slice_urn = self.allocation[aid].get('slice_urn', None)
1402            ename = self.allocation[aid].get('experiment', None)
1403        else:
1404            cf, user, ssh_key, cpw = (None, None, None, None)
1405            slice_cred = None
1406            slice_urn = None
1407            ename = None
1408        self.state_lock.release()
1409
1410        if ename:
1411            staging = "%s/%s" % ( self.staging_dir, ename)
1412        else:
1413            self.log.warn("Can't find experiment name for %s" % aid)
1414            staging = None
1415
1416        segment_commands = self.api_proxy(keyfile=ssh_key,
1417                debug=self.create_debug, ch_url = self.ch_url,
1418                sa_url=self.sa_url, cm_url=self.cm_url)
1419        self.stop_segment(segment_commands, user, staging, slice_cred,
1420                slice_urn, cf, cpw)
1421        return { 'allocID': req['allocID'] }
1422
1423    def renew_segment(self, segment_commands, name, scred, slice_urn, interval, 
1424            certfile, certpw):
1425        """
1426        Linear code through the segment renewal calls.
1427        """
1428        ctxt = fedd_ssl_context(my_cert=certfile, password=certpw)
1429        try:
1430            expiration = time.strftime("%Y%m%dT%H:%M:%S",
1431                    time.gmtime(time.time() + interval))
1432            cred = segment_commands.slice_authority_call('GetCredential', 
1433                    {}, ctxt)
1434
1435            param = {
1436                    'credential': scred,
1437                    'expiration': expiration
1438                    }
1439            r = segment_commands.slice_authority_call('RenewSlice', param, ctxt)
1440            param = {
1441                    'credential': cred,
1442                    'urn': slice_urn,
1443                    'type': 'Slice',
1444                    }
1445            new_scred = segment_commands.slice_authority_call('GetCredential',
1446                    param, ctxt)
1447
1448        except segment_commands.ProtoGENIError, e:
1449            self.log.error("Failed to extend slice %s: %s" % (name, e))
1450            return None
1451        try:
1452            param = {
1453                    'credentials': [new_scred,],
1454                    'slice_urn': slice_urn,
1455                    }
1456            r = segment_commands.component_manager_call('RenewSlice', param, 
1457                    ctxt)
1458        except segment_commands.ProtoGENIError, e:
1459            self.log.warn("Failed to renew sliver for %s: %s" % (name, e))
1460
1461        return new_scred
1462   
1463
1464    def RenewSlices(self):
1465        self.log.info("Scanning for slices to renew")
1466        self.state_lock.acquire()
1467        aids = self.allocation.keys()
1468        self.state_lock.release()
1469
1470        for aid in aids:
1471            self.state_lock.acquire()
1472            if self.allocation.has_key(aid):
1473                name = self.allocation[aid].get('slice_name', None)
1474                scred = self.allocation[aid].get('slice_credential', None)
1475                slice_urn = self.allocation[aid].get('slice_urn', None)
1476                cf, user, ssh_key, cpw = self.allocation[aid]['credentials']
1477            else:
1478                name = None
1479                scred = None
1480            self.state_lock.release()
1481
1482            if not os.access(cf, os.R_OK):
1483                self.log.error(
1484                        "[RenewSlices] cred.file %s unreadable, ignoring" % cf)
1485                continue
1486
1487            # There's a ProtoGENI slice associated with the segment; renew it.
1488            if name and scred and slice_urn:
1489                segment_commands = self.api_proxy(log=self.log, 
1490                        debug=self.create_debug, keyfile=ssh_key,
1491                        cm_url = self.cm_url, sa_url = self.sa_url,
1492                        ch_url = self.ch_url)
1493                new_scred = self.renew_segment(segment_commands, name, scred, 
1494                        slice_urn, self.renewal_interval, cf, cpw)
1495                if new_scred:
1496                    self.log.info("Slice %s renewed until %s GMT" % \
1497                            (name, time.asctime(time.gmtime(\
1498                                time.time()+self.renewal_interval))))
1499                    self.state_lock.acquire()
1500                    if self.allocation.has_key(aid):
1501                        self.allocation[aid]['slice_credential'] = new_scred
1502                        self.write_state()
1503                    self.state_lock.release()
1504                else:
1505                    self.log.info("Failed to renew slice %s " % name)
1506
1507        # Let's do this all again soon.  (4 tries before the slices time out)   
1508        t = Timer(self.renewal_interval/4, self.RenewSlices)
1509        t.start()
Note: See TracBrowser for help on using the repository browser.