source: fedd/federation/emulab_access.py @ 3df9b33

compt_changesinfo-ops
Last change on this file since 3df9b33 was 3df9b33, checked in by Ted Faber <faber@…>, 13 years ago

fedd-generated SEER certs and distribution (initial implementation,
untested) addresses #33

  • Property mode set to 100644
File size: 38.9 KB
RevLine 
[19cc408]1#!/usr/local/bin/python
2
3import os,sys
[eeb0088]4import stat # for chmod constants
[19cc408]5import re
[ab847bc]6import random
[19cc408]7import string
8import copy
[d81971a]9import pickle
[c971895]10import logging
[eeb0088]11import subprocess
[06cc65b]12import traceback
[19cc408]13
[f8582c9]14from threading import *
[8e6fe4d]15from M2Crypto.SSL import SSLError
[f8582c9]16
[f771e2f]17from access import access_base
[78f2668]18from legacy_access import legacy_access
[f771e2f]19
[ec4fb42]20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
[51cc9df]22from fedid import fedid, generate_fedid
[6e63513]23from authorizer import authorizer, abac_authorizer
[6a0c9f4]24from service_error import service_error
[9460b1e]25from remote_service import xmlrpc_handler, soap_handler, service_caller
[e83f2f2]26from proof import proof as access_proof
[11a08b0]27
[6c57fe9]28import httplib
29import tempfile
30from urlparse import urlparse
31
[11860f52]32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
[0ea11af]37
38# Make log messages disappear if noone configures a fedd logger
[11a08b0]39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
[19cc408]44
[78f2668]45class access(access_base, legacy_access):
[19cc408]46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
[53b5c18]53    max_name_len = 19
54
[3f6bc5f]55    def __init__(self, config=None, auth=None):
[866c983]56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
[f771e2f]60        access_base.__init__(self, config, auth)
[866c983]61
[53b5c18]62        self.max_name_len = access.max_name_len
63
[866c983]64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
[eeb0088]67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
[fe28bb2]69        self.userconfurl = config.get("access","userconfurl")
[9b3627e]70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
[e76f38a]72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
[49051fb]75        self.seer_master_start = config.get("access", "seer_master_start")
[ecca6eb]76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
[3bddd24]77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
[6280b1f]78        self.ssh_port = config.get("access","ssh_port") or "22"
[181aeb4]79        self.boss = config.get("access", "boss")
[814b5e5]80        self.ops = config.get("access", "ops")
[181aeb4]81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
[5e1fb7b]83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
[f771e2f]95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
[11860f52]98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103        elif self.access_type == 'local_emulab':
104            self.start_segment = local_emulab_segment.start_segment
105            self.stop_segment = local_emulab_segment.stop_segment
106        else:
107            self.start_segment = None
108            self.stop_segment = None
[866c983]109
110        self.restricted = [ ]
111        tb = config.get('access', 'testbed')
112        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
113        else: self.testbed = [ ]
114
[6e63513]115        # authorization information
116        self.auth_type = config.get('access', 'auth_type') \
117                or 'legacy'
118        self.auth_dir = config.get('access', 'auth_dir')
119        accessdb = config.get("access", "accessdb")
120        # initialize the authorization system
121        if self.auth_type == 'legacy':
[c002cb2]122            self.access = { }
[6e63513]123            if accessdb:
[78f2668]124                self.legacy_read_access(accessdb, self.legacy_access_tuple)
[6e63513]125        elif self.auth_type == 'abac':
126            self.auth = abac_authorizer(load=self.auth_dir)
[c002cb2]127            self.access = [ ]
[6e63513]128            if accessdb:
[78f2668]129                self.read_access(accessdb, self.access_tuple)
[6e63513]130        else:
131            raise service_error(service_error.internal, 
132                    "Unknown auth_type: %s" % self.auth_type)
[f771e2f]133
[06cc65b]134        # read_state in the base_class
[f771e2f]135        self.state_lock.acquire()
[06cc65b]136        for a  in ('allocation', 'projects', 'keys', 'types'):
137            if a not in self.state:
138                self.state[a] = { }
[f771e2f]139        self.allocation = self.state['allocation']
140        self.projects = self.state['projects']
141        self.keys = self.state['keys']
142        self.types = self.state['types']
[6e63513]143        if self.auth_type == "legacy": 
144            # Add the ownership attributes to the authorizer.  Note that the
145            # indices of the allocation dict are strings, but the attributes are
146            # fedids, so there is a conversion.
147            for k in self.allocation.keys():
148                for o in self.allocation[k].get('owners', []):
149                    self.auth.set_attribute(o, fedid(hexstr=k))
150                if self.allocation[k].has_key('userconfig'):
151                    sfid = self.allocation[k]['userconfig']
152                    fid = fedid(hexstr=sfid)
153                    self.auth.set_attribute(fid, "/%s" % sfid)
[f771e2f]154        self.state_lock.release()
[a20a20f]155        self.exports = {
156                'SMB': self.export_SMB,
157                'seer': self.export_seer,
158                'tmcd': self.export_tmcd,
159                'userconfig': self.export_userconfig,
160                'project_export': self.export_project_export,
161                'local_seer_control': self.export_local_seer,
162                'seer_master': self.export_seer_master,
163                'hide_hosts': self.export_hide_hosts,
164                }
165
166        if not self.local_seer_image or not self.local_seer_software or \
167                not self.local_seer_start:
168            if 'local_seer_control' in self.exports:
169                del self.exports['local_seer_control']
170
171        if not self.local_seer_image or not self.local_seer_software or \
172                not self.seer_master_start:
173            if 'seer_master' in self.exports:
174                del self.exports['seer_master']
[866c983]175
176
177        self.soap_services = {\
178            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
179            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
[cc8d8e9]180            'StartSegment': soap_handler("StartSegment", self.StartSegment),
[e76f38a]181            'TerminateSegment': soap_handler("TerminateSegment",
182                self.TerminateSegment),
[866c983]183            }
184        self.xmlrpc_services =  {\
185            'RequestAccess': xmlrpc_handler('RequestAccess',
186                self.RequestAccess),
187            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
188                self.ReleaseAccess),
[5ae3857]189            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
190            'TerminateSegment': xmlrpc_handler('TerminateSegment',
191                self.TerminateSegment),
[866c983]192            }
193
[2761484]194        self.call_SetValue = service_caller('SetValue')
[2ee4226]195        self.call_GetValue = service_caller('GetValue', log=self.log)
[866c983]196
197        if not config.has_option("allocate", "uri"):
198            self.allocate_project = \
199                allocate_project_local(config, auth)
200        else:
201            self.allocate_project = \
202                allocate_project_remote(config, auth)
203
[e76f38a]204
[866c983]205        # If the project allocator exports services, put them in this object's
206        # maps so that classes that instantiate this can call the services.
207        self.soap_services.update(self.allocate_project.soap_services)
208        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
[f8582c9]209
[e76f38a]210    @staticmethod
[78f2668]211    def legacy_access_tuple(str):
[06cc65b]212        """
[027b87b]213        Convert a string of the form (id[:resources:resouces], id, id) into a
214        tuple of the form (project, user, user) where users may be names or
215        fedids.  The resources strings are obsolete and ignored.
[06cc65b]216        """
[866c983]217        def parse_name(n):
218            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
219            else: return n
[19cc408]220
[f771e2f]221        str = str.strip()
222        if str.startswith('(') and str.endswith(')'):
223            str = str[1:-1]
224            names = [ s.strip() for s in str.split(",")]
225            if len(names) > 3:
226                raise self.parse_error("More than three fields in name")
227            first = names[0].split(":")
228            if first == 'fedid:':
229                del first[0]
230                first[0] = fedid(hexstr=first[0])
[027b87b]231            names[0] = first[0]
[f771e2f]232
233            for i in range(1,2):
234                names[i] = parse_name(names[i])
235
236            return tuple(names)
237        else:
238            raise self.parse_error('Bad mapping (unbalanced parens)')
[19cc408]239
[6e63513]240    @staticmethod
[78f2668]241    def access_tuple(str):
[6e63513]242        """
243        Convert a string of the form (id, id) into an access_project.  This is
[78f2668]244        called by read_access to convert to local attributes.  It returns
[6e63513]245        a tuple of the form (project, user, user) where the two users are
246        always the same.
247        """
248
249        str = str.strip()
250        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
[725c55d]251            # The slice takes the parens off the string.
252            proj, user = str[1:-1].split(',')
[027b87b]253            return (proj.strip(), user.strip(), user.strip())
[6e63513]254        else:
255            raise self.parse_error(
256                    'Bad mapping (unbalanced parens or more than 1 comma)')
257
[06cc65b]258    # RequestAccess support routines
259
[78f2668]260    def legacy_lookup_access(self, req, fid):
[866c983]261        """
[06cc65b]262        Look up the local access control information mapped to this fedid and
263        credentials.  In this case it is a (project, create_user, access_user)
264        triple, and a triple of three booleans indicating which, if any will
265        need to be dynamically created.  Finally a list of owners for that
266        allocation is returned.
267
268        lookup_access_base pulls the first triple out, and it is parsed by this
269        routine into the boolean map.  Owners is always the controlling fedid.
[866c983]270        """
271        # Return values
[027b87b]272        rp = None
[866c983]273        ru = None
274        # This maps a valid user to the Emulab projects and users to use
[78f2668]275        found, match = self.legacy_lookup_access_base(req, fid)
[f771e2f]276        tb, project, user = match
[866c983]277       
278        if found == None:
279            raise service_error(service_error.access,
280                    "Access denied - cannot map access")
281
282        # resolve <dynamic> and <same> in found
283        dyn_proj = False
284        dyn_create_user = False
285        dyn_service_user = False
286
[027b87b]287        if found[0] == "<same>":
[866c983]288            if project != None:
[027b87b]289                rp = project
[866c983]290            else : 
291                raise service_error(\
292                        service_error.server_config,
293                        "Project matched <same> when no project given")
[027b87b]294        elif found[0] == "<dynamic>":
295            rp = None
[866c983]296            dyn_proj = True
297        else:
[027b87b]298            rp = found[0]
[866c983]299
300        if found[1] == "<same>":
301            if user_match == "<any>":
302                if user != None: rcu = user[0]
303                else: raise service_error(\
304                        service_error.server_config,
305                        "Matched <same> on anonymous request")
306            else:
307                rcu = user_match
308        elif found[1] == "<dynamic>":
309            rcu = None
310            dyn_create_user = True
311        else:
312            rcu = found[1]
313       
314        if found[2] == "<same>":
315            if user_match == "<any>":
316                if user != None: rsu = user[0]
317                else: raise service_error(\
318                        service_error.server_config,
319                        "Matched <same> on anonymous request")
320            else:
321                rsu = user_match
322        elif found[2] == "<dynamic>":
323            rsu = None
324            dyn_service_user = True
325        else:
326            rsu = found[2]
327
328        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
[f771e2f]329                [ fid ]
[19cc408]330
[f771e2f]331    def do_project_allocation(self, dyn, project, user):
332        """
333        Call the project allocation routines and return the info.
334        """
335        if dyn: 
336            # Compose the dynamic project request
337            # (only dynamic, dynamic currently allowed)
338            preq = { 'AllocateProjectRequestBody': \
339                        { 'project' : {\
340                            'user': [ \
341                            { \
342                                'access': [ { 
343                                    'sshPubkey': self.ssh_pubkey_file } ],
344                                 'role': "serviceAccess",\
345                            }, \
346                            { \
347                                'access': [ { 
348                                    'sshPubkey': self.ssh_pubkey_file } ],
349                                 'role': "experimentCreation",\
350                            }, \
351                            ], \
352                            }\
353                        }\
354                    }
355            return self.allocate_project.dynamic_project(preq)
[eeb0088]356        else:
[f771e2f]357            preq = {'StaticProjectRequestBody' : \
358                    { 'project': \
359                        { 'name' : { 'localname' : project },\
360                          'user' : [ \
361                            {\
362                                'userID': { 'localname' : user }, \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                'role': 'experimentCreation'\
366                            },\
367                            {\
368                                'userID': { 'localname' : user}, \
369                                'access': [ { 
370                                    'sshPubkey': self.ssh_pubkey_file } ],
371                                'role': 'serviceAccess'\
372                            },\
373                        ]}\
374                    }\
375            }
376            return self.allocate_project.static_project(preq)
[eeb0088]377
[f771e2f]378    def save_project_state(self, aid, ap, dyn, owners):
379        """
380        Parse out and save the information relevant to the project created for
381        this experiment.  That info is largely in ap and owners.  dyn indicates
382        that the project was created dynamically.  Return the user and project
383        names.
384        """
385        self.state_lock.acquire()
386        self.allocation[aid] = { }
[c65b7e4]387        self.allocation[aid]['auth'] = set()
[eeb0088]388        try:
[f771e2f]389            pname = ap['project']['name']['localname']
390        except KeyError:
391            pname = None
392
393        if dyn:
394            if not pname:
395                self.state_lock.release()
396                raise service_error(service_error.internal,
397                        "Misformed allocation response?")
398            if pname in self.projects: self.projects[pname] += 1
399            else: self.projects[pname] = 1
400            self.allocation[aid]['project'] = pname
[43197eb]401        else:
[f771e2f]402            # sproject is a static project associated with this allocation.
403            self.allocation[aid]['sproject'] = pname
[866c983]404
[f771e2f]405        self.allocation[aid]['keys'] = [ ]
[5e1fb7b]406
[f771e2f]407        try:
408            for u in ap['project']['user']:
409                uname = u['userID']['localname']
410                if u['role'] == 'experimentCreation':
411                    self.allocation[aid]['user'] = uname
412                for k in [ k['sshPubkey'] for k in u['access'] \
413                        if k.has_key('sshPubkey') ]:
414                    kv = "%s:%s" % (uname, k)
415                    if self.keys.has_key(kv): self.keys[kv] += 1
416                    else: self.keys[kv] = 1
417                    self.allocation[aid]['keys'].append((uname, k))
418        except KeyError:
419            self.state_lock.release()
420            raise service_error(service_error.internal,
421                    "Misformed allocation response?")
422
423        self.allocation[aid]['owners'] = owners
424        self.write_state()
425        self.state_lock.release()
426        return (pname, uname)
[19cc408]427
[06cc65b]428    # End of RequestAccess support routines
429
[19cc408]430    def RequestAccess(self, req, fid):
[866c983]431        """
432        Handle the access request.  Proxy if not for us.
433
434        Parse out the fields and make the allocations or rejections if for us,
435        otherwise, assuming we're willing to proxy, proxy the request out.
436        """
437
438        def gateway_hardware(h):
[5e1fb7b]439            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
[866c983]440            else: return h
441
[43197eb]442        def get_export_project(svcs):
443            """
444            if the service requests includes one to export a project, return
445            that project.
446            """
447            rv = None
448            for s in svcs:
449                if s.get('name', '') == 'project_export' and \
450                        s.get('visibility', '') == 'export':
451                    if not rv: 
[1f6a573]452                        for a in s.get('fedAttr', []):
[43197eb]453                            if a.get('attribute', '') == 'project' \
454                                    and 'value' in a:
455                                rv = a['value']
456                    else:
457                        raise service_error(service_error, access, 
458                                'Requesting multiple project exports is ' + \
459                                        'not supported');
460            return rv
461
[866c983]462        # The dance to get into the request body
463        if req.has_key('RequestAccessRequestBody'):
464            req = req['RequestAccessRequestBody']
465        else:
466            raise service_error(service_error.req, "No request!?")
467
[1f6a573]468        # if this includes a project export request, construct a filter such
469        # that only the ABAC attributes mapped to that project are checked for
470        # access.
471        if 'service' in req:
472            ep = get_export_project(req['service'])
[de86b35]473            if ep: pf = lambda(a): a.value[0] == ep
474            else: pf = None
[1f6a573]475        else:
476            ep = None
477            pf = None
478
[c573278]479        if self.auth.import_credentials(
480                data_list=req.get('abac_credential', [])):
481            self.auth.save()
[866c983]482
[6e63513]483        if self.auth_type == "legacy":
[e83f2f2]484            found, dyn, owners= self.legacy_lookup_access(req, fid)
485            proof = access_proof("me", fid, "create")
[6e63513]486        elif self.auth_type == 'abac':
[e83f2f2]487            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
[6e63513]488        else:
489            raise service_error(service_error.internal, 
490                    'Unknown auth_type: %s' % self.auth_type)
[f771e2f]491        ap = None
[866c983]492
[1f6a573]493        # This only happens in legacy lookups, but if this user has access to
494        # the testbed but not the project to be exported, raise the error.
495        if ep and ep != found[0]:
496            raise service_error(service_error.access,
497                    "Cannot export %s" % ep)
[f771e2f]498
499        if self.ssh_pubkey_file:
[027b87b]500            ap = self.do_project_allocation(dyn[1], found[0], found[1])
[f771e2f]501        else:
502            raise service_error(service_error.internal, 
503                    "SSH access parameters required")
504        # keep track of what's been added
505        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
506        aid = unicode(allocID)
507
508        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
509
510        services, svc_state = self.export_services(req.get('service',[]),
511                pname, uname)
512        self.state_lock.acquire()
513        # Store services state in global state
514        for k, v in svc_state.items():
515            self.allocation[aid][k] = v
[c65b7e4]516        self.append_allocation_authorization(aid, 
517                set([(o, allocID) for o in owners]), state_attr='allocation')
[f771e2f]518        self.write_state()
519        self.state_lock.release()
520        try:
521            f = open("%s/%s.pem" % (self.certdir, aid), "w")
522            print >>f, alloc_cert
523            f.close()
524        except EnvironmentError, e:
525            raise service_error(service_error.internal, 
526                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
527        resp = self.build_access_response({ 'fedid': allocID } ,
[e83f2f2]528                ap, services, proof)
[f771e2f]529        return resp
[d81971a]530
[8cf2b90e]531    def do_release_project(self, del_project, del_users, del_types):
532        """
533        If a project and users has to be deleted, make the call.
534        """
535        msg = { 'project': { }}
536        if del_project:
537            msg['project']['name']= {'localname': del_project}
538        users = [ ]
539        for u in del_users.keys():
540            users.append({ 'userID': { 'localname': u },\
541                'access' :  \
542                        [ {'sshPubkey' : s } for s in del_users[u]]\
543            })
544        if users: 
545            msg['project']['user'] = users
546        if len(del_types) > 0:
547            msg['resources'] = { 'node': \
548                    [ {'hardware': [ h ] } for h in del_types ]\
549                }
550        if self.allocate_project.release_project:
551            msg = { 'ReleaseProjectRequestBody' : msg}
552            self.allocate_project.release_project(msg)
553
[d81971a]554    def ReleaseAccess(self, req, fid):
[866c983]555        # The dance to get into the request body
556        if req.has_key('ReleaseAccessRequestBody'):
557            req = req['ReleaseAccessRequestBody']
558        else:
559            raise service_error(service_error.req, "No request!?")
560
[8cf2b90e]561        try:
562            if req['allocID'].has_key('localname'):
563                auth_attr = aid = req['allocID']['localname']
564            elif req['allocID'].has_key('fedid'):
565                aid = unicode(req['allocID']['fedid'])
566                auth_attr = req['allocID']['fedid']
567            else:
568                raise service_error(service_error.req,
569                        "Only localnames and fedids are understood")
570        except KeyError:
571            raise service_error(service_error.req, "Badly formed request")
572
[725c55d]573        self.log.debug("[access] deallocation requested for %s by %s" % \
574                (aid, fid))
[e83f2f2]575        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
576                with_proof=True)
577        if not access_ok:
[8cf2b90e]578            self.log.debug("[access] deallocation denied for %s", aid)
579            raise service_error(service_error.access, "Access Denied")
580
581        # If we know this allocation, reduce the reference counts and
582        # remove the local allocations.  Otherwise report an error.  If
583        # there is an allocation to delete, del_users will be a dictonary
584        # of sets where the key is the user that owns the keys in the set.
585        # We use a set to avoid duplicates.  del_project is just the name
586        # of any dynamic project to delete.  We're somewhat lazy about
587        # deleting authorization attributes.  Having access to something
588        # that doesn't exist isn't harmful.
589        del_users = { }
590        del_project = None
591        del_types = set()
592
593        self.state_lock.acquire()
594        if aid in self.allocation:
595            self.log.debug("Found allocation for %s" %aid)
[c65b7e4]596            self.clear_allocation_authorization(aid, state_attr='allocation')
[8cf2b90e]597            for k in self.allocation[aid]['keys']:
598                kk = "%s:%s" % k
599                self.keys[kk] -= 1
600                if self.keys[kk] == 0:
601                    if not del_users.has_key(k[0]):
602                        del_users[k[0]] = set()
603                    del_users[k[0]].add(k[1])
604                    del self.keys[kk]
605
606            if 'project' in self.allocation[aid]:
607                pname = self.allocation[aid]['project']
608                self.projects[pname] -= 1
609                if self.projects[pname] == 0:
610                    del_project = pname
611                    del self.projects[pname]
612
613            if 'types' in self.allocation[aid]:
614                for t in self.allocation[aid]['types']:
615                    self.types[t] -= 1
616                    if self.types[t] == 0:
617                        if not del_project: del_project = t[0]
618                        del_types.add(t[1])
619                        del self.types[t]
620
621            del self.allocation[aid]
622            self.write_state()
623            self.state_lock.release()
624            # If we actually have resources to deallocate, prepare the call.
625            if del_project or del_users:
626                self.do_release_project(del_project, del_users, del_types)
627            # And remove the access cert
628            cf = "%s/%s.pem" % (self.certdir, aid)
629            self.log.debug("Removing %s" % cf)
630            os.remove(cf)
[e83f2f2]631            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
[8cf2b90e]632        else:
633            self.state_lock.release()
634            raise service_error(service_error.req, "No such allocation")
[866c983]635
[06cc65b]636    # These are subroutines for StartSegment
[8cf2b90e]637    def generate_ns2(self, topo, expfn, softdir, connInfo):
[06cc65b]638        """
639        Convert topo into an ns2 file, decorated with appropriate commands for
640        the particular testbed setup.  Convert all requests for software, etc
641        to point at the staged copies on this testbed and add the federation
642        startcommands.
643        """
[617592b]644        class dragon_commands:
645            """
646            Functor to spit out approrpiate dragon commands for nodes listed in
647            the connectivity description.  The constructor makes a dict mapping
648            dragon nodes to their parameters and the __call__ checks each
649            element in turn for membership.
650            """
651            def __init__(self, map):
652                self.node_info = map
653
654            def __call__(self, e):
655                s = ""
656                if isinstance(e, topdl.Computer):
[49051fb]657                    if self.node_info.has_key(e.name):
[fefa026]658                        info = self.node_info[e.name]
659                        for ifname, vlan, type in info:
[617592b]660                            for i in e.interface:
661                                if i.name == ifname:
662                                    addr = i.get_attribute('ip4_address')
663                                    subs = i.substrate[0]
664                                    break
665                            else:
666                                raise service_error(service_error.internal,
667                                        "No interface %s on element %s" % \
[49051fb]668                                                (ifname, e.name))
[1cf8e2c]669                            # XXX: do netmask right
[617592b]670                            if type =='link':
[06cc65b]671                                s = ("tb-allow-external ${%s} " + \
672                                        "dragonportal ip %s vlan %s " + \
673                                        "netmask 255.255.255.0\n") % \
[e07c8f3]674                                        (topdl.to_tcl_name(e.name), addr, vlan)
[617592b]675                            elif type =='lan':
[06cc65b]676                                s = ("tb-allow-external ${%s} " + \
677                                        "dragonportal " + \
[617592b]678                                        "ip %s vlan %s usurp %s\n") % \
[e07c8f3]679                                        (topdl.to_tcl_name(e.name), addr, 
680                                                vlan, subs)
[617592b]681                            else:
682                                raise service_error(service_error_internal,
683                                        "Unknown DRAGON type %s" % type)
684                return s
685
686        class not_dragon:
[06cc65b]687            """
688            Return true if a node is in the given map of dragon nodes.
689            """
[617592b]690            def __init__(self, map):
691                self.nodes = set(map.keys())
692
693            def __call__(self, e):
[49051fb]694                return e.name not in self.nodes
[69692a9]695
[06cc65b]696        # Main line of generate_ns2
[ecca6eb]697        t = topo.clone()
698
[06cc65b]699        # Create the map of nodes that need direct connections (dragon
700        # connections) from the connInfo
[617592b]701        dragon_map = { }
702        for i in [ i for i in connInfo if i['type'] == 'transit']:
703            for a in i.get('fedAttr', []):
704                if a['attribute'] == 'vlan_id':
705                    vlan = a['value']
706                    break
707            else:
[8cf2b90e]708                raise service_error(service_error.internal, "No vlan tag")
[617592b]709            members = i.get('member', [])
710            if len(members) > 1: type = 'lan'
711            else: type = 'link'
712
713            try:
714                for m in members:
[8cf2b90e]715                    if m['element'] in dragon_map:
[617592b]716                        dragon_map[m['element']].append(( m['interface'], 
717                            vlan, type))
718                    else:
719                        dragon_map[m['element']] = [( m['interface'], 
720                            vlan, type),]
721            except KeyError:
722                raise service_error(service_error.req,
723                        "Missing connectivity info")
724
[35aa3ae]725        # Weed out the things we aren't going to instantiate: Segments, portal
726        # substrates, and portal interfaces.  (The copy in the for loop allows
727        # us to delete from e.elements in side the for loop).  While we're
728        # touching all the elements, we also adjust paths from the original
729        # testbed to local testbed paths and put the federation commands into
730        # the start commands
731        for e in [e for e in t.elements]:
732            if isinstance(e, topdl.Segment):
733                t.elements.remove(e)
[43649f1]734            if isinstance(e, topdl.Computer):
[e76f38a]735                self.add_kit(e, self.federation_software)
[5e1fb7b]736                if e.get_attribute('portal') and self.portal_startcommand:
[9b3627e]737                    # Add local portal support software
[e76f38a]738                    self.add_kit(e, self.portal_software)
[43649f1]739                    # Portals never have a user-specified start command
[5e1fb7b]740                    e.set_attribute('startup', self.portal_startcommand)
741                elif self.node_startcommand:
[43649f1]742                    if e.get_attribute('startup'):
[d87778f]743                        e.set_attribute('startup', "%s \\$USER '%s'" % \
[5e1fb7b]744                                (self.node_startcommand, 
745                                    e.get_attribute('startup')))
[43649f1]746                    else:
[5e1fb7b]747                        e.set_attribute('startup', self.node_startcommand)
[0297248]748
[49051fb]749                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
[35aa3ae]750                # Remove portal interfaces that do not connect to DRAGON
751                e.interface = [i for i in e.interface \
[617592b]752                        if not i.get_attribute('portal') or i.name in dinf ]
[9b3627e]753            # Fix software paths
754            for s in getattr(e, 'software', []):
755                s.location = re.sub("^.*/", softdir, s.location)
[35aa3ae]756
757        t.substrates = [ s.clone() for s in t.substrates ]
758        t.incorporate_elements()
[ecca6eb]759
760        # Customize the ns2 output for local portal commands and images
761        filters = []
762
[5e1fb7b]763        if self.dragon_endpoint:
[617592b]764            add_filter = not_dragon(dragon_map)
765            filters.append(dragon_commands(dragon_map))
[69692a9]766        else:
767            add_filter = None
768
[5e1fb7b]769        if self.portal_command:
770            filters.append(topdl.generate_portal_command_filter(
771                self.portal_command, add_filter=add_filter))
[ecca6eb]772
[5e1fb7b]773        if self.portal_image:
[ecca6eb]774            filters.append(topdl.generate_portal_image_filter(
[5e1fb7b]775                self.portal_image))
[ecca6eb]776
[5e1fb7b]777        if self.portal_type:
[ecca6eb]778            filters.append(topdl.generate_portal_hardware_filter(
[5e1fb7b]779                self.portal_type))
[ecca6eb]780
781        # Convert to ns and write it out
782        expfile = topdl.topology_to_ns2(t, filters)
783        try:
784            f = open(expfn, "w")
785            print >>f, expfile
786            f.close()
[d3c8759]787        except EnvironmentError:
[ecca6eb]788            raise service_error(service_error.internal,
789                    "Cannot write experiment file %s: %s" % (expfn,e))
[f9ef40b]790
[2c1fd21]791    def export_store_info(self, cf, proj, ename, connInfo):
792        """
793        For the export requests in the connection info, install the peer names
794        at the experiment controller via SetValue calls.
795        """
796
797        for c in connInfo:
798            for p in [ p for p in c.get('parameter', []) \
799                    if p.get('type', '') == 'output']:
800
801                if p.get('name', '') == 'peer':
802                    k = p.get('key', None)
803                    surl = p.get('store', None)
804                    if surl and k and k.index('/') != -1:
805                        value = "%s.%s.%s%s" % \
806                                (k[k.index('/')+1:], ename, proj, self.domain)
807                        req = { 'name': k, 'value': value }
808                        self.log.debug("Setting %s to %s on %s" % \
809                                (k, value, surl))
810                        self.call_SetValue(surl, req, cf)
811                    else:
812                        self.log.error("Bad export request: %s" % p)
813                elif p.get('name', '') == 'ssh_port':
814                    k = p.get('key', None)
815                    surl = p.get('store', None)
816                    if surl and k:
817                        req = { 'name': k, 'value': self.ssh_port }
818                        self.log.debug("Setting %s to %s on %s" % \
819                                (k, self.ssh_port, surl))
820                        self.call_SetValue(surl, req, cf)
821                    else:
822                        self.log.error("Bad export request: %s" % p)
823                else:
824                    self.log.error("Unknown export parameter: %s" % \
825                            p.get('name'))
826                    continue
827
[49051fb]828    def add_seer_node(self, topo, name, startup):
829        """
830        Add a seer node to the given topology, with the startup command passed
[06cc65b]831        in.  Used by configure seer_services.
[49051fb]832        """
833        c_node = topdl.Computer(
834                name=name, 
835                os= topdl.OperatingSystem(
836                    attribute=[
837                    { 'attribute': 'osid', 
838                        'value': self.local_seer_image },
839                    ]),
840                attribute=[
841                    { 'attribute': 'startup', 'value': startup },
842                    ]
843                )
844        self.add_kit(c_node, self.local_seer_software)
845        topo.elements.append(c_node)
846
847    def configure_seer_services(self, services, topo, softdir):
[8cf2b90e]848        """
849        Make changes to the topology required for the seer requests being made.
850        Specifically, add any control or master nodes required and set up the
851        start commands on the nodes to interconnect them.
852        """
853        local_seer = False      # True if we need to add a control node
854        collect_seer = False    # True if there is a seer-master node
855        seer_master= False      # True if we need to add the seer-master
[49051fb]856        for s in services:
857            s_name = s.get('name', '')
858            s_vis = s.get('visibility','')
859
[8cf2b90e]860            if s_name  == 'local_seer_control' and s_vis == 'export':
[49051fb]861                local_seer = True
862            elif s_name == 'seer_master':
863                if s_vis == 'import':
864                    collect_seer = True
865                elif s_vis == 'export':
866                    seer_master = True
867       
868        # We've got the whole picture now, so add nodes if needed and configure
869        # them to interconnect properly.
870        if local_seer or seer_master:
871            # Copy local seer control node software to the tempdir
872            for l, f in self.local_seer_software:
873                base = os.path.basename(f)
874                copy_file(f, "%s/%s" % (softdir, base))
875        # If we're collecting seers somewhere the controllers need to talk to
876        # the master.  In testbeds that export the service, that will be a
877        # local node that we'll add below.  Elsewhere it will be the control
878        # portal that will port forward to the exporting master.
879        if local_seer:
880            if collect_seer:
[acaa9b9]881                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
[49051fb]882            else:
883                startup = self.local_seer_start
884            self.add_seer_node(topo, 'control', startup)
885        # If this is the seer master, add that node, too.
886        if seer_master:
[e07c8f3]887            self.add_seer_node(topo, 'seer-master', 
888                    "%s -R -n -R seer-master -R -A -R sink" % \
889                            self.seer_master_start)
[49051fb]890
[06cc65b]891    def retrieve_software(self, topo, certfile, softdir):
892        """
893        Collect the software that nodes in the topology need loaded and stage
894        it locally.  This implies retrieving it from the experiment_controller
895        and placing it into softdir.  Certfile is used to prove that this node
896        has access to that data (it's the allocation/segment fedid).  Finally
897        local portal and federation software is also copied to the same staging
898        directory for simplicity - all software needed for experiment creation
899        is in softdir.
900        """
901        sw = set()
902        for e in topo.elements:
903            for s in getattr(e, 'software', []):
904                sw.add(s.location)
905        for s in sw:
906            self.log.debug("Retrieving %s" % s)
907            try:
908                get_url(s, certfile, softdir)
909            except:
910                t, v, st = sys.exc_info()
911                raise service_error(service_error.internal,
912                        "Error retrieving %s: %s" % (s, v))
[49051fb]913
[06cc65b]914        # Copy local federation and portal node software to the tempdir
915        for s in (self.federation_software, self.portal_software):
916            for l, f in s:
917                base = os.path.basename(f)
918                copy_file(f, "%s/%s" % (softdir, base))
[49051fb]919
[6c57fe9]920
[06cc65b]921    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
922        """
923        Gather common configuration files, retrieve or create an experiment
924        name and project name, and return the ssh_key filenames.  Create an
925        allocation log bound to the state log variable as well.
926        """
[3df9b33]927        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
928                'seer_ca_pem', 'seer_node_pem')
[06cc65b]929        ename = None
930        pubkey_base = None
931        secretkey_base = None
932        proj = None
933        user = None
934        alloc_log = None
[262328f]935        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
[06cc65b]936
[f3898f7]937        self.state_lock.acquire()
938        if aid in self.allocation:
939            proj = self.allocation[aid].get('project', None)
940            if not proj: 
941                proj = self.allocation[aid].get('sproject', None)
942        self.state_lock.release()
943
944        if not proj:
945            raise service_error(service_error.internal, 
946                    "Can't find project for %s" %aid)
947
[06cc65b]948        for a in attrs:
949            if a['attribute'] in configs:
950                try:
951                    self.log.debug("Retrieving %s from %s" % \
952                            (a['attribute'], a['value']))
953                    get_url(a['value'], certfile, tmpdir)
954                except:
955                    t, v, st = sys.exc_info()
956                    raise service_error(service_error.internal,
957                            "Error retrieving %s: %s" % (a.get('value', ""), v))
958            if a['attribute'] == 'ssh_pubkey':
959                pubkey_base = a['value'].rpartition('/')[2]
960            if a['attribute'] == 'ssh_secretkey':
961                secretkey_base = a['value'].rpartition('/')[2]
962            if a['attribute'] == 'experiment_name':
963                ename = a['value']
964
[f3898f7]965        # Names longer than the emulab max are discarded
966        # Projects with a group require nonce experiment names as well
967        if ename and len(ename) <= self.max_name_len and '/' not in proj:
[262328f]968            # Clean up the experiment name so that emulab will accept it.
969            ename = re.sub(vchars_re, '-', ename)
970
971        else:
[06cc65b]972            ename = ""
973            for i in range(0,5):
974                ename += random.choice(string.ascii_letters)
[53b5c18]975            self.log.warn("No experiment name or suggestion too long: " + \
976                    "picked one randomly: %s" % ename)
[06cc65b]977
978        if not pubkey_base:
979            raise service_error(service_error.req, 
980                    "No public key attribute")
981
982        if not secretkey_base:
983            raise service_error(service_error.req, 
984                    "No secret key attribute")
[6c57fe9]985
[06cc65b]986        self.state_lock.acquire()
987        if aid in self.allocation:
988            user = self.allocation[aid].get('user', None)
989            self.allocation[aid]['experiment'] = ename
990            self.allocation[aid]['log'] = [ ]
991            # Create a logger that logs to the experiment's state object as
992            # well as to the main log file.
993            alloc_log = logging.getLogger('fedd.access.%s' % ename)
994            h = logging.StreamHandler(
995                    list_log.list_log(self.allocation[aid]['log']))
996            # XXX: there should be a global one of these rather than
997            # repeating the code.
998            h.setFormatter(logging.Formatter(
999                "%(asctime)s %(name)s %(message)s",
1000                        '%d %b %y %H:%M:%S'))
1001            alloc_log.addHandler(h)
1002            self.write_state()
1003        self.state_lock.release()
1004
1005        if not user:
1006            raise service_error(service_error.internal, 
1007                    "Can't find creation user for %s" %aid)
1008
1009        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1010
[e83f2f2]1011    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
[06cc65b]1012        """
1013        Store key bits of experiment state in the global repository, including
1014        the response that may need to be replayed, and return the response.
1015        """
1016        # Copy the assigned names into the return topology
1017        embedding = [ ]
1018        for n in starter.node:
1019            embedding.append({ 
1020                'toponame': n,
1021                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1022                })
1023        # Grab the log (this is some anal locking, but better safe than
1024        # sorry)
1025        self.state_lock.acquire()
1026        logv = "".join(self.allocation[aid]['log'])
1027        # It's possible that the StartSegment call gets retried (!).
1028        # if the 'started' key is in the allocation, we'll return it rather
1029        # than redo the setup.
1030        self.allocation[aid]['started'] = { 
1031                'allocID': alloc_id,
1032                'allocationLog': logv,
1033                'segmentdescription': { 
1034                    'topdldescription': topo.clone().to_dict()
1035                    },
[e83f2f2]1036                'embedding': embedding,
1037                'proof': proof.to_dict(),
[06cc65b]1038                }
1039        retval = copy.copy(self.allocation[aid]['started'])
1040        self.write_state()
1041        self.state_lock.release()
1042        return retval
1043   
1044    # End of StartSegment support routines
1045
1046    def StartSegment(self, req, fid):
[b770aa0]1047        err = None  # Any service_error generated after tmpdir is created
1048        rv = None   # Return value from segment creation
1049
[cc8d8e9]1050        try:
1051            req = req['StartSegmentRequestBody']
[06cc65b]1052            auth_attr = req['allocID']['fedid']
1053            topref = req['segmentdescription']['topdldescription']
[cc8d8e9]1054        except KeyError:
1055            raise service_error(server_error.req, "Badly formed request")
[ecca6eb]1056
[e02cd14]1057        connInfo = req.get('connection', [])
1058        services = req.get('service', [])
[ecca6eb]1059        aid = "%s" % auth_attr
[6c57fe9]1060        attrs = req.get('fedAttr', [])
[e83f2f2]1061
1062        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1063                with_proof=True)
1064        if not access_ok:
[ecca6eb]1065            raise service_error(service_error.access, "Access denied")
[cd06678]1066        else:
1067            # See if this is a replay of an earlier succeeded StartSegment -
1068            # sometimes SSL kills 'em.  If so, replay the response rather than
1069            # redoing the allocation.
1070            self.state_lock.acquire()
1071            retval = self.allocation[aid].get('started', None)
1072            self.state_lock.release()
1073            if retval:
1074                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1075                        "replaying response")
1076                return retval
1077
1078        # A new request.  Do it.
[6c57fe9]1079
[06cc65b]1080        if topref: topo = topdl.Topology(**topref)
[6c57fe9]1081        else:
1082            raise service_error(service_error.req, 
1083                    "Request missing segmentdescription'")
[2761484]1084       
[6c57fe9]1085        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1086        try:
1087            tmpdir = tempfile.mkdtemp(prefix="access-")
[ecca6eb]1088            softdir = "%s/software" % tmpdir
[c23684d]1089            os.mkdir(softdir)
[d3c8759]1090        except EnvironmentError:
[6c57fe9]1091            raise service_error(service_error.internal, "Cannot create tmp dir")
1092
[b770aa0]1093        # Try block alllows us to clean up temporary files.
1094        try:
[06cc65b]1095            self.retrieve_software(topo, certfile, softdir)
1096            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1097                    self.initialize_experiment_info(attrs, aid, 
1098                            certfile, tmpdir)
[9b3627e]1099
[f3898f7]1100            if '/' in proj: proj, gid = proj.split('/')
1101            else: gid = None
1102
1103
[06cc65b]1104            # Set up userconf and seer if needed
[c200d36]1105            self.configure_userconf(services, tmpdir)
[49051fb]1106            self.configure_seer_services(services, topo, softdir)
[06cc65b]1107            # Get and send synch store variables
[2761484]1108            self.export_store_info(certfile, proj, ename, connInfo)
1109            self.import_store_info(certfile, connInfo)
1110
[b770aa0]1111            expfile = "%s/experiment.tcl" % tmpdir
1112
1113            self.generate_portal_configs(topo, pubkey_base, 
[06cc65b]1114                    secretkey_base, tmpdir, proj, ename, connInfo, services)
[b770aa0]1115            self.generate_ns2(topo, expfile, 
[8cf2b90e]1116                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
[f07fa49]1117
[b770aa0]1118            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
[181aeb4]1119                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1120                    cert=self.xmlrpc_cert)
[f3898f7]1121            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
[b770aa0]1122        except service_error, e:
1123            err = e
[06cc65b]1124        except:
1125            t, v, st = sys.exc_info()
1126            err = service_error(service_error.internal, "%s: %s" % \
1127                    (v, traceback.extract_tb(st)))
[b770aa0]1128
[574055e]1129        # Walk up tmpdir, deleting as we go
[06cc65b]1130        if self.cleanup: self.remove_dirs(tmpdir)
1131        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
[574055e]1132
[fd556d1]1133        if rv:
[e83f2f2]1134            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1135                    proof)
[b770aa0]1136        elif err:
1137            raise service_error(service_error.federant,
1138                    "Swapin failed: %s" % err)
[fd556d1]1139        else:
1140            raise service_error(service_error.federant, "Swapin failed")
[5ae3857]1141
1142    def TerminateSegment(self, req, fid):
1143        try:
1144            req = req['TerminateSegmentRequestBody']
1145        except KeyError:
1146            raise service_error(server_error.req, "Badly formed request")
1147
1148        auth_attr = req['allocID']['fedid']
1149        aid = "%s" % auth_attr
1150        attrs = req.get('fedAttr', [])
[e83f2f2]1151
1152        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1153                with_proof=True)
1154        if not access_ok:
[5ae3857]1155            raise service_error(service_error.access, "Access denied")
1156
1157        self.state_lock.acquire()
[06cc65b]1158        if aid in self.allocation:
[5ae3857]1159            proj = self.allocation[aid].get('project', None)
1160            if not proj: 
1161                proj = self.allocation[aid].get('sproject', None)
1162            user = self.allocation[aid].get('user', None)
1163            ename = self.allocation[aid].get('experiment', None)
[1d913e13]1164        else:
1165            proj = None
1166            user = None
1167            ename = None
[5ae3857]1168        self.state_lock.release()
1169
1170        if not proj:
1171            raise service_error(service_error.internal, 
1172                    "Can't find project for %s" % aid)
[f3898f7]1173        else:
1174            if '/' in proj: proj, gid = proj.split('/')
1175            else: gid = None
[5ae3857]1176
1177        if not user:
1178            raise service_error(service_error.internal, 
1179                    "Can't find creation user for %s" % aid)
1180        if not ename:
1181            raise service_error(service_error.internal, 
1182                    "Can't find experiment name for %s" % aid)
[fd556d1]1183        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
[181aeb4]1184                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
[f3898f7]1185        stopper(self, user, proj, ename, gid)
[e83f2f2]1186        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.