source: fedd/federation/emulab_access.py @ 53b5c18

compt_changesinfo-ops
Last change on this file since 53b5c18 was 53b5c18, checked in by Ted Faber <faber@…>, 12 years ago

Avoid names that are too long

  • Property mode set to 100644
File size: 38.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    max_name_len = 19
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        access_base.__init__(self, config, auth)
61
62        self.max_name_len = access.max_name_len
63
64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75        self.seer_master_start = config.get("access", "seer_master_start")
76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
78        self.ssh_port = config.get("access","ssh_port") or "22"
79        self.boss = config.get("access", "boss")
80        self.ops = config.get("access", "ops")
81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103        elif self.access_type == 'local_emulab':
104            self.start_segment = local_emulab_segment.start_segment
105            self.stop_segment = local_emulab_segment.stop_segment
106        else:
107            self.start_segment = None
108            self.stop_segment = None
109
110        self.restricted = [ ]
111        # XXX: this should go?
112        #if config.has_option("access", "accessdb"):
113        #    self.read_access(config.get("access", "accessdb"))
114        tb = config.get('access', 'testbed')
115        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
116        else: self.testbed = [ ]
117
118        # authorization information
119        self.auth_type = config.get('access', 'auth_type') \
120                or 'legacy'
121        self.auth_dir = config.get('access', 'auth_dir')
122        accessdb = config.get("access", "accessdb")
123        # initialize the authorization system
124        if self.auth_type == 'legacy':
125            self.access = { }
126            if accessdb:
127                self.legacy_read_access(accessdb, self.legacy_access_tuple)
128        elif self.auth_type == 'abac':
129            self.auth = abac_authorizer(load=self.auth_dir)
130            self.access = [ ]
131            if accessdb:
132                self.read_access(accessdb, self.access_tuple)
133        else:
134            raise service_error(service_error.internal, 
135                    "Unknown auth_type: %s" % self.auth_type)
136
137        # read_state in the base_class
138        self.state_lock.acquire()
139        for a  in ('allocation', 'projects', 'keys', 'types'):
140            if a not in self.state:
141                self.state[a] = { }
142        self.allocation = self.state['allocation']
143        self.projects = self.state['projects']
144        self.keys = self.state['keys']
145        self.types = self.state['types']
146        if self.auth_type == "legacy": 
147            # Add the ownership attributes to the authorizer.  Note that the
148            # indices of the allocation dict are strings, but the attributes are
149            # fedids, so there is a conversion.
150            for k in self.allocation.keys():
151                for o in self.allocation[k].get('owners', []):
152                    self.auth.set_attribute(o, fedid(hexstr=k))
153                if self.allocation[k].has_key('userconfig'):
154                    sfid = self.allocation[k]['userconfig']
155                    fid = fedid(hexstr=sfid)
156                    self.auth.set_attribute(fid, "/%s" % sfid)
157        self.state_lock.release()
158        self.exports = {
159                'SMB': self.export_SMB,
160                'seer': self.export_seer,
161                'tmcd': self.export_tmcd,
162                'userconfig': self.export_userconfig,
163                'project_export': self.export_project_export,
164                'local_seer_control': self.export_local_seer,
165                'seer_master': self.export_seer_master,
166                'hide_hosts': self.export_hide_hosts,
167                }
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.local_seer_start:
171            if 'local_seer_control' in self.exports:
172                del self.exports['local_seer_control']
173
174        if not self.local_seer_image or not self.local_seer_software or \
175                not self.seer_master_start:
176            if 'seer_master' in self.exports:
177                del self.exports['seer_master']
178
179
180        self.soap_services = {\
181            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
182            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
183            'StartSegment': soap_handler("StartSegment", self.StartSegment),
184            'TerminateSegment': soap_handler("TerminateSegment",
185                self.TerminateSegment),
186            }
187        self.xmlrpc_services =  {\
188            'RequestAccess': xmlrpc_handler('RequestAccess',
189                self.RequestAccess),
190            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
191                self.ReleaseAccess),
192            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
193            'TerminateSegment': xmlrpc_handler('TerminateSegment',
194                self.TerminateSegment),
195            }
196
197        self.call_SetValue = service_caller('SetValue')
198        self.call_GetValue = service_caller('GetValue', log=self.log)
199
200        if not config.has_option("allocate", "uri"):
201            self.allocate_project = \
202                allocate_project_local(config, auth)
203        else:
204            self.allocate_project = \
205                allocate_project_remote(config, auth)
206
207
208        # If the project allocator exports services, put them in this object's
209        # maps so that classes that instantiate this can call the services.
210        self.soap_services.update(self.allocate_project.soap_services)
211        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
212
213    @staticmethod
214    def legacy_access_tuple(str):
215        """
216        Convert a string of the form (id[:resources:resouces], id, id) into a
217        tuple of the form (project, user, user) where users may be names or
218        fedids.  The resources strings are obsolete and ignored.
219        """
220        def parse_name(n):
221            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
222            else: return n
223
224        str = str.strip()
225        if str.startswith('(') and str.endswith(')'):
226            str = str[1:-1]
227            names = [ s.strip() for s in str.split(",")]
228            if len(names) > 3:
229                raise self.parse_error("More than three fields in name")
230            first = names[0].split(":")
231            if first == 'fedid:':
232                del first[0]
233                first[0] = fedid(hexstr=first[0])
234            names[0] = first[0]
235
236            for i in range(1,2):
237                names[i] = parse_name(names[i])
238
239            return tuple(names)
240        else:
241            raise self.parse_error('Bad mapping (unbalanced parens)')
242
243    @staticmethod
244    def access_tuple(str):
245        """
246        Convert a string of the form (id, id) into an access_project.  This is
247        called by read_access to convert to local attributes.  It returns
248        a tuple of the form (project, user, user) where the two users are
249        always the same.
250        """
251
252        str = str.strip()
253        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
254            # The slice takes the parens off the string.
255            proj, user = str[1:-1].split(',')
256            return (proj.strip(), user.strip(), user.strip())
257        else:
258            raise self.parse_error(
259                    'Bad mapping (unbalanced parens or more than 1 comma)')
260
261    # RequestAccess support routines
262
263    def legacy_lookup_access(self, req, fid):
264        """
265        Look up the local access control information mapped to this fedid and
266        credentials.  In this case it is a (project, create_user, access_user)
267        triple, and a triple of three booleans indicating which, if any will
268        need to be dynamically created.  Finally a list of owners for that
269        allocation is returned.
270
271        lookup_access_base pulls the first triple out, and it is parsed by this
272        routine into the boolean map.  Owners is always the controlling fedid.
273        """
274        # Return values
275        rp = None
276        ru = None
277        # This maps a valid user to the Emulab projects and users to use
278        found, match = self.legacy_lookup_access_base(req, fid)
279        tb, project, user = match
280       
281        if found == None:
282            raise service_error(service_error.access,
283                    "Access denied - cannot map access")
284
285        # resolve <dynamic> and <same> in found
286        dyn_proj = False
287        dyn_create_user = False
288        dyn_service_user = False
289
290        if found[0] == "<same>":
291            if project != None:
292                rp = project
293            else : 
294                raise service_error(\
295                        service_error.server_config,
296                        "Project matched <same> when no project given")
297        elif found[0] == "<dynamic>":
298            rp = None
299            dyn_proj = True
300        else:
301            rp = found[0]
302
303        if found[1] == "<same>":
304            if user_match == "<any>":
305                if user != None: rcu = user[0]
306                else: raise service_error(\
307                        service_error.server_config,
308                        "Matched <same> on anonymous request")
309            else:
310                rcu = user_match
311        elif found[1] == "<dynamic>":
312            rcu = None
313            dyn_create_user = True
314        else:
315            rcu = found[1]
316       
317        if found[2] == "<same>":
318            if user_match == "<any>":
319                if user != None: rsu = user[0]
320                else: raise service_error(\
321                        service_error.server_config,
322                        "Matched <same> on anonymous request")
323            else:
324                rsu = user_match
325        elif found[2] == "<dynamic>":
326            rsu = None
327            dyn_service_user = True
328        else:
329            rsu = found[2]
330
331        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
332                [ fid ]
333
334    def do_project_allocation(self, dyn, project, user):
335        """
336        Call the project allocation routines and return the info.
337        """
338        if dyn: 
339            # Compose the dynamic project request
340            # (only dynamic, dynamic currently allowed)
341            preq = { 'AllocateProjectRequestBody': \
342                        { 'project' : {\
343                            'user': [ \
344                            { \
345                                'access': [ { 
346                                    'sshPubkey': self.ssh_pubkey_file } ],
347                                 'role': "serviceAccess",\
348                            }, \
349                            { \
350                                'access': [ { 
351                                    'sshPubkey': self.ssh_pubkey_file } ],
352                                 'role': "experimentCreation",\
353                            }, \
354                            ], \
355                            }\
356                        }\
357                    }
358            return self.allocate_project.dynamic_project(preq)
359        else:
360            preq = {'StaticProjectRequestBody' : \
361                    { 'project': \
362                        { 'name' : { 'localname' : project },\
363                          'user' : [ \
364                            {\
365                                'userID': { 'localname' : user }, \
366                                'access': [ { 
367                                    'sshPubkey': self.ssh_pubkey_file } ],
368                                'role': 'experimentCreation'\
369                            },\
370                            {\
371                                'userID': { 'localname' : user}, \
372                                'access': [ { 
373                                    'sshPubkey': self.ssh_pubkey_file } ],
374                                'role': 'serviceAccess'\
375                            },\
376                        ]}\
377                    }\
378            }
379            return self.allocate_project.static_project(preq)
380
381    def save_project_state(self, aid, ap, dyn, owners):
382        """
383        Parse out and save the information relevant to the project created for
384        this experiment.  That info is largely in ap and owners.  dyn indicates
385        that the project was created dynamically.  Return the user and project
386        names.
387        """
388        self.state_lock.acquire()
389        self.allocation[aid] = { }
390        self.allocation[aid]['auth'] = set()
391        try:
392            pname = ap['project']['name']['localname']
393        except KeyError:
394            pname = None
395
396        if dyn:
397            if not pname:
398                self.state_lock.release()
399                raise service_error(service_error.internal,
400                        "Misformed allocation response?")
401            if pname in self.projects: self.projects[pname] += 1
402            else: self.projects[pname] = 1
403            self.allocation[aid]['project'] = pname
404        else:
405            # sproject is a static project associated with this allocation.
406            self.allocation[aid]['sproject'] = pname
407
408        self.allocation[aid]['keys'] = [ ]
409
410        try:
411            for u in ap['project']['user']:
412                uname = u['userID']['localname']
413                if u['role'] == 'experimentCreation':
414                    self.allocation[aid]['user'] = uname
415                for k in [ k['sshPubkey'] for k in u['access'] \
416                        if k.has_key('sshPubkey') ]:
417                    kv = "%s:%s" % (uname, k)
418                    if self.keys.has_key(kv): self.keys[kv] += 1
419                    else: self.keys[kv] = 1
420                    self.allocation[aid]['keys'].append((uname, k))
421        except KeyError:
422            self.state_lock.release()
423            raise service_error(service_error.internal,
424                    "Misformed allocation response?")
425
426        self.allocation[aid]['owners'] = owners
427        self.write_state()
428        self.state_lock.release()
429        return (pname, uname)
430
431    # End of RequestAccess support routines
432
433    def RequestAccess(self, req, fid):
434        """
435        Handle the access request.  Proxy if not for us.
436
437        Parse out the fields and make the allocations or rejections if for us,
438        otherwise, assuming we're willing to proxy, proxy the request out.
439        """
440
441        def gateway_hardware(h):
442            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
443            else: return h
444
445        def get_export_project(svcs):
446            """
447            if the service requests includes one to export a project, return
448            that project.
449            """
450            rv = None
451            for s in svcs:
452                if s.get('name', '') == 'project_export' and \
453                        s.get('visibility', '') == 'export':
454                    if not rv: 
455                        for a in s.get('fedAttr', []):
456                            if a.get('attribute', '') == 'project' \
457                                    and 'value' in a:
458                                rv = a['value']
459                    else:
460                        raise service_error(service_error, access, 
461                                'Requesting multiple project exports is ' + \
462                                        'not supported');
463            return rv
464
465        # The dance to get into the request body
466        if req.has_key('RequestAccessRequestBody'):
467            req = req['RequestAccessRequestBody']
468        else:
469            raise service_error(service_error.req, "No request!?")
470
471        # if this includes a project export request, construct a filter such
472        # that only the ABAC attributes mapped to that project are checked for
473        # access.
474        if 'service' in req:
475            ep = get_export_project(req['service'])
476            if ep: pf = lambda(a): a.value[0] == ep
477            else: pf = None
478        else:
479            ep = None
480            pf = None
481
482        if self.auth.import_credentials(
483                data_list=req.get('abac_credential', [])):
484            self.auth.save()
485
486        if self.auth_type == "legacy":
487            found, dyn, owners= self.legacy_lookup_access(req, fid)
488            proof = access_proof("me", fid, "create")
489        elif self.auth_type == 'abac':
490            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
491        else:
492            raise service_error(service_error.internal, 
493                    'Unknown auth_type: %s' % self.auth_type)
494        ap = None
495
496        # This only happens in legacy lookups, but if this user has access to
497        # the testbed but not the project to be exported, raise the error.
498        if ep and ep != found[0]:
499            raise service_error(service_error.access,
500                    "Cannot export %s" % ep)
501
502        if self.ssh_pubkey_file:
503            ap = self.do_project_allocation(dyn[1], found[0], found[1])
504        else:
505            raise service_error(service_error.internal, 
506                    "SSH access parameters required")
507        # keep track of what's been added
508        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
509        aid = unicode(allocID)
510
511        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
512
513        services, svc_state = self.export_services(req.get('service',[]),
514                pname, uname)
515        self.state_lock.acquire()
516        # Store services state in global state
517        for k, v in svc_state.items():
518            self.allocation[aid][k] = v
519        self.append_allocation_authorization(aid, 
520                set([(o, allocID) for o in owners]), state_attr='allocation')
521        self.write_state()
522        self.state_lock.release()
523        try:
524            f = open("%s/%s.pem" % (self.certdir, aid), "w")
525            print >>f, alloc_cert
526            f.close()
527        except EnvironmentError, e:
528            raise service_error(service_error.internal, 
529                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
530        resp = self.build_access_response({ 'fedid': allocID } ,
531                ap, services, proof)
532        return resp
533
534    def do_release_project(self, del_project, del_users, del_types):
535        """
536        If a project and users has to be deleted, make the call.
537        """
538        msg = { 'project': { }}
539        if del_project:
540            msg['project']['name']= {'localname': del_project}
541        users = [ ]
542        for u in del_users.keys():
543            users.append({ 'userID': { 'localname': u },\
544                'access' :  \
545                        [ {'sshPubkey' : s } for s in del_users[u]]\
546            })
547        if users: 
548            msg['project']['user'] = users
549        if len(del_types) > 0:
550            msg['resources'] = { 'node': \
551                    [ {'hardware': [ h ] } for h in del_types ]\
552                }
553        if self.allocate_project.release_project:
554            msg = { 'ReleaseProjectRequestBody' : msg}
555            self.allocate_project.release_project(msg)
556
557    def ReleaseAccess(self, req, fid):
558        # The dance to get into the request body
559        if req.has_key('ReleaseAccessRequestBody'):
560            req = req['ReleaseAccessRequestBody']
561        else:
562            raise service_error(service_error.req, "No request!?")
563
564        try:
565            if req['allocID'].has_key('localname'):
566                auth_attr = aid = req['allocID']['localname']
567            elif req['allocID'].has_key('fedid'):
568                aid = unicode(req['allocID']['fedid'])
569                auth_attr = req['allocID']['fedid']
570            else:
571                raise service_error(service_error.req,
572                        "Only localnames and fedids are understood")
573        except KeyError:
574            raise service_error(service_error.req, "Badly formed request")
575
576        self.log.debug("[access] deallocation requested for %s by %s" % \
577                (aid, fid))
578        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
579                with_proof=True)
580        if not access_ok:
581            self.log.debug("[access] deallocation denied for %s", aid)
582            raise service_error(service_error.access, "Access Denied")
583
584        # If we know this allocation, reduce the reference counts and
585        # remove the local allocations.  Otherwise report an error.  If
586        # there is an allocation to delete, del_users will be a dictonary
587        # of sets where the key is the user that owns the keys in the set.
588        # We use a set to avoid duplicates.  del_project is just the name
589        # of any dynamic project to delete.  We're somewhat lazy about
590        # deleting authorization attributes.  Having access to something
591        # that doesn't exist isn't harmful.
592        del_users = { }
593        del_project = None
594        del_types = set()
595
596        self.state_lock.acquire()
597        if aid in self.allocation:
598            self.log.debug("Found allocation for %s" %aid)
599            self.clear_allocation_authorization(aid, state_attr='allocation')
600            for k in self.allocation[aid]['keys']:
601                kk = "%s:%s" % k
602                self.keys[kk] -= 1
603                if self.keys[kk] == 0:
604                    if not del_users.has_key(k[0]):
605                        del_users[k[0]] = set()
606                    del_users[k[0]].add(k[1])
607                    del self.keys[kk]
608
609            if 'project' in self.allocation[aid]:
610                pname = self.allocation[aid]['project']
611                self.projects[pname] -= 1
612                if self.projects[pname] == 0:
613                    del_project = pname
614                    del self.projects[pname]
615
616            if 'types' in self.allocation[aid]:
617                for t in self.allocation[aid]['types']:
618                    self.types[t] -= 1
619                    if self.types[t] == 0:
620                        if not del_project: del_project = t[0]
621                        del_types.add(t[1])
622                        del self.types[t]
623
624            del self.allocation[aid]
625            self.write_state()
626            self.state_lock.release()
627            # If we actually have resources to deallocate, prepare the call.
628            if del_project or del_users:
629                self.do_release_project(del_project, del_users, del_types)
630            # And remove the access cert
631            cf = "%s/%s.pem" % (self.certdir, aid)
632            self.log.debug("Removing %s" % cf)
633            os.remove(cf)
634            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
635        else:
636            self.state_lock.release()
637            raise service_error(service_error.req, "No such allocation")
638
639    # These are subroutines for StartSegment
640    def generate_ns2(self, topo, expfn, softdir, connInfo):
641        """
642        Convert topo into an ns2 file, decorated with appropriate commands for
643        the particular testbed setup.  Convert all requests for software, etc
644        to point at the staged copies on this testbed and add the federation
645        startcommands.
646        """
647        class dragon_commands:
648            """
649            Functor to spit out approrpiate dragon commands for nodes listed in
650            the connectivity description.  The constructor makes a dict mapping
651            dragon nodes to their parameters and the __call__ checks each
652            element in turn for membership.
653            """
654            def __init__(self, map):
655                self.node_info = map
656
657            def __call__(self, e):
658                s = ""
659                if isinstance(e, topdl.Computer):
660                    if self.node_info.has_key(e.name):
661                        info = self.node_info[e.name]
662                        for ifname, vlan, type in info:
663                            for i in e.interface:
664                                if i.name == ifname:
665                                    addr = i.get_attribute('ip4_address')
666                                    subs = i.substrate[0]
667                                    break
668                            else:
669                                raise service_error(service_error.internal,
670                                        "No interface %s on element %s" % \
671                                                (ifname, e.name))
672                            # XXX: do netmask right
673                            if type =='link':
674                                s = ("tb-allow-external ${%s} " + \
675                                        "dragonportal ip %s vlan %s " + \
676                                        "netmask 255.255.255.0\n") % \
677                                        (topdl.to_tcl_name(e.name), addr, vlan)
678                            elif type =='lan':
679                                s = ("tb-allow-external ${%s} " + \
680                                        "dragonportal " + \
681                                        "ip %s vlan %s usurp %s\n") % \
682                                        (topdl.to_tcl_name(e.name), addr, 
683                                                vlan, subs)
684                            else:
685                                raise service_error(service_error_internal,
686                                        "Unknown DRAGON type %s" % type)
687                return s
688
689        class not_dragon:
690            """
691            Return true if a node is in the given map of dragon nodes.
692            """
693            def __init__(self, map):
694                self.nodes = set(map.keys())
695
696            def __call__(self, e):
697                return e.name not in self.nodes
698
699        # Main line of generate_ns2
700        t = topo.clone()
701
702        # Create the map of nodes that need direct connections (dragon
703        # connections) from the connInfo
704        dragon_map = { }
705        for i in [ i for i in connInfo if i['type'] == 'transit']:
706            for a in i.get('fedAttr', []):
707                if a['attribute'] == 'vlan_id':
708                    vlan = a['value']
709                    break
710            else:
711                raise service_error(service_error.internal, "No vlan tag")
712            members = i.get('member', [])
713            if len(members) > 1: type = 'lan'
714            else: type = 'link'
715
716            try:
717                for m in members:
718                    if m['element'] in dragon_map:
719                        dragon_map[m['element']].append(( m['interface'], 
720                            vlan, type))
721                    else:
722                        dragon_map[m['element']] = [( m['interface'], 
723                            vlan, type),]
724            except KeyError:
725                raise service_error(service_error.req,
726                        "Missing connectivity info")
727
728        # Weed out the things we aren't going to instantiate: Segments, portal
729        # substrates, and portal interfaces.  (The copy in the for loop allows
730        # us to delete from e.elements in side the for loop).  While we're
731        # touching all the elements, we also adjust paths from the original
732        # testbed to local testbed paths and put the federation commands into
733        # the start commands
734        for e in [e for e in t.elements]:
735            if isinstance(e, topdl.Segment):
736                t.elements.remove(e)
737            if isinstance(e, topdl.Computer):
738                self.add_kit(e, self.federation_software)
739                if e.get_attribute('portal') and self.portal_startcommand:
740                    # Add local portal support software
741                    self.add_kit(e, self.portal_software)
742                    # Portals never have a user-specified start command
743                    e.set_attribute('startup', self.portal_startcommand)
744                elif self.node_startcommand:
745                    if e.get_attribute('startup'):
746                        e.set_attribute('startup', "%s \\$USER '%s'" % \
747                                (self.node_startcommand, 
748                                    e.get_attribute('startup')))
749                    else:
750                        e.set_attribute('startup', self.node_startcommand)
751
752                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
753                # Remove portal interfaces that do not connect to DRAGON
754                e.interface = [i for i in e.interface \
755                        if not i.get_attribute('portal') or i.name in dinf ]
756            # Fix software paths
757            for s in getattr(e, 'software', []):
758                s.location = re.sub("^.*/", softdir, s.location)
759
760        t.substrates = [ s.clone() for s in t.substrates ]
761        t.incorporate_elements()
762
763        # Customize the ns2 output for local portal commands and images
764        filters = []
765
766        if self.dragon_endpoint:
767            add_filter = not_dragon(dragon_map)
768            filters.append(dragon_commands(dragon_map))
769        else:
770            add_filter = None
771
772        if self.portal_command:
773            filters.append(topdl.generate_portal_command_filter(
774                self.portal_command, add_filter=add_filter))
775
776        if self.portal_image:
777            filters.append(topdl.generate_portal_image_filter(
778                self.portal_image))
779
780        if self.portal_type:
781            filters.append(topdl.generate_portal_hardware_filter(
782                self.portal_type))
783
784        # Convert to ns and write it out
785        expfile = topdl.topology_to_ns2(t, filters)
786        try:
787            f = open(expfn, "w")
788            print >>f, expfile
789            f.close()
790        except EnvironmentError:
791            raise service_error(service_error.internal,
792                    "Cannot write experiment file %s: %s" % (expfn,e))
793
794    def export_store_info(self, cf, proj, ename, connInfo):
795        """
796        For the export requests in the connection info, install the peer names
797        at the experiment controller via SetValue calls.
798        """
799
800        for c in connInfo:
801            for p in [ p for p in c.get('parameter', []) \
802                    if p.get('type', '') == 'output']:
803
804                if p.get('name', '') == 'peer':
805                    k = p.get('key', None)
806                    surl = p.get('store', None)
807                    if surl and k and k.index('/') != -1:
808                        value = "%s.%s.%s%s" % \
809                                (k[k.index('/')+1:], ename, proj, self.domain)
810                        req = { 'name': k, 'value': value }
811                        self.log.debug("Setting %s to %s on %s" % \
812                                (k, value, surl))
813                        self.call_SetValue(surl, req, cf)
814                    else:
815                        self.log.error("Bad export request: %s" % p)
816                elif p.get('name', '') == 'ssh_port':
817                    k = p.get('key', None)
818                    surl = p.get('store', None)
819                    if surl and k:
820                        req = { 'name': k, 'value': self.ssh_port }
821                        self.log.debug("Setting %s to %s on %s" % \
822                                (k, self.ssh_port, surl))
823                        self.call_SetValue(surl, req, cf)
824                    else:
825                        self.log.error("Bad export request: %s" % p)
826                else:
827                    self.log.error("Unknown export parameter: %s" % \
828                            p.get('name'))
829                    continue
830
831    def add_seer_node(self, topo, name, startup):
832        """
833        Add a seer node to the given topology, with the startup command passed
834        in.  Used by configure seer_services.
835        """
836        c_node = topdl.Computer(
837                name=name, 
838                os= topdl.OperatingSystem(
839                    attribute=[
840                    { 'attribute': 'osid', 
841                        'value': self.local_seer_image },
842                    ]),
843                attribute=[
844                    { 'attribute': 'startup', 'value': startup },
845                    ]
846                )
847        self.add_kit(c_node, self.local_seer_software)
848        topo.elements.append(c_node)
849
850    def configure_seer_services(self, services, topo, softdir):
851        """
852        Make changes to the topology required for the seer requests being made.
853        Specifically, add any control or master nodes required and set up the
854        start commands on the nodes to interconnect them.
855        """
856        local_seer = False      # True if we need to add a control node
857        collect_seer = False    # True if there is a seer-master node
858        seer_master= False      # True if we need to add the seer-master
859        for s in services:
860            s_name = s.get('name', '')
861            s_vis = s.get('visibility','')
862
863            if s_name  == 'local_seer_control' and s_vis == 'export':
864                local_seer = True
865            elif s_name == 'seer_master':
866                if s_vis == 'import':
867                    collect_seer = True
868                elif s_vis == 'export':
869                    seer_master = True
870       
871        # We've got the whole picture now, so add nodes if needed and configure
872        # them to interconnect properly.
873        if local_seer or seer_master:
874            # Copy local seer control node software to the tempdir
875            for l, f in self.local_seer_software:
876                base = os.path.basename(f)
877                copy_file(f, "%s/%s" % (softdir, base))
878        # If we're collecting seers somewhere the controllers need to talk to
879        # the master.  In testbeds that export the service, that will be a
880        # local node that we'll add below.  Elsewhere it will be the control
881        # portal that will port forward to the exporting master.
882        if local_seer:
883            if collect_seer:
884                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
885            else:
886                startup = self.local_seer_start
887            self.add_seer_node(topo, 'control', startup)
888        # If this is the seer master, add that node, too.
889        if seer_master:
890            self.add_seer_node(topo, 'seer-master', 
891                    "%s -R -n -R seer-master -R -A -R sink" % \
892                            self.seer_master_start)
893
894    def retrieve_software(self, topo, certfile, softdir):
895        """
896        Collect the software that nodes in the topology need loaded and stage
897        it locally.  This implies retrieving it from the experiment_controller
898        and placing it into softdir.  Certfile is used to prove that this node
899        has access to that data (it's the allocation/segment fedid).  Finally
900        local portal and federation software is also copied to the same staging
901        directory for simplicity - all software needed for experiment creation
902        is in softdir.
903        """
904        sw = set()
905        for e in topo.elements:
906            for s in getattr(e, 'software', []):
907                sw.add(s.location)
908        for s in sw:
909            self.log.debug("Retrieving %s" % s)
910            try:
911                get_url(s, certfile, softdir)
912            except:
913                t, v, st = sys.exc_info()
914                raise service_error(service_error.internal,
915                        "Error retrieving %s: %s" % (s, v))
916
917        # Copy local federation and portal node software to the tempdir
918        for s in (self.federation_software, self.portal_software):
919            for l, f in s:
920                base = os.path.basename(f)
921                copy_file(f, "%s/%s" % (softdir, base))
922
923
924    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
925        """
926        Gather common configuration files, retrieve or create an experiment
927        name and project name, and return the ssh_key filenames.  Create an
928        allocation log bound to the state log variable as well.
929        """
930        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
931        ename = None
932        pubkey_base = None
933        secretkey_base = None
934        proj = None
935        user = None
936        alloc_log = None
937        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
938
939        for a in attrs:
940            if a['attribute'] in configs:
941                try:
942                    self.log.debug("Retrieving %s from %s" % \
943                            (a['attribute'], a['value']))
944                    get_url(a['value'], certfile, tmpdir)
945                except:
946                    t, v, st = sys.exc_info()
947                    raise service_error(service_error.internal,
948                            "Error retrieving %s: %s" % (a.get('value', ""), v))
949            if a['attribute'] == 'ssh_pubkey':
950                pubkey_base = a['value'].rpartition('/')[2]
951            if a['attribute'] == 'ssh_secretkey':
952                secretkey_base = a['value'].rpartition('/')[2]
953            if a['attribute'] == 'experiment_name':
954                ename = a['value']
955
956        # Names longer than the emulab max are discarder
957        if ename and len(ename) <= self.max_name_len:
958            # Clean up the experiment name so that emulab will accept it.
959            ename = re.sub(vchars_re, '-', ename)
960
961        else:
962            ename = ""
963            for i in range(0,5):
964                ename += random.choice(string.ascii_letters)
965            self.log.warn("No experiment name or suggestion too long: " + \
966                    "picked one randomly: %s" % ename)
967
968        if not pubkey_base:
969            raise service_error(service_error.req, 
970                    "No public key attribute")
971
972        if not secretkey_base:
973            raise service_error(service_error.req, 
974                    "No secret key attribute")
975
976        self.state_lock.acquire()
977        if aid in self.allocation:
978            proj = self.allocation[aid].get('project', None)
979            if not proj: 
980                proj = self.allocation[aid].get('sproject', None)
981            user = self.allocation[aid].get('user', None)
982            self.allocation[aid]['experiment'] = ename
983            self.allocation[aid]['log'] = [ ]
984            # Create a logger that logs to the experiment's state object as
985            # well as to the main log file.
986            alloc_log = logging.getLogger('fedd.access.%s' % ename)
987            h = logging.StreamHandler(
988                    list_log.list_log(self.allocation[aid]['log']))
989            # XXX: there should be a global one of these rather than
990            # repeating the code.
991            h.setFormatter(logging.Formatter(
992                "%(asctime)s %(name)s %(message)s",
993                        '%d %b %y %H:%M:%S'))
994            alloc_log.addHandler(h)
995            self.write_state()
996        self.state_lock.release()
997
998        if not proj:
999            raise service_error(service_error.internal, 
1000                    "Can't find project for %s" %aid)
1001
1002        if not user:
1003            raise service_error(service_error.internal, 
1004                    "Can't find creation user for %s" %aid)
1005
1006        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1007
1008    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1009        """
1010        Store key bits of experiment state in the global repository, including
1011        the response that may need to be replayed, and return the response.
1012        """
1013        # Copy the assigned names into the return topology
1014        embedding = [ ]
1015        for n in starter.node:
1016            embedding.append({ 
1017                'toponame': n,
1018                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1019                })
1020        # Grab the log (this is some anal locking, but better safe than
1021        # sorry)
1022        self.state_lock.acquire()
1023        logv = "".join(self.allocation[aid]['log'])
1024        # It's possible that the StartSegment call gets retried (!).
1025        # if the 'started' key is in the allocation, we'll return it rather
1026        # than redo the setup.
1027        self.allocation[aid]['started'] = { 
1028                'allocID': alloc_id,
1029                'allocationLog': logv,
1030                'segmentdescription': { 
1031                    'topdldescription': topo.clone().to_dict()
1032                    },
1033                'embedding': embedding,
1034                'proof': proof.to_dict(),
1035                }
1036        retval = copy.copy(self.allocation[aid]['started'])
1037        self.write_state()
1038        self.state_lock.release()
1039        return retval
1040   
1041    # End of StartSegment support routines
1042
1043    def StartSegment(self, req, fid):
1044        err = None  # Any service_error generated after tmpdir is created
1045        rv = None   # Return value from segment creation
1046
1047        try:
1048            req = req['StartSegmentRequestBody']
1049            auth_attr = req['allocID']['fedid']
1050            topref = req['segmentdescription']['topdldescription']
1051        except KeyError:
1052            raise service_error(server_error.req, "Badly formed request")
1053
1054        connInfo = req.get('connection', [])
1055        services = req.get('service', [])
1056        aid = "%s" % auth_attr
1057        attrs = req.get('fedAttr', [])
1058
1059        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1060                with_proof=True)
1061        if not access_ok:
1062            raise service_error(service_error.access, "Access denied")
1063        else:
1064            # See if this is a replay of an earlier succeeded StartSegment -
1065            # sometimes SSL kills 'em.  If so, replay the response rather than
1066            # redoing the allocation.
1067            self.state_lock.acquire()
1068            retval = self.allocation[aid].get('started', None)
1069            self.state_lock.release()
1070            if retval:
1071                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1072                        "replaying response")
1073                return retval
1074
1075        # A new request.  Do it.
1076
1077        if topref: topo = topdl.Topology(**topref)
1078        else:
1079            raise service_error(service_error.req, 
1080                    "Request missing segmentdescription'")
1081       
1082        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1083        try:
1084            tmpdir = tempfile.mkdtemp(prefix="access-")
1085            softdir = "%s/software" % tmpdir
1086            os.mkdir(softdir)
1087        except EnvironmentError:
1088            raise service_error(service_error.internal, "Cannot create tmp dir")
1089
1090        # Try block alllows us to clean up temporary files.
1091        try:
1092            self.retrieve_software(topo, certfile, softdir)
1093            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1094                    self.initialize_experiment_info(attrs, aid, 
1095                            certfile, tmpdir)
1096
1097            # Set up userconf and seer if needed
1098            self.configure_userconf(services, tmpdir)
1099            self.configure_seer_services(services, topo, softdir)
1100            # Get and send synch store variables
1101            self.export_store_info(certfile, proj, ename, connInfo)
1102            self.import_store_info(certfile, connInfo)
1103
1104            expfile = "%s/experiment.tcl" % tmpdir
1105
1106            self.generate_portal_configs(topo, pubkey_base, 
1107                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1108            self.generate_ns2(topo, expfile, 
1109                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1110
1111            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1112                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1113                    cert=self.xmlrpc_cert)
1114            rv = starter(self, ename, proj, user, expfile, tmpdir)
1115        except service_error, e:
1116            err = e
1117        except:
1118            t, v, st = sys.exc_info()
1119            err = service_error(service_error.internal, "%s: %s" % \
1120                    (v, traceback.extract_tb(st)))
1121
1122        # Walk up tmpdir, deleting as we go
1123        if self.cleanup: self.remove_dirs(tmpdir)
1124        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1125
1126        if rv:
1127            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1128                    proof)
1129        elif err:
1130            raise service_error(service_error.federant,
1131                    "Swapin failed: %s" % err)
1132        else:
1133            raise service_error(service_error.federant, "Swapin failed")
1134
1135    def TerminateSegment(self, req, fid):
1136        try:
1137            req = req['TerminateSegmentRequestBody']
1138        except KeyError:
1139            raise service_error(server_error.req, "Badly formed request")
1140
1141        auth_attr = req['allocID']['fedid']
1142        aid = "%s" % auth_attr
1143        attrs = req.get('fedAttr', [])
1144
1145        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1146                with_proof=True)
1147        if not access_ok:
1148            raise service_error(service_error.access, "Access denied")
1149
1150        self.state_lock.acquire()
1151        if aid in self.allocation:
1152            proj = self.allocation[aid].get('project', None)
1153            if not proj: 
1154                proj = self.allocation[aid].get('sproject', None)
1155            user = self.allocation[aid].get('user', None)
1156            ename = self.allocation[aid].get('experiment', None)
1157        else:
1158            proj = None
1159            user = None
1160            ename = None
1161        self.state_lock.release()
1162
1163        if not proj:
1164            raise service_error(service_error.internal, 
1165                    "Can't find project for %s" % aid)
1166
1167        if not user:
1168            raise service_error(service_error.internal, 
1169                    "Can't find creation user for %s" % aid)
1170        if not ename:
1171            raise service_error(service_error.internal, 
1172                    "Can't find experiment name for %s" % aid)
1173        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1174                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1175        stopper(self, user, proj, ename)
1176        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.