source: fedd/federation/emulab_access.py @ c65b7e4

axis_examplecompt_changesinfo-ops
Last change on this file since c65b7e4 was c65b7e4, checked in by Ted Faber <faber@…>, 13 years ago

Access controllers delete (some) unused ABAC attrs.

  • Property mode set to 100644
File size: 38.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        # XXX: this should go?
107        #if config.has_option("access", "accessdb"):
108        #    self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'legacy'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'legacy':
120            self.access = { }
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            self.access = [ ]
126            if accessdb:
127                self.read_access(accessdb, self.access_tuple)
128        else:
129            raise service_error(service_error.internal, 
130                    "Unknown auth_type: %s" % self.auth_type)
131
132        # read_state in the base_class
133        self.state_lock.acquire()
134        for a  in ('allocation', 'projects', 'keys', 'types'):
135            if a not in self.state:
136                self.state[a] = { }
137        self.allocation = self.state['allocation']
138        self.projects = self.state['projects']
139        self.keys = self.state['keys']
140        self.types = self.state['types']
141        if self.auth_type == "legacy": 
142            # Add the ownership attributes to the authorizer.  Note that the
143            # indices of the allocation dict are strings, but the attributes are
144            # fedids, so there is a conversion.
145            for k in self.allocation.keys():
146                for o in self.allocation[k].get('owners', []):
147                    self.auth.set_attribute(o, fedid(hexstr=k))
148                if self.allocation[k].has_key('userconfig'):
149                    sfid = self.allocation[k]['userconfig']
150                    fid = fedid(hexstr=sfid)
151                    self.auth.set_attribute(fid, "/%s" % sfid)
152        self.state_lock.release()
153        self.exports = {
154                'SMB': self.export_SMB,
155                'seer': self.export_seer,
156                'tmcd': self.export_tmcd,
157                'userconfig': self.export_userconfig,
158                'project_export': self.export_project_export,
159                'local_seer_control': self.export_local_seer,
160                'seer_master': self.export_seer_master,
161                'hide_hosts': self.export_hide_hosts,
162                }
163
164        if not self.local_seer_image or not self.local_seer_software or \
165                not self.local_seer_start:
166            if 'local_seer_control' in self.exports:
167                del self.exports['local_seer_control']
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.seer_master_start:
171            if 'seer_master' in self.exports:
172                del self.exports['seer_master']
173
174
175        self.soap_services = {\
176            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
177            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
178            'StartSegment': soap_handler("StartSegment", self.StartSegment),
179            'TerminateSegment': soap_handler("TerminateSegment",
180                self.TerminateSegment),
181            }
182        self.xmlrpc_services =  {\
183            'RequestAccess': xmlrpc_handler('RequestAccess',
184                self.RequestAccess),
185            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
186                self.ReleaseAccess),
187            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
188            'TerminateSegment': xmlrpc_handler('TerminateSegment',
189                self.TerminateSegment),
190            }
191
192        self.call_SetValue = service_caller('SetValue')
193        self.call_GetValue = service_caller('GetValue', log=self.log)
194
195        if not config.has_option("allocate", "uri"):
196            self.allocate_project = \
197                allocate_project_local(config, auth)
198        else:
199            self.allocate_project = \
200                allocate_project_remote(config, auth)
201
202
203        # If the project allocator exports services, put them in this object's
204        # maps so that classes that instantiate this can call the services.
205        self.soap_services.update(self.allocate_project.soap_services)
206        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
207
208    @staticmethod
209    def legacy_access_tuple(str):
210        """
211        Convert a string of the form (id[:resources:resouces], id, id) into a
212        tuple of the form (project, user, user) where users may be names or
213        fedids.  The resources strings are obsolete and ignored.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = first[0]
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def access_tuple(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            # The slice takes the parens off the string.
250            proj, user = str[1:-1].split(',')
251            return (proj.strip(), user.strip(), user.strip())
252        else:
253            raise self.parse_error(
254                    'Bad mapping (unbalanced parens or more than 1 comma)')
255
256    # RequestAccess support routines
257
258    def legacy_lookup_access(self, req, fid):
259        """
260        Look up the local access control information mapped to this fedid and
261        credentials.  In this case it is a (project, create_user, access_user)
262        triple, and a triple of three booleans indicating which, if any will
263        need to be dynamically created.  Finally a list of owners for that
264        allocation is returned.
265
266        lookup_access_base pulls the first triple out, and it is parsed by this
267        routine into the boolean map.  Owners is always the controlling fedid.
268        """
269        # Return values
270        rp = None
271        ru = None
272        # This maps a valid user to the Emulab projects and users to use
273        found, match = self.legacy_lookup_access_base(req, fid)
274        tb, project, user = match
275       
276        if found == None:
277            raise service_error(service_error.access,
278                    "Access denied - cannot map access")
279
280        # resolve <dynamic> and <same> in found
281        dyn_proj = False
282        dyn_create_user = False
283        dyn_service_user = False
284
285        if found[0] == "<same>":
286            if project != None:
287                rp = project
288            else : 
289                raise service_error(\
290                        service_error.server_config,
291                        "Project matched <same> when no project given")
292        elif found[0] == "<dynamic>":
293            rp = None
294            dyn_proj = True
295        else:
296            rp = found[0]
297
298        if found[1] == "<same>":
299            if user_match == "<any>":
300                if user != None: rcu = user[0]
301                else: raise service_error(\
302                        service_error.server_config,
303                        "Matched <same> on anonymous request")
304            else:
305                rcu = user_match
306        elif found[1] == "<dynamic>":
307            rcu = None
308            dyn_create_user = True
309        else:
310            rcu = found[1]
311       
312        if found[2] == "<same>":
313            if user_match == "<any>":
314                if user != None: rsu = user[0]
315                else: raise service_error(\
316                        service_error.server_config,
317                        "Matched <same> on anonymous request")
318            else:
319                rsu = user_match
320        elif found[2] == "<dynamic>":
321            rsu = None
322            dyn_service_user = True
323        else:
324            rsu = found[2]
325
326        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
327                [ fid ]
328
329    def do_project_allocation(self, dyn, project, user):
330        """
331        Call the project allocation routines and return the info.
332        """
333        if dyn: 
334            # Compose the dynamic project request
335            # (only dynamic, dynamic currently allowed)
336            preq = { 'AllocateProjectRequestBody': \
337                        { 'project' : {\
338                            'user': [ \
339                            { \
340                                'access': [ { 
341                                    'sshPubkey': self.ssh_pubkey_file } ],
342                                 'role': "serviceAccess",\
343                            }, \
344                            { \
345                                'access': [ { 
346                                    'sshPubkey': self.ssh_pubkey_file } ],
347                                 'role': "experimentCreation",\
348                            }, \
349                            ], \
350                            }\
351                        }\
352                    }
353            return self.allocate_project.dynamic_project(preq)
354        else:
355            preq = {'StaticProjectRequestBody' : \
356                    { 'project': \
357                        { 'name' : { 'localname' : project },\
358                          'user' : [ \
359                            {\
360                                'userID': { 'localname' : user }, \
361                                'access': [ { 
362                                    'sshPubkey': self.ssh_pubkey_file } ],
363                                'role': 'experimentCreation'\
364                            },\
365                            {\
366                                'userID': { 'localname' : user}, \
367                                'access': [ { 
368                                    'sshPubkey': self.ssh_pubkey_file } ],
369                                'role': 'serviceAccess'\
370                            },\
371                        ]}\
372                    }\
373            }
374            return self.allocate_project.static_project(preq)
375
376    def save_project_state(self, aid, ap, dyn, owners):
377        """
378        Parse out and save the information relevant to the project created for
379        this experiment.  That info is largely in ap and owners.  dyn indicates
380        that the project was created dynamically.  Return the user and project
381        names.
382        """
383        self.state_lock.acquire()
384        self.allocation[aid] = { }
385        self.allocation[aid]['auth'] = set()
386        try:
387            pname = ap['project']['name']['localname']
388        except KeyError:
389            pname = None
390
391        if dyn:
392            if not pname:
393                self.state_lock.release()
394                raise service_error(service_error.internal,
395                        "Misformed allocation response?")
396            if pname in self.projects: self.projects[pname] += 1
397            else: self.projects[pname] = 1
398            self.allocation[aid]['project'] = pname
399        else:
400            # sproject is a static project associated with this allocation.
401            self.allocation[aid]['sproject'] = pname
402
403        self.allocation[aid]['keys'] = [ ]
404
405        try:
406            for u in ap['project']['user']:
407                uname = u['userID']['localname']
408                if u['role'] == 'experimentCreation':
409                    self.allocation[aid]['user'] = uname
410                for k in [ k['sshPubkey'] for k in u['access'] \
411                        if k.has_key('sshPubkey') ]:
412                    kv = "%s:%s" % (uname, k)
413                    if self.keys.has_key(kv): self.keys[kv] += 1
414                    else: self.keys[kv] = 1
415                    self.allocation[aid]['keys'].append((uname, k))
416        except KeyError:
417            self.state_lock.release()
418            raise service_error(service_error.internal,
419                    "Misformed allocation response?")
420
421        self.allocation[aid]['owners'] = owners
422        self.write_state()
423        self.state_lock.release()
424        return (pname, uname)
425
426    # End of RequestAccess support routines
427
428    def RequestAccess(self, req, fid):
429        """
430        Handle the access request.  Proxy if not for us.
431
432        Parse out the fields and make the allocations or rejections if for us,
433        otherwise, assuming we're willing to proxy, proxy the request out.
434        """
435
436        def gateway_hardware(h):
437            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
438            else: return h
439
440        def get_export_project(svcs):
441            """
442            if the service requests includes one to export a project, return
443            that project.
444            """
445            rv = None
446            for s in svcs:
447                if s.get('name', '') == 'project_export' and \
448                        s.get('visibility', '') == 'export':
449                    if not rv: 
450                        for a in s.get('fedAttr', []):
451                            if a.get('attribute', '') == 'project' \
452                                    and 'value' in a:
453                                rv = a['value']
454                    else:
455                        raise service_error(service_error, access, 
456                                'Requesting multiple project exports is ' + \
457                                        'not supported');
458            return rv
459
460        # The dance to get into the request body
461        if req.has_key('RequestAccessRequestBody'):
462            req = req['RequestAccessRequestBody']
463        else:
464            raise service_error(service_error.req, "No request!?")
465
466        # if this includes a project export request, construct a filter such
467        # that only the ABAC attributes mapped to that project are checked for
468        # access.
469        if 'service' in req:
470            ep = get_export_project(req['service'])
471            if ep: pf = lambda(a): a.value[0] == ep
472            else: pf = None
473        else:
474            ep = None
475            pf = None
476
477        if self.auth.import_credentials(
478                data_list=req.get('abac_credential', [])):
479            self.auth.save()
480
481        if self.auth_type == "legacy":
482            found, dyn, owners = self.legacy_lookup_access(req, fid)
483        elif self.auth_type == 'abac':
484            found, dyn, owners = self.lookup_access(req, fid, filter=pf)
485        else:
486            raise service_error(service_error.internal, 
487                    'Unknown auth_type: %s' % self.auth_type)
488        ap = None
489
490        # This only happens in legacy lookups, but if this user has access to
491        # the testbed but not the project to be exported, raise the error.
492        if ep and ep != found[0]:
493            raise service_error(service_error.access,
494                    "Cannot export %s" % ep)
495
496        if self.ssh_pubkey_file:
497            ap = self.do_project_allocation(dyn[1], found[0], found[1])
498        else:
499            raise service_error(service_error.internal, 
500                    "SSH access parameters required")
501        # keep track of what's been added
502        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
503        aid = unicode(allocID)
504
505        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
506
507        services, svc_state = self.export_services(req.get('service',[]),
508                pname, uname)
509        self.state_lock.acquire()
510        # Store services state in global state
511        for k, v in svc_state.items():
512            self.allocation[aid][k] = v
513        self.append_allocation_authorization(aid, 
514                set([(o, allocID) for o in owners]), state_attr='allocation')
515        self.write_state()
516        self.state_lock.release()
517        try:
518            f = open("%s/%s.pem" % (self.certdir, aid), "w")
519            print >>f, alloc_cert
520            f.close()
521        except EnvironmentError, e:
522            raise service_error(service_error.internal, 
523                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
524        resp = self.build_access_response({ 'fedid': allocID } ,
525                ap, services)
526        return resp
527
528    def do_release_project(self, del_project, del_users, del_types):
529        """
530        If a project and users has to be deleted, make the call.
531        """
532        msg = { 'project': { }}
533        if del_project:
534            msg['project']['name']= {'localname': del_project}
535        users = [ ]
536        for u in del_users.keys():
537            users.append({ 'userID': { 'localname': u },\
538                'access' :  \
539                        [ {'sshPubkey' : s } for s in del_users[u]]\
540            })
541        if users: 
542            msg['project']['user'] = users
543        if len(del_types) > 0:
544            msg['resources'] = { 'node': \
545                    [ {'hardware': [ h ] } for h in del_types ]\
546                }
547        if self.allocate_project.release_project:
548            msg = { 'ReleaseProjectRequestBody' : msg}
549            self.allocate_project.release_project(msg)
550
551    def ReleaseAccess(self, req, fid):
552        # The dance to get into the request body
553        if req.has_key('ReleaseAccessRequestBody'):
554            req = req['ReleaseAccessRequestBody']
555        else:
556            raise service_error(service_error.req, "No request!?")
557
558        try:
559            if req['allocID'].has_key('localname'):
560                auth_attr = aid = req['allocID']['localname']
561            elif req['allocID'].has_key('fedid'):
562                aid = unicode(req['allocID']['fedid'])
563                auth_attr = req['allocID']['fedid']
564            else:
565                raise service_error(service_error.req,
566                        "Only localnames and fedids are understood")
567        except KeyError:
568            raise service_error(service_error.req, "Badly formed request")
569
570        self.log.debug("[access] deallocation requested for %s by %s" % \
571                (aid, fid))
572        if not self.auth.check_attribute(fid, auth_attr):
573            self.log.debug("[access] deallocation denied for %s", aid)
574            raise service_error(service_error.access, "Access Denied")
575
576        # If we know this allocation, reduce the reference counts and
577        # remove the local allocations.  Otherwise report an error.  If
578        # there is an allocation to delete, del_users will be a dictonary
579        # of sets where the key is the user that owns the keys in the set.
580        # We use a set to avoid duplicates.  del_project is just the name
581        # of any dynamic project to delete.  We're somewhat lazy about
582        # deleting authorization attributes.  Having access to something
583        # that doesn't exist isn't harmful.
584        del_users = { }
585        del_project = None
586        del_types = set()
587
588        self.state_lock.acquire()
589        if aid in self.allocation:
590            self.log.debug("Found allocation for %s" %aid)
591            self.clear_allocation_authorization(aid, state_attr='allocation')
592            for k in self.allocation[aid]['keys']:
593                kk = "%s:%s" % k
594                self.keys[kk] -= 1
595                if self.keys[kk] == 0:
596                    if not del_users.has_key(k[0]):
597                        del_users[k[0]] = set()
598                    del_users[k[0]].add(k[1])
599                    del self.keys[kk]
600
601            if 'project' in self.allocation[aid]:
602                pname = self.allocation[aid]['project']
603                self.projects[pname] -= 1
604                if self.projects[pname] == 0:
605                    del_project = pname
606                    del self.projects[pname]
607
608            if 'types' in self.allocation[aid]:
609                for t in self.allocation[aid]['types']:
610                    self.types[t] -= 1
611                    if self.types[t] == 0:
612                        if not del_project: del_project = t[0]
613                        del_types.add(t[1])
614                        del self.types[t]
615
616            del self.allocation[aid]
617            self.write_state()
618            self.state_lock.release()
619            # If we actually have resources to deallocate, prepare the call.
620            if del_project or del_users:
621                self.do_release_project(del_project, del_users, del_types)
622            # And remove the access cert
623            cf = "%s/%s.pem" % (self.certdir, aid)
624            self.log.debug("Removing %s" % cf)
625            os.remove(cf)
626            return { 'allocID': req['allocID'] } 
627        else:
628            self.state_lock.release()
629            raise service_error(service_error.req, "No such allocation")
630
631    # These are subroutines for StartSegment
632    def generate_ns2(self, topo, expfn, softdir, connInfo):
633        """
634        Convert topo into an ns2 file, decorated with appropriate commands for
635        the particular testbed setup.  Convert all requests for software, etc
636        to point at the staged copies on this testbed and add the federation
637        startcommands.
638        """
639        class dragon_commands:
640            """
641            Functor to spit out approrpiate dragon commands for nodes listed in
642            the connectivity description.  The constructor makes a dict mapping
643            dragon nodes to their parameters and the __call__ checks each
644            element in turn for membership.
645            """
646            def __init__(self, map):
647                self.node_info = map
648
649            def __call__(self, e):
650                s = ""
651                if isinstance(e, topdl.Computer):
652                    if self.node_info.has_key(e.name):
653                        info = self.node_info[e.name]
654                        for ifname, vlan, type in info:
655                            for i in e.interface:
656                                if i.name == ifname:
657                                    addr = i.get_attribute('ip4_address')
658                                    subs = i.substrate[0]
659                                    break
660                            else:
661                                raise service_error(service_error.internal,
662                                        "No interface %s on element %s" % \
663                                                (ifname, e.name))
664                            # XXX: do netmask right
665                            if type =='link':
666                                s = ("tb-allow-external ${%s} " + \
667                                        "dragonportal ip %s vlan %s " + \
668                                        "netmask 255.255.255.0\n") % \
669                                        (topdl.to_tcl_name(e.name), addr, vlan)
670                            elif type =='lan':
671                                s = ("tb-allow-external ${%s} " + \
672                                        "dragonportal " + \
673                                        "ip %s vlan %s usurp %s\n") % \
674                                        (topdl.to_tcl_name(e.name), addr, 
675                                                vlan, subs)
676                            else:
677                                raise service_error(service_error_internal,
678                                        "Unknown DRAGON type %s" % type)
679                return s
680
681        class not_dragon:
682            """
683            Return true if a node is in the given map of dragon nodes.
684            """
685            def __init__(self, map):
686                self.nodes = set(map.keys())
687
688            def __call__(self, e):
689                return e.name not in self.nodes
690
691        # Main line of generate_ns2
692        t = topo.clone()
693
694        # Create the map of nodes that need direct connections (dragon
695        # connections) from the connInfo
696        dragon_map = { }
697        for i in [ i for i in connInfo if i['type'] == 'transit']:
698            for a in i.get('fedAttr', []):
699                if a['attribute'] == 'vlan_id':
700                    vlan = a['value']
701                    break
702            else:
703                raise service_error(service_error.internal, "No vlan tag")
704            members = i.get('member', [])
705            if len(members) > 1: type = 'lan'
706            else: type = 'link'
707
708            try:
709                for m in members:
710                    if m['element'] in dragon_map:
711                        dragon_map[m['element']].append(( m['interface'], 
712                            vlan, type))
713                    else:
714                        dragon_map[m['element']] = [( m['interface'], 
715                            vlan, type),]
716            except KeyError:
717                raise service_error(service_error.req,
718                        "Missing connectivity info")
719
720        # Weed out the things we aren't going to instantiate: Segments, portal
721        # substrates, and portal interfaces.  (The copy in the for loop allows
722        # us to delete from e.elements in side the for loop).  While we're
723        # touching all the elements, we also adjust paths from the original
724        # testbed to local testbed paths and put the federation commands into
725        # the start commands
726        for e in [e for e in t.elements]:
727            if isinstance(e, topdl.Segment):
728                t.elements.remove(e)
729            if isinstance(e, topdl.Computer):
730                self.add_kit(e, self.federation_software)
731                if e.get_attribute('portal') and self.portal_startcommand:
732                    # Add local portal support software
733                    self.add_kit(e, self.portal_software)
734                    # Portals never have a user-specified start command
735                    e.set_attribute('startup', self.portal_startcommand)
736                elif self.node_startcommand:
737                    if e.get_attribute('startup'):
738                        e.set_attribute('startup', "%s \\$USER '%s'" % \
739                                (self.node_startcommand, 
740                                    e.get_attribute('startup')))
741                    else:
742                        e.set_attribute('startup', self.node_startcommand)
743
744                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
745                # Remove portal interfaces that do not connect to DRAGON
746                e.interface = [i for i in e.interface \
747                        if not i.get_attribute('portal') or i.name in dinf ]
748            # Fix software paths
749            for s in getattr(e, 'software', []):
750                s.location = re.sub("^.*/", softdir, s.location)
751
752        t.substrates = [ s.clone() for s in t.substrates ]
753        t.incorporate_elements()
754
755        # Customize the ns2 output for local portal commands and images
756        filters = []
757
758        if self.dragon_endpoint:
759            add_filter = not_dragon(dragon_map)
760            filters.append(dragon_commands(dragon_map))
761        else:
762            add_filter = None
763
764        if self.portal_command:
765            filters.append(topdl.generate_portal_command_filter(
766                self.portal_command, add_filter=add_filter))
767
768        if self.portal_image:
769            filters.append(topdl.generate_portal_image_filter(
770                self.portal_image))
771
772        if self.portal_type:
773            filters.append(topdl.generate_portal_hardware_filter(
774                self.portal_type))
775
776        # Convert to ns and write it out
777        expfile = topdl.topology_to_ns2(t, filters)
778        try:
779            f = open(expfn, "w")
780            print >>f, expfile
781            f.close()
782        except EnvironmentError:
783            raise service_error(service_error.internal,
784                    "Cannot write experiment file %s: %s" % (expfn,e))
785
786    def export_store_info(self, cf, proj, ename, connInfo):
787        """
788        For the export requests in the connection info, install the peer names
789        at the experiment controller via SetValue calls.
790        """
791
792        for c in connInfo:
793            for p in [ p for p in c.get('parameter', []) \
794                    if p.get('type', '') == 'output']:
795
796                if p.get('name', '') == 'peer':
797                    k = p.get('key', None)
798                    surl = p.get('store', None)
799                    if surl and k and k.index('/') != -1:
800                        value = "%s.%s.%s%s" % \
801                                (k[k.index('/')+1:], ename, proj, self.domain)
802                        req = { 'name': k, 'value': value }
803                        self.log.debug("Setting %s to %s on %s" % \
804                                (k, value, surl))
805                        self.call_SetValue(surl, req, cf)
806                    else:
807                        self.log.error("Bad export request: %s" % p)
808                elif p.get('name', '') == 'ssh_port':
809                    k = p.get('key', None)
810                    surl = p.get('store', None)
811                    if surl and k:
812                        req = { 'name': k, 'value': self.ssh_port }
813                        self.log.debug("Setting %s to %s on %s" % \
814                                (k, self.ssh_port, surl))
815                        self.call_SetValue(surl, req, cf)
816                    else:
817                        self.log.error("Bad export request: %s" % p)
818                else:
819                    self.log.error("Unknown export parameter: %s" % \
820                            p.get('name'))
821                    continue
822
823    def add_seer_node(self, topo, name, startup):
824        """
825        Add a seer node to the given topology, with the startup command passed
826        in.  Used by configure seer_services.
827        """
828        c_node = topdl.Computer(
829                name=name, 
830                os= topdl.OperatingSystem(
831                    attribute=[
832                    { 'attribute': 'osid', 
833                        'value': self.local_seer_image },
834                    ]),
835                attribute=[
836                    { 'attribute': 'startup', 'value': startup },
837                    ]
838                )
839        self.add_kit(c_node, self.local_seer_software)
840        topo.elements.append(c_node)
841
842    def configure_seer_services(self, services, topo, softdir):
843        """
844        Make changes to the topology required for the seer requests being made.
845        Specifically, add any control or master nodes required and set up the
846        start commands on the nodes to interconnect them.
847        """
848        local_seer = False      # True if we need to add a control node
849        collect_seer = False    # True if there is a seer-master node
850        seer_master= False      # True if we need to add the seer-master
851        for s in services:
852            s_name = s.get('name', '')
853            s_vis = s.get('visibility','')
854
855            if s_name  == 'local_seer_control' and s_vis == 'export':
856                local_seer = True
857            elif s_name == 'seer_master':
858                if s_vis == 'import':
859                    collect_seer = True
860                elif s_vis == 'export':
861                    seer_master = True
862       
863        # We've got the whole picture now, so add nodes if needed and configure
864        # them to interconnect properly.
865        if local_seer or seer_master:
866            # Copy local seer control node software to the tempdir
867            for l, f in self.local_seer_software:
868                base = os.path.basename(f)
869                copy_file(f, "%s/%s" % (softdir, base))
870        # If we're collecting seers somewhere the controllers need to talk to
871        # the master.  In testbeds that export the service, that will be a
872        # local node that we'll add below.  Elsewhere it will be the control
873        # portal that will port forward to the exporting master.
874        if local_seer:
875            if collect_seer:
876                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
877            else:
878                startup = self.local_seer_start
879            self.add_seer_node(topo, 'control', startup)
880        # If this is the seer master, add that node, too.
881        if seer_master:
882            self.add_seer_node(topo, 'seer-master', 
883                    "%s -R -n -R seer-master -R -A -R sink" % \
884                            self.seer_master_start)
885
886    def retrieve_software(self, topo, certfile, softdir):
887        """
888        Collect the software that nodes in the topology need loaded and stage
889        it locally.  This implies retrieving it from the experiment_controller
890        and placing it into softdir.  Certfile is used to prove that this node
891        has access to that data (it's the allocation/segment fedid).  Finally
892        local portal and federation software is also copied to the same staging
893        directory for simplicity - all software needed for experiment creation
894        is in softdir.
895        """
896        sw = set()
897        for e in topo.elements:
898            for s in getattr(e, 'software', []):
899                sw.add(s.location)
900        for s in sw:
901            self.log.debug("Retrieving %s" % s)
902            try:
903                get_url(s, certfile, softdir)
904            except:
905                t, v, st = sys.exc_info()
906                raise service_error(service_error.internal,
907                        "Error retrieving %s: %s" % (s, v))
908
909        # Copy local federation and portal node software to the tempdir
910        for s in (self.federation_software, self.portal_software):
911            for l, f in s:
912                base = os.path.basename(f)
913                copy_file(f, "%s/%s" % (softdir, base))
914
915
916    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
917        """
918        Gather common configuration files, retrieve or create an experiment
919        name and project name, and return the ssh_key filenames.  Create an
920        allocation log bound to the state log variable as well.
921        """
922        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
923        ename = None
924        pubkey_base = None
925        secretkey_base = None
926        proj = None
927        user = None
928        alloc_log = None
929        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
930
931        for a in attrs:
932            if a['attribute'] in configs:
933                try:
934                    self.log.debug("Retrieving %s from %s" % \
935                            (a['attribute'], a['value']))
936                    get_url(a['value'], certfile, tmpdir)
937                except:
938                    t, v, st = sys.exc_info()
939                    raise service_error(service_error.internal,
940                            "Error retrieving %s: %s" % (a.get('value', ""), v))
941            if a['attribute'] == 'ssh_pubkey':
942                pubkey_base = a['value'].rpartition('/')[2]
943            if a['attribute'] == 'ssh_secretkey':
944                secretkey_base = a['value'].rpartition('/')[2]
945            if a['attribute'] == 'experiment_name':
946                ename = a['value']
947
948        if ename:
949            # Clean up the experiment name so that emulab will accept it.
950            ename = re.sub(vchars_re, '-', ename)
951
952        else:
953            ename = ""
954            for i in range(0,5):
955                ename += random.choice(string.ascii_letters)
956            self.log.warn("No experiment name: picked one randomly: %s" \
957                    % ename)
958
959        if not pubkey_base:
960            raise service_error(service_error.req, 
961                    "No public key attribute")
962
963        if not secretkey_base:
964            raise service_error(service_error.req, 
965                    "No secret key attribute")
966
967        self.state_lock.acquire()
968        if aid in self.allocation:
969            proj = self.allocation[aid].get('project', None)
970            if not proj: 
971                proj = self.allocation[aid].get('sproject', None)
972            user = self.allocation[aid].get('user', None)
973            self.allocation[aid]['experiment'] = ename
974            self.allocation[aid]['log'] = [ ]
975            # Create a logger that logs to the experiment's state object as
976            # well as to the main log file.
977            alloc_log = logging.getLogger('fedd.access.%s' % ename)
978            h = logging.StreamHandler(
979                    list_log.list_log(self.allocation[aid]['log']))
980            # XXX: there should be a global one of these rather than
981            # repeating the code.
982            h.setFormatter(logging.Formatter(
983                "%(asctime)s %(name)s %(message)s",
984                        '%d %b %y %H:%M:%S'))
985            alloc_log.addHandler(h)
986            self.write_state()
987        self.state_lock.release()
988
989        if not proj:
990            raise service_error(service_error.internal, 
991                    "Can't find project for %s" %aid)
992
993        if not user:
994            raise service_error(service_error.internal, 
995                    "Can't find creation user for %s" %aid)
996
997        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
998
999    def finalize_experiment(self, starter, topo, aid, alloc_id):
1000        """
1001        Store key bits of experiment state in the global repository, including
1002        the response that may need to be replayed, and return the response.
1003        """
1004        # Copy the assigned names into the return topology
1005        embedding = [ ]
1006        for n in starter.node:
1007            embedding.append({ 
1008                'toponame': n,
1009                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1010                })
1011        # Grab the log (this is some anal locking, but better safe than
1012        # sorry)
1013        self.state_lock.acquire()
1014        logv = "".join(self.allocation[aid]['log'])
1015        # It's possible that the StartSegment call gets retried (!).
1016        # if the 'started' key is in the allocation, we'll return it rather
1017        # than redo the setup.
1018        self.allocation[aid]['started'] = { 
1019                'allocID': alloc_id,
1020                'allocationLog': logv,
1021                'segmentdescription': { 
1022                    'topdldescription': topo.clone().to_dict()
1023                    },
1024                'embedding': embedding
1025                }
1026        retval = copy.copy(self.allocation[aid]['started'])
1027        self.write_state()
1028        self.state_lock.release()
1029        return retval
1030   
1031    # End of StartSegment support routines
1032
1033    def StartSegment(self, req, fid):
1034        err = None  # Any service_error generated after tmpdir is created
1035        rv = None   # Return value from segment creation
1036
1037        try:
1038            req = req['StartSegmentRequestBody']
1039            auth_attr = req['allocID']['fedid']
1040            topref = req['segmentdescription']['topdldescription']
1041        except KeyError:
1042            raise service_error(server_error.req, "Badly formed request")
1043
1044        connInfo = req.get('connection', [])
1045        services = req.get('service', [])
1046        aid = "%s" % auth_attr
1047        attrs = req.get('fedAttr', [])
1048        if not self.auth.check_attribute(fid, auth_attr):
1049            raise service_error(service_error.access, "Access denied")
1050        else:
1051            # See if this is a replay of an earlier succeeded StartSegment -
1052            # sometimes SSL kills 'em.  If so, replay the response rather than
1053            # redoing the allocation.
1054            self.state_lock.acquire()
1055            retval = self.allocation[aid].get('started', None)
1056            self.state_lock.release()
1057            if retval:
1058                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1059                        "replaying response")
1060                return retval
1061
1062        # A new request.  Do it.
1063
1064        if topref: topo = topdl.Topology(**topref)
1065        else:
1066            raise service_error(service_error.req, 
1067                    "Request missing segmentdescription'")
1068       
1069        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1070        try:
1071            tmpdir = tempfile.mkdtemp(prefix="access-")
1072            softdir = "%s/software" % tmpdir
1073            os.mkdir(softdir)
1074        except EnvironmentError:
1075            raise service_error(service_error.internal, "Cannot create tmp dir")
1076
1077        # Try block alllows us to clean up temporary files.
1078        try:
1079            self.retrieve_software(topo, certfile, softdir)
1080            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1081                    self.initialize_experiment_info(attrs, aid, 
1082                            certfile, tmpdir)
1083
1084            # Set up userconf and seer if needed
1085            self.configure_userconf(services, tmpdir)
1086            self.configure_seer_services(services, topo, softdir)
1087            # Get and send synch store variables
1088            self.export_store_info(certfile, proj, ename, connInfo)
1089            self.import_store_info(certfile, connInfo)
1090
1091            expfile = "%s/experiment.tcl" % tmpdir
1092
1093            self.generate_portal_configs(topo, pubkey_base, 
1094                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1095            self.generate_ns2(topo, expfile, 
1096                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1097
1098            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1099                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1100                    cert=self.xmlrpc_cert)
1101            rv = starter(self, ename, proj, user, expfile, tmpdir)
1102        except service_error, e:
1103            err = e
1104        except:
1105            t, v, st = sys.exc_info()
1106            err = service_error(service_error.internal, "%s: %s" % \
1107                    (v, traceback.extract_tb(st)))
1108
1109        # Walk up tmpdir, deleting as we go
1110        if self.cleanup: self.remove_dirs(tmpdir)
1111        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1112
1113        if rv:
1114            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1115        elif err:
1116            raise service_error(service_error.federant,
1117                    "Swapin failed: %s" % err)
1118        else:
1119            raise service_error(service_error.federant, "Swapin failed")
1120
1121    def TerminateSegment(self, req, fid):
1122        try:
1123            req = req['TerminateSegmentRequestBody']
1124        except KeyError:
1125            raise service_error(server_error.req, "Badly formed request")
1126
1127        auth_attr = req['allocID']['fedid']
1128        aid = "%s" % auth_attr
1129        attrs = req.get('fedAttr', [])
1130        if not self.auth.check_attribute(fid, auth_attr):
1131            raise service_error(service_error.access, "Access denied")
1132
1133        self.state_lock.acquire()
1134        if aid in self.allocation:
1135            proj = self.allocation[aid].get('project', None)
1136            if not proj: 
1137                proj = self.allocation[aid].get('sproject', None)
1138            user = self.allocation[aid].get('user', None)
1139            ename = self.allocation[aid].get('experiment', None)
1140        else:
1141            proj = None
1142            user = None
1143            ename = None
1144        self.state_lock.release()
1145
1146        if not proj:
1147            raise service_error(service_error.internal, 
1148                    "Can't find project for %s" % aid)
1149
1150        if not user:
1151            raise service_error(service_error.internal, 
1152                    "Can't find creation user for %s" % aid)
1153        if not ename:
1154            raise service_error(service_error.internal, 
1155                    "Can't find experiment name for %s" % aid)
1156        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1157                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1158        stopper(self, user, proj, ename)
1159        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.