source: fedd/federation/emulab_access.py @ f3898f7

compt_changesinfo-ops
Last change on this file since f3898f7 was f3898f7, checked in by Ted Faber <faber@…>, 13 years ago

Simple support for experiments in groups. Closes #32

  • Property mode set to 100644
File size: 39.0 KB
RevLine 
[19cc408]1#!/usr/local/bin/python
2
3import os,sys
[eeb0088]4import stat # for chmod constants
[19cc408]5import re
[ab847bc]6import random
[19cc408]7import string
8import copy
[d81971a]9import pickle
[c971895]10import logging
[eeb0088]11import subprocess
[06cc65b]12import traceback
[19cc408]13
[f8582c9]14from threading import *
[8e6fe4d]15from M2Crypto.SSL import SSLError
[f8582c9]16
[f771e2f]17from access import access_base
[78f2668]18from legacy_access import legacy_access
[f771e2f]19
[ec4fb42]20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
[51cc9df]22from fedid import fedid, generate_fedid
[6e63513]23from authorizer import authorizer, abac_authorizer
[6a0c9f4]24from service_error import service_error
[9460b1e]25from remote_service import xmlrpc_handler, soap_handler, service_caller
[e83f2f2]26from proof import proof as access_proof
[11a08b0]27
[6c57fe9]28import httplib
29import tempfile
30from urlparse import urlparse
31
[11860f52]32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
[0ea11af]37
38# Make log messages disappear if noone configures a fedd logger
[11a08b0]39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
[19cc408]44
[78f2668]45class access(access_base, legacy_access):
[19cc408]46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
[53b5c18]53    max_name_len = 19
54
[3f6bc5f]55    def __init__(self, config=None, auth=None):
[866c983]56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
[f771e2f]60        access_base.__init__(self, config, auth)
[866c983]61
[53b5c18]62        self.max_name_len = access.max_name_len
63
[866c983]64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
[eeb0088]67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
[fe28bb2]69        self.userconfurl = config.get("access","userconfurl")
[9b3627e]70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
[e76f38a]72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
[49051fb]75        self.seer_master_start = config.get("access", "seer_master_start")
[ecca6eb]76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
[3bddd24]77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
[6280b1f]78        self.ssh_port = config.get("access","ssh_port") or "22"
[181aeb4]79        self.boss = config.get("access", "boss")
[814b5e5]80        self.ops = config.get("access", "ops")
[181aeb4]81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
[5e1fb7b]83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
[f771e2f]95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
[11860f52]98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103        elif self.access_type == 'local_emulab':
104            self.start_segment = local_emulab_segment.start_segment
105            self.stop_segment = local_emulab_segment.stop_segment
106        else:
107            self.start_segment = None
108            self.stop_segment = None
[866c983]109
110        self.restricted = [ ]
[6e63513]111        # XXX: this should go?
112        #if config.has_option("access", "accessdb"):
113        #    self.read_access(config.get("access", "accessdb"))
[866c983]114        tb = config.get('access', 'testbed')
115        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
116        else: self.testbed = [ ]
117
[6e63513]118        # authorization information
119        self.auth_type = config.get('access', 'auth_type') \
120                or 'legacy'
121        self.auth_dir = config.get('access', 'auth_dir')
122        accessdb = config.get("access", "accessdb")
123        # initialize the authorization system
124        if self.auth_type == 'legacy':
[c002cb2]125            self.access = { }
[6e63513]126            if accessdb:
[78f2668]127                self.legacy_read_access(accessdb, self.legacy_access_tuple)
[6e63513]128        elif self.auth_type == 'abac':
129            self.auth = abac_authorizer(load=self.auth_dir)
[c002cb2]130            self.access = [ ]
[6e63513]131            if accessdb:
[78f2668]132                self.read_access(accessdb, self.access_tuple)
[6e63513]133        else:
134            raise service_error(service_error.internal, 
135                    "Unknown auth_type: %s" % self.auth_type)
[f771e2f]136
[06cc65b]137        # read_state in the base_class
[f771e2f]138        self.state_lock.acquire()
[06cc65b]139        for a  in ('allocation', 'projects', 'keys', 'types'):
140            if a not in self.state:
141                self.state[a] = { }
[f771e2f]142        self.allocation = self.state['allocation']
143        self.projects = self.state['projects']
144        self.keys = self.state['keys']
145        self.types = self.state['types']
[6e63513]146        if self.auth_type == "legacy": 
147            # Add the ownership attributes to the authorizer.  Note that the
148            # indices of the allocation dict are strings, but the attributes are
149            # fedids, so there is a conversion.
150            for k in self.allocation.keys():
151                for o in self.allocation[k].get('owners', []):
152                    self.auth.set_attribute(o, fedid(hexstr=k))
153                if self.allocation[k].has_key('userconfig'):
154                    sfid = self.allocation[k]['userconfig']
155                    fid = fedid(hexstr=sfid)
156                    self.auth.set_attribute(fid, "/%s" % sfid)
[f771e2f]157        self.state_lock.release()
[a20a20f]158        self.exports = {
159                'SMB': self.export_SMB,
160                'seer': self.export_seer,
161                'tmcd': self.export_tmcd,
162                'userconfig': self.export_userconfig,
163                'project_export': self.export_project_export,
164                'local_seer_control': self.export_local_seer,
165                'seer_master': self.export_seer_master,
166                'hide_hosts': self.export_hide_hosts,
167                }
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.local_seer_start:
171            if 'local_seer_control' in self.exports:
172                del self.exports['local_seer_control']
173
174        if not self.local_seer_image or not self.local_seer_software or \
175                not self.seer_master_start:
176            if 'seer_master' in self.exports:
177                del self.exports['seer_master']
[866c983]178
179
180        self.soap_services = {\
181            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
182            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
[cc8d8e9]183            'StartSegment': soap_handler("StartSegment", self.StartSegment),
[e76f38a]184            'TerminateSegment': soap_handler("TerminateSegment",
185                self.TerminateSegment),
[866c983]186            }
187        self.xmlrpc_services =  {\
188            'RequestAccess': xmlrpc_handler('RequestAccess',
189                self.RequestAccess),
190            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
191                self.ReleaseAccess),
[5ae3857]192            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
193            'TerminateSegment': xmlrpc_handler('TerminateSegment',
194                self.TerminateSegment),
[866c983]195            }
196
[2761484]197        self.call_SetValue = service_caller('SetValue')
[2ee4226]198        self.call_GetValue = service_caller('GetValue', log=self.log)
[866c983]199
200        if not config.has_option("allocate", "uri"):
201            self.allocate_project = \
202                allocate_project_local(config, auth)
203        else:
204            self.allocate_project = \
205                allocate_project_remote(config, auth)
206
[e76f38a]207
[866c983]208        # If the project allocator exports services, put them in this object's
209        # maps so that classes that instantiate this can call the services.
210        self.soap_services.update(self.allocate_project.soap_services)
211        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
[f8582c9]212
[e76f38a]213    @staticmethod
[78f2668]214    def legacy_access_tuple(str):
[06cc65b]215        """
[027b87b]216        Convert a string of the form (id[:resources:resouces], id, id) into a
217        tuple of the form (project, user, user) where users may be names or
218        fedids.  The resources strings are obsolete and ignored.
[06cc65b]219        """
[866c983]220        def parse_name(n):
221            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
222            else: return n
[19cc408]223
[f771e2f]224        str = str.strip()
225        if str.startswith('(') and str.endswith(')'):
226            str = str[1:-1]
227            names = [ s.strip() for s in str.split(",")]
228            if len(names) > 3:
229                raise self.parse_error("More than three fields in name")
230            first = names[0].split(":")
231            if first == 'fedid:':
232                del first[0]
233                first[0] = fedid(hexstr=first[0])
[027b87b]234            names[0] = first[0]
[f771e2f]235
236            for i in range(1,2):
237                names[i] = parse_name(names[i])
238
239            return tuple(names)
240        else:
241            raise self.parse_error('Bad mapping (unbalanced parens)')
[19cc408]242
[6e63513]243    @staticmethod
[78f2668]244    def access_tuple(str):
[6e63513]245        """
246        Convert a string of the form (id, id) into an access_project.  This is
[78f2668]247        called by read_access to convert to local attributes.  It returns
[6e63513]248        a tuple of the form (project, user, user) where the two users are
249        always the same.
250        """
251
252        str = str.strip()
253        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
[725c55d]254            # The slice takes the parens off the string.
255            proj, user = str[1:-1].split(',')
[027b87b]256            return (proj.strip(), user.strip(), user.strip())
[6e63513]257        else:
258            raise self.parse_error(
259                    'Bad mapping (unbalanced parens or more than 1 comma)')
260
[06cc65b]261    # RequestAccess support routines
262
[78f2668]263    def legacy_lookup_access(self, req, fid):
[866c983]264        """
[06cc65b]265        Look up the local access control information mapped to this fedid and
266        credentials.  In this case it is a (project, create_user, access_user)
267        triple, and a triple of three booleans indicating which, if any will
268        need to be dynamically created.  Finally a list of owners for that
269        allocation is returned.
270
271        lookup_access_base pulls the first triple out, and it is parsed by this
272        routine into the boolean map.  Owners is always the controlling fedid.
[866c983]273        """
274        # Return values
[027b87b]275        rp = None
[866c983]276        ru = None
277        # This maps a valid user to the Emulab projects and users to use
[78f2668]278        found, match = self.legacy_lookup_access_base(req, fid)
[f771e2f]279        tb, project, user = match
[866c983]280       
281        if found == None:
282            raise service_error(service_error.access,
283                    "Access denied - cannot map access")
284
285        # resolve <dynamic> and <same> in found
286        dyn_proj = False
287        dyn_create_user = False
288        dyn_service_user = False
289
[027b87b]290        if found[0] == "<same>":
[866c983]291            if project != None:
[027b87b]292                rp = project
[866c983]293            else : 
294                raise service_error(\
295                        service_error.server_config,
296                        "Project matched <same> when no project given")
[027b87b]297        elif found[0] == "<dynamic>":
298            rp = None
[866c983]299            dyn_proj = True
300        else:
[027b87b]301            rp = found[0]
[866c983]302
303        if found[1] == "<same>":
304            if user_match == "<any>":
305                if user != None: rcu = user[0]
306                else: raise service_error(\
307                        service_error.server_config,
308                        "Matched <same> on anonymous request")
309            else:
310                rcu = user_match
311        elif found[1] == "<dynamic>":
312            rcu = None
313            dyn_create_user = True
314        else:
315            rcu = found[1]
316       
317        if found[2] == "<same>":
318            if user_match == "<any>":
319                if user != None: rsu = user[0]
320                else: raise service_error(\
321                        service_error.server_config,
322                        "Matched <same> on anonymous request")
323            else:
324                rsu = user_match
325        elif found[2] == "<dynamic>":
326            rsu = None
327            dyn_service_user = True
328        else:
329            rsu = found[2]
330
331        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
[f771e2f]332                [ fid ]
[19cc408]333
[f771e2f]334    def do_project_allocation(self, dyn, project, user):
335        """
336        Call the project allocation routines and return the info.
337        """
338        if dyn: 
339            # Compose the dynamic project request
340            # (only dynamic, dynamic currently allowed)
341            preq = { 'AllocateProjectRequestBody': \
342                        { 'project' : {\
343                            'user': [ \
344                            { \
345                                'access': [ { 
346                                    'sshPubkey': self.ssh_pubkey_file } ],
347                                 'role': "serviceAccess",\
348                            }, \
349                            { \
350                                'access': [ { 
351                                    'sshPubkey': self.ssh_pubkey_file } ],
352                                 'role': "experimentCreation",\
353                            }, \
354                            ], \
355                            }\
356                        }\
357                    }
358            return self.allocate_project.dynamic_project(preq)
[eeb0088]359        else:
[f771e2f]360            preq = {'StaticProjectRequestBody' : \
361                    { 'project': \
362                        { 'name' : { 'localname' : project },\
363                          'user' : [ \
364                            {\
365                                'userID': { 'localname' : user }, \
366                                'access': [ { 
367                                    'sshPubkey': self.ssh_pubkey_file } ],
368                                'role': 'experimentCreation'\
369                            },\
370                            {\
371                                'userID': { 'localname' : user}, \
372                                'access': [ { 
373                                    'sshPubkey': self.ssh_pubkey_file } ],
374                                'role': 'serviceAccess'\
375                            },\
376                        ]}\
377                    }\
378            }
379            return self.allocate_project.static_project(preq)
[eeb0088]380
[f771e2f]381    def save_project_state(self, aid, ap, dyn, owners):
382        """
383        Parse out and save the information relevant to the project created for
384        this experiment.  That info is largely in ap and owners.  dyn indicates
385        that the project was created dynamically.  Return the user and project
386        names.
387        """
388        self.state_lock.acquire()
389        self.allocation[aid] = { }
[c65b7e4]390        self.allocation[aid]['auth'] = set()
[eeb0088]391        try:
[f771e2f]392            pname = ap['project']['name']['localname']
393        except KeyError:
394            pname = None
395
396        if dyn:
397            if not pname:
398                self.state_lock.release()
399                raise service_error(service_error.internal,
400                        "Misformed allocation response?")
401            if pname in self.projects: self.projects[pname] += 1
402            else: self.projects[pname] = 1
403            self.allocation[aid]['project'] = pname
[43197eb]404        else:
[f771e2f]405            # sproject is a static project associated with this allocation.
406            self.allocation[aid]['sproject'] = pname
[866c983]407
[f771e2f]408        self.allocation[aid]['keys'] = [ ]
[5e1fb7b]409
[f771e2f]410        try:
411            for u in ap['project']['user']:
412                uname = u['userID']['localname']
413                if u['role'] == 'experimentCreation':
414                    self.allocation[aid]['user'] = uname
415                for k in [ k['sshPubkey'] for k in u['access'] \
416                        if k.has_key('sshPubkey') ]:
417                    kv = "%s:%s" % (uname, k)
418                    if self.keys.has_key(kv): self.keys[kv] += 1
419                    else: self.keys[kv] = 1
420                    self.allocation[aid]['keys'].append((uname, k))
421        except KeyError:
422            self.state_lock.release()
423            raise service_error(service_error.internal,
424                    "Misformed allocation response?")
425
426        self.allocation[aid]['owners'] = owners
427        self.write_state()
428        self.state_lock.release()
429        return (pname, uname)
[19cc408]430
[06cc65b]431    # End of RequestAccess support routines
432
[19cc408]433    def RequestAccess(self, req, fid):
[866c983]434        """
435        Handle the access request.  Proxy if not for us.
436
437        Parse out the fields and make the allocations or rejections if for us,
438        otherwise, assuming we're willing to proxy, proxy the request out.
439        """
440
441        def gateway_hardware(h):
[5e1fb7b]442            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
[866c983]443            else: return h
444
[43197eb]445        def get_export_project(svcs):
446            """
447            if the service requests includes one to export a project, return
448            that project.
449            """
450            rv = None
451            for s in svcs:
452                if s.get('name', '') == 'project_export' and \
453                        s.get('visibility', '') == 'export':
454                    if not rv: 
[1f6a573]455                        for a in s.get('fedAttr', []):
[43197eb]456                            if a.get('attribute', '') == 'project' \
457                                    and 'value' in a:
458                                rv = a['value']
459                    else:
460                        raise service_error(service_error, access, 
461                                'Requesting multiple project exports is ' + \
462                                        'not supported');
463            return rv
464
[866c983]465        # The dance to get into the request body
466        if req.has_key('RequestAccessRequestBody'):
467            req = req['RequestAccessRequestBody']
468        else:
469            raise service_error(service_error.req, "No request!?")
470
[1f6a573]471        # if this includes a project export request, construct a filter such
472        # that only the ABAC attributes mapped to that project are checked for
473        # access.
474        if 'service' in req:
475            ep = get_export_project(req['service'])
[de86b35]476            if ep: pf = lambda(a): a.value[0] == ep
477            else: pf = None
[1f6a573]478        else:
479            ep = None
480            pf = None
481
[c573278]482        if self.auth.import_credentials(
483                data_list=req.get('abac_credential', [])):
484            self.auth.save()
[866c983]485
[6e63513]486        if self.auth_type == "legacy":
[e83f2f2]487            found, dyn, owners= self.legacy_lookup_access(req, fid)
488            proof = access_proof("me", fid, "create")
[6e63513]489        elif self.auth_type == 'abac':
[e83f2f2]490            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
[6e63513]491        else:
492            raise service_error(service_error.internal, 
493                    'Unknown auth_type: %s' % self.auth_type)
[f771e2f]494        ap = None
[866c983]495
[1f6a573]496        # This only happens in legacy lookups, but if this user has access to
497        # the testbed but not the project to be exported, raise the error.
498        if ep and ep != found[0]:
499            raise service_error(service_error.access,
500                    "Cannot export %s" % ep)
[f771e2f]501
502        if self.ssh_pubkey_file:
[027b87b]503            ap = self.do_project_allocation(dyn[1], found[0], found[1])
[f771e2f]504        else:
505            raise service_error(service_error.internal, 
506                    "SSH access parameters required")
507        # keep track of what's been added
508        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
509        aid = unicode(allocID)
510
511        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
512
513        services, svc_state = self.export_services(req.get('service',[]),
514                pname, uname)
515        self.state_lock.acquire()
516        # Store services state in global state
517        for k, v in svc_state.items():
518            self.allocation[aid][k] = v
[c65b7e4]519        self.append_allocation_authorization(aid, 
520                set([(o, allocID) for o in owners]), state_attr='allocation')
[f771e2f]521        self.write_state()
522        self.state_lock.release()
523        try:
524            f = open("%s/%s.pem" % (self.certdir, aid), "w")
525            print >>f, alloc_cert
526            f.close()
527        except EnvironmentError, e:
528            raise service_error(service_error.internal, 
529                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
530        resp = self.build_access_response({ 'fedid': allocID } ,
[e83f2f2]531                ap, services, proof)
[f771e2f]532        return resp
[d81971a]533
[8cf2b90e]534    def do_release_project(self, del_project, del_users, del_types):
535        """
536        If a project and users has to be deleted, make the call.
537        """
538        msg = { 'project': { }}
539        if del_project:
540            msg['project']['name']= {'localname': del_project}
541        users = [ ]
542        for u in del_users.keys():
543            users.append({ 'userID': { 'localname': u },\
544                'access' :  \
545                        [ {'sshPubkey' : s } for s in del_users[u]]\
546            })
547        if users: 
548            msg['project']['user'] = users
549        if len(del_types) > 0:
550            msg['resources'] = { 'node': \
551                    [ {'hardware': [ h ] } for h in del_types ]\
552                }
553        if self.allocate_project.release_project:
554            msg = { 'ReleaseProjectRequestBody' : msg}
555            self.allocate_project.release_project(msg)
556
[d81971a]557    def ReleaseAccess(self, req, fid):
[866c983]558        # The dance to get into the request body
559        if req.has_key('ReleaseAccessRequestBody'):
560            req = req['ReleaseAccessRequestBody']
561        else:
562            raise service_error(service_error.req, "No request!?")
563
[8cf2b90e]564        try:
565            if req['allocID'].has_key('localname'):
566                auth_attr = aid = req['allocID']['localname']
567            elif req['allocID'].has_key('fedid'):
568                aid = unicode(req['allocID']['fedid'])
569                auth_attr = req['allocID']['fedid']
570            else:
571                raise service_error(service_error.req,
572                        "Only localnames and fedids are understood")
573        except KeyError:
574            raise service_error(service_error.req, "Badly formed request")
575
[725c55d]576        self.log.debug("[access] deallocation requested for %s by %s" % \
577                (aid, fid))
[e83f2f2]578        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
579                with_proof=True)
580        if not access_ok:
[8cf2b90e]581            self.log.debug("[access] deallocation denied for %s", aid)
582            raise service_error(service_error.access, "Access Denied")
583
584        # If we know this allocation, reduce the reference counts and
585        # remove the local allocations.  Otherwise report an error.  If
586        # there is an allocation to delete, del_users will be a dictonary
587        # of sets where the key is the user that owns the keys in the set.
588        # We use a set to avoid duplicates.  del_project is just the name
589        # of any dynamic project to delete.  We're somewhat lazy about
590        # deleting authorization attributes.  Having access to something
591        # that doesn't exist isn't harmful.
592        del_users = { }
593        del_project = None
594        del_types = set()
595
596        self.state_lock.acquire()
597        if aid in self.allocation:
598            self.log.debug("Found allocation for %s" %aid)
[c65b7e4]599            self.clear_allocation_authorization(aid, state_attr='allocation')
[8cf2b90e]600            for k in self.allocation[aid]['keys']:
601                kk = "%s:%s" % k
602                self.keys[kk] -= 1
603                if self.keys[kk] == 0:
604                    if not del_users.has_key(k[0]):
605                        del_users[k[0]] = set()
606                    del_users[k[0]].add(k[1])
607                    del self.keys[kk]
608
609            if 'project' in self.allocation[aid]:
610                pname = self.allocation[aid]['project']
611                self.projects[pname] -= 1
612                if self.projects[pname] == 0:
613                    del_project = pname
614                    del self.projects[pname]
615
616            if 'types' in self.allocation[aid]:
617                for t in self.allocation[aid]['types']:
618                    self.types[t] -= 1
619                    if self.types[t] == 0:
620                        if not del_project: del_project = t[0]
621                        del_types.add(t[1])
622                        del self.types[t]
623
624            del self.allocation[aid]
625            self.write_state()
626            self.state_lock.release()
627            # If we actually have resources to deallocate, prepare the call.
628            if del_project or del_users:
629                self.do_release_project(del_project, del_users, del_types)
630            # And remove the access cert
631            cf = "%s/%s.pem" % (self.certdir, aid)
632            self.log.debug("Removing %s" % cf)
633            os.remove(cf)
[e83f2f2]634            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
[8cf2b90e]635        else:
636            self.state_lock.release()
637            raise service_error(service_error.req, "No such allocation")
[866c983]638
[06cc65b]639    # These are subroutines for StartSegment
[8cf2b90e]640    def generate_ns2(self, topo, expfn, softdir, connInfo):
[06cc65b]641        """
642        Convert topo into an ns2 file, decorated with appropriate commands for
643        the particular testbed setup.  Convert all requests for software, etc
644        to point at the staged copies on this testbed and add the federation
645        startcommands.
646        """
[617592b]647        class dragon_commands:
648            """
649            Functor to spit out approrpiate dragon commands for nodes listed in
650            the connectivity description.  The constructor makes a dict mapping
651            dragon nodes to their parameters and the __call__ checks each
652            element in turn for membership.
653            """
654            def __init__(self, map):
655                self.node_info = map
656
657            def __call__(self, e):
658                s = ""
659                if isinstance(e, topdl.Computer):
[49051fb]660                    if self.node_info.has_key(e.name):
[fefa026]661                        info = self.node_info[e.name]
662                        for ifname, vlan, type in info:
[617592b]663                            for i in e.interface:
664                                if i.name == ifname:
665                                    addr = i.get_attribute('ip4_address')
666                                    subs = i.substrate[0]
667                                    break
668                            else:
669                                raise service_error(service_error.internal,
670                                        "No interface %s on element %s" % \
[49051fb]671                                                (ifname, e.name))
[1cf8e2c]672                            # XXX: do netmask right
[617592b]673                            if type =='link':
[06cc65b]674                                s = ("tb-allow-external ${%s} " + \
675                                        "dragonportal ip %s vlan %s " + \
676                                        "netmask 255.255.255.0\n") % \
[e07c8f3]677                                        (topdl.to_tcl_name(e.name), addr, vlan)
[617592b]678                            elif type =='lan':
[06cc65b]679                                s = ("tb-allow-external ${%s} " + \
680                                        "dragonportal " + \
[617592b]681                                        "ip %s vlan %s usurp %s\n") % \
[e07c8f3]682                                        (topdl.to_tcl_name(e.name), addr, 
683                                                vlan, subs)
[617592b]684                            else:
685                                raise service_error(service_error_internal,
686                                        "Unknown DRAGON type %s" % type)
687                return s
688
689        class not_dragon:
[06cc65b]690            """
691            Return true if a node is in the given map of dragon nodes.
692            """
[617592b]693            def __init__(self, map):
694                self.nodes = set(map.keys())
695
696            def __call__(self, e):
[49051fb]697                return e.name not in self.nodes
[69692a9]698
[06cc65b]699        # Main line of generate_ns2
[ecca6eb]700        t = topo.clone()
701
[06cc65b]702        # Create the map of nodes that need direct connections (dragon
703        # connections) from the connInfo
[617592b]704        dragon_map = { }
705        for i in [ i for i in connInfo if i['type'] == 'transit']:
706            for a in i.get('fedAttr', []):
707                if a['attribute'] == 'vlan_id':
708                    vlan = a['value']
709                    break
710            else:
[8cf2b90e]711                raise service_error(service_error.internal, "No vlan tag")
[617592b]712            members = i.get('member', [])
713            if len(members) > 1: type = 'lan'
714            else: type = 'link'
715
716            try:
717                for m in members:
[8cf2b90e]718                    if m['element'] in dragon_map:
[617592b]719                        dragon_map[m['element']].append(( m['interface'], 
720                            vlan, type))
721                    else:
722                        dragon_map[m['element']] = [( m['interface'], 
723                            vlan, type),]
724            except KeyError:
725                raise service_error(service_error.req,
726                        "Missing connectivity info")
727
[35aa3ae]728        # Weed out the things we aren't going to instantiate: Segments, portal
729        # substrates, and portal interfaces.  (The copy in the for loop allows
730        # us to delete from e.elements in side the for loop).  While we're
731        # touching all the elements, we also adjust paths from the original
732        # testbed to local testbed paths and put the federation commands into
733        # the start commands
734        for e in [e for e in t.elements]:
735            if isinstance(e, topdl.Segment):
736                t.elements.remove(e)
[43649f1]737            if isinstance(e, topdl.Computer):
[e76f38a]738                self.add_kit(e, self.federation_software)
[5e1fb7b]739                if e.get_attribute('portal') and self.portal_startcommand:
[9b3627e]740                    # Add local portal support software
[e76f38a]741                    self.add_kit(e, self.portal_software)
[43649f1]742                    # Portals never have a user-specified start command
[5e1fb7b]743                    e.set_attribute('startup', self.portal_startcommand)
744                elif self.node_startcommand:
[43649f1]745                    if e.get_attribute('startup'):
[d87778f]746                        e.set_attribute('startup', "%s \\$USER '%s'" % \
[5e1fb7b]747                                (self.node_startcommand, 
748                                    e.get_attribute('startup')))
[43649f1]749                    else:
[5e1fb7b]750                        e.set_attribute('startup', self.node_startcommand)
[0297248]751
[49051fb]752                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
[35aa3ae]753                # Remove portal interfaces that do not connect to DRAGON
754                e.interface = [i for i in e.interface \
[617592b]755                        if not i.get_attribute('portal') or i.name in dinf ]
[9b3627e]756            # Fix software paths
757            for s in getattr(e, 'software', []):
758                s.location = re.sub("^.*/", softdir, s.location)
[35aa3ae]759
760        t.substrates = [ s.clone() for s in t.substrates ]
761        t.incorporate_elements()
[ecca6eb]762
763        # Customize the ns2 output for local portal commands and images
764        filters = []
765
[5e1fb7b]766        if self.dragon_endpoint:
[617592b]767            add_filter = not_dragon(dragon_map)
768            filters.append(dragon_commands(dragon_map))
[69692a9]769        else:
770            add_filter = None
771
[5e1fb7b]772        if self.portal_command:
773            filters.append(topdl.generate_portal_command_filter(
774                self.portal_command, add_filter=add_filter))
[ecca6eb]775
[5e1fb7b]776        if self.portal_image:
[ecca6eb]777            filters.append(topdl.generate_portal_image_filter(
[5e1fb7b]778                self.portal_image))
[ecca6eb]779
[5e1fb7b]780        if self.portal_type:
[ecca6eb]781            filters.append(topdl.generate_portal_hardware_filter(
[5e1fb7b]782                self.portal_type))
[ecca6eb]783
784        # Convert to ns and write it out
785        expfile = topdl.topology_to_ns2(t, filters)
786        try:
787            f = open(expfn, "w")
788            print >>f, expfile
789            f.close()
[d3c8759]790        except EnvironmentError:
[ecca6eb]791            raise service_error(service_error.internal,
792                    "Cannot write experiment file %s: %s" % (expfn,e))
[f9ef40b]793
[2c1fd21]794    def export_store_info(self, cf, proj, ename, connInfo):
795        """
796        For the export requests in the connection info, install the peer names
797        at the experiment controller via SetValue calls.
798        """
799
800        for c in connInfo:
801            for p in [ p for p in c.get('parameter', []) \
802                    if p.get('type', '') == 'output']:
803
804                if p.get('name', '') == 'peer':
805                    k = p.get('key', None)
806                    surl = p.get('store', None)
807                    if surl and k and k.index('/') != -1:
808                        value = "%s.%s.%s%s" % \
809                                (k[k.index('/')+1:], ename, proj, self.domain)
810                        req = { 'name': k, 'value': value }
811                        self.log.debug("Setting %s to %s on %s" % \
812                                (k, value, surl))
813                        self.call_SetValue(surl, req, cf)
814                    else:
815                        self.log.error("Bad export request: %s" % p)
816                elif p.get('name', '') == 'ssh_port':
817                    k = p.get('key', None)
818                    surl = p.get('store', None)
819                    if surl and k:
820                        req = { 'name': k, 'value': self.ssh_port }
821                        self.log.debug("Setting %s to %s on %s" % \
822                                (k, self.ssh_port, surl))
823                        self.call_SetValue(surl, req, cf)
824                    else:
825                        self.log.error("Bad export request: %s" % p)
826                else:
827                    self.log.error("Unknown export parameter: %s" % \
828                            p.get('name'))
829                    continue
830
[49051fb]831    def add_seer_node(self, topo, name, startup):
832        """
833        Add a seer node to the given topology, with the startup command passed
[06cc65b]834        in.  Used by configure seer_services.
[49051fb]835        """
836        c_node = topdl.Computer(
837                name=name, 
838                os= topdl.OperatingSystem(
839                    attribute=[
840                    { 'attribute': 'osid', 
841                        'value': self.local_seer_image },
842                    ]),
843                attribute=[
844                    { 'attribute': 'startup', 'value': startup },
845                    ]
846                )
847        self.add_kit(c_node, self.local_seer_software)
848        topo.elements.append(c_node)
849
850    def configure_seer_services(self, services, topo, softdir):
[8cf2b90e]851        """
852        Make changes to the topology required for the seer requests being made.
853        Specifically, add any control or master nodes required and set up the
854        start commands on the nodes to interconnect them.
855        """
856        local_seer = False      # True if we need to add a control node
857        collect_seer = False    # True if there is a seer-master node
858        seer_master= False      # True if we need to add the seer-master
[49051fb]859        for s in services:
860            s_name = s.get('name', '')
861            s_vis = s.get('visibility','')
862
[8cf2b90e]863            if s_name  == 'local_seer_control' and s_vis == 'export':
[49051fb]864                local_seer = True
865            elif s_name == 'seer_master':
866                if s_vis == 'import':
867                    collect_seer = True
868                elif s_vis == 'export':
869                    seer_master = True
870       
871        # We've got the whole picture now, so add nodes if needed and configure
872        # them to interconnect properly.
873        if local_seer or seer_master:
874            # Copy local seer control node software to the tempdir
875            for l, f in self.local_seer_software:
876                base = os.path.basename(f)
877                copy_file(f, "%s/%s" % (softdir, base))
878        # If we're collecting seers somewhere the controllers need to talk to
879        # the master.  In testbeds that export the service, that will be a
880        # local node that we'll add below.  Elsewhere it will be the control
881        # portal that will port forward to the exporting master.
882        if local_seer:
883            if collect_seer:
[acaa9b9]884                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
[49051fb]885            else:
886                startup = self.local_seer_start
887            self.add_seer_node(topo, 'control', startup)
888        # If this is the seer master, add that node, too.
889        if seer_master:
[e07c8f3]890            self.add_seer_node(topo, 'seer-master', 
891                    "%s -R -n -R seer-master -R -A -R sink" % \
892                            self.seer_master_start)
[49051fb]893
[06cc65b]894    def retrieve_software(self, topo, certfile, softdir):
895        """
896        Collect the software that nodes in the topology need loaded and stage
897        it locally.  This implies retrieving it from the experiment_controller
898        and placing it into softdir.  Certfile is used to prove that this node
899        has access to that data (it's the allocation/segment fedid).  Finally
900        local portal and federation software is also copied to the same staging
901        directory for simplicity - all software needed for experiment creation
902        is in softdir.
903        """
904        sw = set()
905        for e in topo.elements:
906            for s in getattr(e, 'software', []):
907                sw.add(s.location)
908        for s in sw:
909            self.log.debug("Retrieving %s" % s)
910            try:
911                get_url(s, certfile, softdir)
912            except:
913                t, v, st = sys.exc_info()
914                raise service_error(service_error.internal,
915                        "Error retrieving %s: %s" % (s, v))
[49051fb]916
[06cc65b]917        # Copy local federation and portal node software to the tempdir
918        for s in (self.federation_software, self.portal_software):
919            for l, f in s:
920                base = os.path.basename(f)
921                copy_file(f, "%s/%s" % (softdir, base))
[49051fb]922
[6c57fe9]923
[06cc65b]924    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
925        """
926        Gather common configuration files, retrieve or create an experiment
927        name and project name, and return the ssh_key filenames.  Create an
928        allocation log bound to the state log variable as well.
929        """
[6c57fe9]930        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
[06cc65b]931        ename = None
932        pubkey_base = None
933        secretkey_base = None
934        proj = None
935        user = None
936        alloc_log = None
[262328f]937        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
[06cc65b]938
[f3898f7]939        self.state_lock.acquire()
940        if aid in self.allocation:
941            proj = self.allocation[aid].get('project', None)
942            if not proj: 
943                proj = self.allocation[aid].get('sproject', None)
944        self.state_lock.release()
945
946        if not proj:
947            raise service_error(service_error.internal, 
948                    "Can't find project for %s" %aid)
949
[06cc65b]950        for a in attrs:
951            if a['attribute'] in configs:
952                try:
953                    self.log.debug("Retrieving %s from %s" % \
954                            (a['attribute'], a['value']))
955                    get_url(a['value'], certfile, tmpdir)
956                except:
957                    t, v, st = sys.exc_info()
958                    raise service_error(service_error.internal,
959                            "Error retrieving %s: %s" % (a.get('value', ""), v))
960            if a['attribute'] == 'ssh_pubkey':
961                pubkey_base = a['value'].rpartition('/')[2]
962            if a['attribute'] == 'ssh_secretkey':
963                secretkey_base = a['value'].rpartition('/')[2]
964            if a['attribute'] == 'experiment_name':
965                ename = a['value']
966
[f3898f7]967        # Names longer than the emulab max are discarded
968        # Projects with a group require nonce experiment names as well
969        if ename and len(ename) <= self.max_name_len and '/' not in proj:
[262328f]970            # Clean up the experiment name so that emulab will accept it.
971            ename = re.sub(vchars_re, '-', ename)
972
973        else:
[06cc65b]974            ename = ""
975            for i in range(0,5):
976                ename += random.choice(string.ascii_letters)
[53b5c18]977            self.log.warn("No experiment name or suggestion too long: " + \
978                    "picked one randomly: %s" % ename)
[06cc65b]979
980        if not pubkey_base:
981            raise service_error(service_error.req, 
982                    "No public key attribute")
983
984        if not secretkey_base:
985            raise service_error(service_error.req, 
986                    "No secret key attribute")
[6c57fe9]987
[06cc65b]988        self.state_lock.acquire()
989        if aid in self.allocation:
990            user = self.allocation[aid].get('user', None)
991            self.allocation[aid]['experiment'] = ename
992            self.allocation[aid]['log'] = [ ]
993            # Create a logger that logs to the experiment's state object as
994            # well as to the main log file.
995            alloc_log = logging.getLogger('fedd.access.%s' % ename)
996            h = logging.StreamHandler(
997                    list_log.list_log(self.allocation[aid]['log']))
998            # XXX: there should be a global one of these rather than
999            # repeating the code.
1000            h.setFormatter(logging.Formatter(
1001                "%(asctime)s %(name)s %(message)s",
1002                        '%d %b %y %H:%M:%S'))
1003            alloc_log.addHandler(h)
1004            self.write_state()
1005        self.state_lock.release()
1006
1007        if not user:
1008            raise service_error(service_error.internal, 
1009                    "Can't find creation user for %s" %aid)
1010
1011        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1012
[e83f2f2]1013    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
[06cc65b]1014        """
1015        Store key bits of experiment state in the global repository, including
1016        the response that may need to be replayed, and return the response.
1017        """
1018        # Copy the assigned names into the return topology
1019        embedding = [ ]
1020        for n in starter.node:
1021            embedding.append({ 
1022                'toponame': n,
1023                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1024                })
1025        # Grab the log (this is some anal locking, but better safe than
1026        # sorry)
1027        self.state_lock.acquire()
1028        logv = "".join(self.allocation[aid]['log'])
1029        # It's possible that the StartSegment call gets retried (!).
1030        # if the 'started' key is in the allocation, we'll return it rather
1031        # than redo the setup.
1032        self.allocation[aid]['started'] = { 
1033                'allocID': alloc_id,
1034                'allocationLog': logv,
1035                'segmentdescription': { 
1036                    'topdldescription': topo.clone().to_dict()
1037                    },
[e83f2f2]1038                'embedding': embedding,
1039                'proof': proof.to_dict(),
[06cc65b]1040                }
1041        retval = copy.copy(self.allocation[aid]['started'])
1042        self.write_state()
1043        self.state_lock.release()
1044        return retval
1045   
1046    # End of StartSegment support routines
1047
1048    def StartSegment(self, req, fid):
[b770aa0]1049        err = None  # Any service_error generated after tmpdir is created
1050        rv = None   # Return value from segment creation
1051
[cc8d8e9]1052        try:
1053            req = req['StartSegmentRequestBody']
[06cc65b]1054            auth_attr = req['allocID']['fedid']
1055            topref = req['segmentdescription']['topdldescription']
[cc8d8e9]1056        except KeyError:
1057            raise service_error(server_error.req, "Badly formed request")
[ecca6eb]1058
[e02cd14]1059        connInfo = req.get('connection', [])
1060        services = req.get('service', [])
[ecca6eb]1061        aid = "%s" % auth_attr
[6c57fe9]1062        attrs = req.get('fedAttr', [])
[e83f2f2]1063
1064        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1065                with_proof=True)
1066        if not access_ok:
[ecca6eb]1067            raise service_error(service_error.access, "Access denied")
[cd06678]1068        else:
1069            # See if this is a replay of an earlier succeeded StartSegment -
1070            # sometimes SSL kills 'em.  If so, replay the response rather than
1071            # redoing the allocation.
1072            self.state_lock.acquire()
1073            retval = self.allocation[aid].get('started', None)
1074            self.state_lock.release()
1075            if retval:
1076                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1077                        "replaying response")
1078                return retval
1079
1080        # A new request.  Do it.
[6c57fe9]1081
[06cc65b]1082        if topref: topo = topdl.Topology(**topref)
[6c57fe9]1083        else:
1084            raise service_error(service_error.req, 
1085                    "Request missing segmentdescription'")
[2761484]1086       
[6c57fe9]1087        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1088        try:
1089            tmpdir = tempfile.mkdtemp(prefix="access-")
[ecca6eb]1090            softdir = "%s/software" % tmpdir
[c23684d]1091            os.mkdir(softdir)
[d3c8759]1092        except EnvironmentError:
[6c57fe9]1093            raise service_error(service_error.internal, "Cannot create tmp dir")
1094
[b770aa0]1095        # Try block alllows us to clean up temporary files.
1096        try:
[06cc65b]1097            self.retrieve_software(topo, certfile, softdir)
1098            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1099                    self.initialize_experiment_info(attrs, aid, 
1100                            certfile, tmpdir)
[9b3627e]1101
[f3898f7]1102            if '/' in proj: proj, gid = proj.split('/')
1103            else: gid = None
1104
1105
[06cc65b]1106            # Set up userconf and seer if needed
[c200d36]1107            self.configure_userconf(services, tmpdir)
[49051fb]1108            self.configure_seer_services(services, topo, softdir)
[06cc65b]1109            # Get and send synch store variables
[2761484]1110            self.export_store_info(certfile, proj, ename, connInfo)
1111            self.import_store_info(certfile, connInfo)
1112
[b770aa0]1113            expfile = "%s/experiment.tcl" % tmpdir
1114
1115            self.generate_portal_configs(topo, pubkey_base, 
[06cc65b]1116                    secretkey_base, tmpdir, proj, ename, connInfo, services)
[b770aa0]1117            self.generate_ns2(topo, expfile, 
[8cf2b90e]1118                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
[f07fa49]1119
[b770aa0]1120            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
[181aeb4]1121                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1122                    cert=self.xmlrpc_cert)
[f3898f7]1123            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
[b770aa0]1124        except service_error, e:
1125            err = e
[06cc65b]1126        except:
1127            t, v, st = sys.exc_info()
1128            err = service_error(service_error.internal, "%s: %s" % \
1129                    (v, traceback.extract_tb(st)))
[b770aa0]1130
[574055e]1131        # Walk up tmpdir, deleting as we go
[06cc65b]1132        if self.cleanup: self.remove_dirs(tmpdir)
1133        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
[574055e]1134
[fd556d1]1135        if rv:
[e83f2f2]1136            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1137                    proof)
[b770aa0]1138        elif err:
1139            raise service_error(service_error.federant,
1140                    "Swapin failed: %s" % err)
[fd556d1]1141        else:
1142            raise service_error(service_error.federant, "Swapin failed")
[5ae3857]1143
1144    def TerminateSegment(self, req, fid):
1145        try:
1146            req = req['TerminateSegmentRequestBody']
1147        except KeyError:
1148            raise service_error(server_error.req, "Badly formed request")
1149
1150        auth_attr = req['allocID']['fedid']
1151        aid = "%s" % auth_attr
1152        attrs = req.get('fedAttr', [])
[e83f2f2]1153
1154        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1155                with_proof=True)
1156        if not access_ok:
[5ae3857]1157            raise service_error(service_error.access, "Access denied")
1158
1159        self.state_lock.acquire()
[06cc65b]1160        if aid in self.allocation:
[5ae3857]1161            proj = self.allocation[aid].get('project', None)
1162            if not proj: 
1163                proj = self.allocation[aid].get('sproject', None)
1164            user = self.allocation[aid].get('user', None)
1165            ename = self.allocation[aid].get('experiment', None)
[1d913e13]1166        else:
1167            proj = None
1168            user = None
1169            ename = None
[5ae3857]1170        self.state_lock.release()
1171
1172        if not proj:
1173            raise service_error(service_error.internal, 
1174                    "Can't find project for %s" % aid)
[f3898f7]1175        else:
1176            if '/' in proj: proj, gid = proj.split('/')
1177            else: gid = None
[5ae3857]1178
1179        if not user:
1180            raise service_error(service_error.internal, 
1181                    "Can't find creation user for %s" % aid)
1182        if not ename:
1183            raise service_error(service_error.internal, 
1184                    "Can't find experiment name for %s" % aid)
[fd556d1]1185        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
[181aeb4]1186                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
[f3898f7]1187        stopper(self, user, proj, ename, gid)
[e83f2f2]1188        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.