source: fedd/federation/emulab_access.py @ 6e33086

compt_changesinfo-ops
Last change on this file since 6e33086 was 6e33086, checked in by Ted Faber <faber@…>, 13 years ago

InfoSegment? to emulab access controllers

  • Property mode set to 100644
File size: 41.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    max_name_len = 19
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        access_base.__init__(self, config, auth)
61
62        self.max_name_len = access.max_name_len
63
64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75        self.seer_master_start = config.get("access", "seer_master_start")
76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
78        self.ssh_port = config.get("access","ssh_port") or "22"
79        self.boss = config.get("access", "boss")
80        self.ops = config.get("access", "ops")
81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103            self.info_segment = proxy_emulab_segment.info_segment
104        elif self.access_type == 'local_emulab':
105            self.start_segment = local_emulab_segment.start_segment
106            self.stop_segment = local_emulab_segment.stop_segment
107            self.info_segment = local_emulab_segment.info_segment
108        else:
109            self.start_segment = None
110            self.stop_segment = None
111            self.info_segment = None
112
113        self.restricted = [ ]
114        tb = config.get('access', 'testbed')
115        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
116        else: self.testbed = [ ]
117
118        # authorization information
119        self.auth_type = config.get('access', 'auth_type') \
120                or 'legacy'
121        self.auth_dir = config.get('access', 'auth_dir')
122        accessdb = config.get("access", "accessdb")
123        # initialize the authorization system
124        if self.auth_type == 'legacy':
125            self.access = { }
126            if accessdb:
127                self.legacy_read_access(accessdb, self.legacy_access_tuple)
128        elif self.auth_type == 'abac':
129            self.auth = abac_authorizer(load=self.auth_dir)
130            self.access = [ ]
131            if accessdb:
132                self.read_access(accessdb, self.access_tuple)
133        else:
134            raise service_error(service_error.internal, 
135                    "Unknown auth_type: %s" % self.auth_type)
136
137        # read_state in the base_class
138        self.state_lock.acquire()
139        for a  in ('allocation', 'projects', 'keys', 'types'):
140            if a not in self.state:
141                self.state[a] = { }
142        self.allocation = self.state['allocation']
143        self.projects = self.state['projects']
144        self.keys = self.state['keys']
145        self.types = self.state['types']
146        if self.auth_type == "legacy": 
147            # Add the ownership attributes to the authorizer.  Note that the
148            # indices of the allocation dict are strings, but the attributes are
149            # fedids, so there is a conversion.
150            for k in self.allocation.keys():
151                for o in self.allocation[k].get('owners', []):
152                    self.auth.set_attribute(o, fedid(hexstr=k))
153                if self.allocation[k].has_key('userconfig'):
154                    sfid = self.allocation[k]['userconfig']
155                    fid = fedid(hexstr=sfid)
156                    self.auth.set_attribute(fid, "/%s" % sfid)
157        self.state_lock.release()
158        self.exports = {
159                'SMB': self.export_SMB,
160                'seer': self.export_seer,
161                'tmcd': self.export_tmcd,
162                'userconfig': self.export_userconfig,
163                'project_export': self.export_project_export,
164                'local_seer_control': self.export_local_seer,
165                'seer_master': self.export_seer_master,
166                'hide_hosts': self.export_hide_hosts,
167                }
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.local_seer_start:
171            if 'local_seer_control' in self.exports:
172                del self.exports['local_seer_control']
173
174        if not self.local_seer_image or not self.local_seer_software or \
175                not self.seer_master_start:
176            if 'seer_master' in self.exports:
177                del self.exports['seer_master']
178
179
180        self.soap_services = {\
181            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
182            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
183            'StartSegment': soap_handler("StartSegment", self.StartSegment),
184            'TerminateSegment': soap_handler("TerminateSegment",
185                self.TerminateSegment),
186            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
187            }
188        self.xmlrpc_services =  {\
189            'RequestAccess': xmlrpc_handler('RequestAccess',
190                self.RequestAccess),
191            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
192                self.ReleaseAccess),
193            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
194            'TerminateSegment': xmlrpc_handler('TerminateSegment',
195                self.TerminateSegment),
196            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
197            }
198
199        self.call_SetValue = service_caller('SetValue')
200        self.call_GetValue = service_caller('GetValue', log=self.log)
201
202        if not config.has_option("allocate", "uri"):
203            self.allocate_project = \
204                allocate_project_local(config, auth)
205        else:
206            self.allocate_project = \
207                allocate_project_remote(config, auth)
208
209
210        # If the project allocator exports services, put them in this object's
211        # maps so that classes that instantiate this can call the services.
212        self.soap_services.update(self.allocate_project.soap_services)
213        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
214
215    @staticmethod
216    def legacy_access_tuple(str):
217        """
218        Convert a string of the form (id[:resources:resouces], id, id) into a
219        tuple of the form (project, user, user) where users may be names or
220        fedids.  The resources strings are obsolete and ignored.
221        """
222        def parse_name(n):
223            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
224            else: return n
225
226        str = str.strip()
227        if str.startswith('(') and str.endswith(')'):
228            str = str[1:-1]
229            names = [ s.strip() for s in str.split(",")]
230            if len(names) > 3:
231                raise self.parse_error("More than three fields in name")
232            first = names[0].split(":")
233            if first == 'fedid:':
234                del first[0]
235                first[0] = fedid(hexstr=first[0])
236            names[0] = first[0]
237
238            for i in range(1,2):
239                names[i] = parse_name(names[i])
240
241            return tuple(names)
242        else:
243            raise self.parse_error('Bad mapping (unbalanced parens)')
244
245    @staticmethod
246    def access_tuple(str):
247        """
248        Convert a string of the form (id, id) into an access_project.  This is
249        called by read_access to convert to local attributes.  It returns
250        a tuple of the form (project, user, user) where the two users are
251        always the same.
252        """
253
254        str = str.strip()
255        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
256            # The slice takes the parens off the string.
257            proj, user = str[1:-1].split(',')
258            return (proj.strip(), user.strip(), user.strip())
259        else:
260            raise self.parse_error(
261                    'Bad mapping (unbalanced parens or more than 1 comma)')
262
263    # RequestAccess support routines
264
265    def legacy_lookup_access(self, req, fid):
266        """
267        Look up the local access control information mapped to this fedid and
268        credentials.  In this case it is a (project, create_user, access_user)
269        triple, and a triple of three booleans indicating which, if any will
270        need to be dynamically created.  Finally a list of owners for that
271        allocation is returned.
272
273        lookup_access_base pulls the first triple out, and it is parsed by this
274        routine into the boolean map.  Owners is always the controlling fedid.
275        """
276        # Return values
277        rp = None
278        ru = None
279        # This maps a valid user to the Emulab projects and users to use
280        found, match = self.legacy_lookup_access_base(req, fid)
281        tb, project, user = match
282       
283        if found == None:
284            raise service_error(service_error.access,
285                    "Access denied - cannot map access")
286
287        # resolve <dynamic> and <same> in found
288        dyn_proj = False
289        dyn_create_user = False
290        dyn_service_user = False
291
292        if found[0] == "<same>":
293            if project != None:
294                rp = project
295            else : 
296                raise service_error(\
297                        service_error.server_config,
298                        "Project matched <same> when no project given")
299        elif found[0] == "<dynamic>":
300            rp = None
301            dyn_proj = True
302        else:
303            rp = found[0]
304
305        if found[1] == "<same>":
306            if user_match == "<any>":
307                if user != None: rcu = user[0]
308                else: raise service_error(\
309                        service_error.server_config,
310                        "Matched <same> on anonymous request")
311            else:
312                rcu = user_match
313        elif found[1] == "<dynamic>":
314            rcu = None
315            dyn_create_user = True
316        else:
317            rcu = found[1]
318       
319        if found[2] == "<same>":
320            if user_match == "<any>":
321                if user != None: rsu = user[0]
322                else: raise service_error(\
323                        service_error.server_config,
324                        "Matched <same> on anonymous request")
325            else:
326                rsu = user_match
327        elif found[2] == "<dynamic>":
328            rsu = None
329            dyn_service_user = True
330        else:
331            rsu = found[2]
332
333        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
334                [ fid ]
335
336    def do_project_allocation(self, dyn, project, user):
337        """
338        Call the project allocation routines and return the info.
339        """
340        if dyn: 
341            # Compose the dynamic project request
342            # (only dynamic, dynamic currently allowed)
343            preq = { 'AllocateProjectRequestBody': \
344                        { 'project' : {\
345                            'user': [ \
346                            { \
347                                'access': [ { 
348                                    'sshPubkey': self.ssh_pubkey_file } ],
349                                 'role': "serviceAccess",\
350                            }, \
351                            { \
352                                'access': [ { 
353                                    'sshPubkey': self.ssh_pubkey_file } ],
354                                 'role': "experimentCreation",\
355                            }, \
356                            ], \
357                            }\
358                        }\
359                    }
360            return self.allocate_project.dynamic_project(preq)
361        else:
362            preq = {'StaticProjectRequestBody' : \
363                    { 'project': \
364                        { 'name' : { 'localname' : project },\
365                          'user' : [ \
366                            {\
367                                'userID': { 'localname' : user }, \
368                                'access': [ { 
369                                    'sshPubkey': self.ssh_pubkey_file } ],
370                                'role': 'experimentCreation'\
371                            },\
372                            {\
373                                'userID': { 'localname' : user}, \
374                                'access': [ { 
375                                    'sshPubkey': self.ssh_pubkey_file } ],
376                                'role': 'serviceAccess'\
377                            },\
378                        ]}\
379                    }\
380            }
381            return self.allocate_project.static_project(preq)
382
383    def save_project_state(self, aid, ap, dyn, owners):
384        """
385        Parse out and save the information relevant to the project created for
386        this experiment.  That info is largely in ap and owners.  dyn indicates
387        that the project was created dynamically.  Return the user and project
388        names.
389        """
390        self.state_lock.acquire()
391        self.allocation[aid] = { }
392        self.allocation[aid]['auth'] = set()
393        try:
394            pname = ap['project']['name']['localname']
395        except KeyError:
396            pname = None
397
398        if dyn:
399            if not pname:
400                self.state_lock.release()
401                raise service_error(service_error.internal,
402                        "Misformed allocation response?")
403            if pname in self.projects: self.projects[pname] += 1
404            else: self.projects[pname] = 1
405            self.allocation[aid]['project'] = pname
406        else:
407            # sproject is a static project associated with this allocation.
408            self.allocation[aid]['sproject'] = pname
409
410        self.allocation[aid]['keys'] = [ ]
411
412        try:
413            for u in ap['project']['user']:
414                uname = u['userID']['localname']
415                if u['role'] == 'experimentCreation':
416                    self.allocation[aid]['user'] = uname
417                for k in [ k['sshPubkey'] for k in u['access'] \
418                        if k.has_key('sshPubkey') ]:
419                    kv = "%s:%s" % (uname, k)
420                    if self.keys.has_key(kv): self.keys[kv] += 1
421                    else: self.keys[kv] = 1
422                    self.allocation[aid]['keys'].append((uname, k))
423        except KeyError:
424            self.state_lock.release()
425            raise service_error(service_error.internal,
426                    "Misformed allocation response?")
427
428        self.allocation[aid]['owners'] = owners
429        self.write_state()
430        self.state_lock.release()
431        return (pname, uname)
432
433    # End of RequestAccess support routines
434
435    def RequestAccess(self, req, fid):
436        """
437        Handle the access request.  Proxy if not for us.
438
439        Parse out the fields and make the allocations or rejections if for us,
440        otherwise, assuming we're willing to proxy, proxy the request out.
441        """
442
443        def gateway_hardware(h):
444            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
445            else: return h
446
447        def get_export_project(svcs):
448            """
449            if the service requests includes one to export a project, return
450            that project.
451            """
452            rv = None
453            for s in svcs:
454                if s.get('name', '') == 'project_export' and \
455                        s.get('visibility', '') == 'export':
456                    if not rv: 
457                        for a in s.get('fedAttr', []):
458                            if a.get('attribute', '') == 'project' \
459                                    and 'value' in a:
460                                rv = a['value']
461                    else:
462                        raise service_error(service_error, access, 
463                                'Requesting multiple project exports is ' + \
464                                        'not supported');
465            return rv
466
467        # The dance to get into the request body
468        if req.has_key('RequestAccessRequestBody'):
469            req = req['RequestAccessRequestBody']
470        else:
471            raise service_error(service_error.req, "No request!?")
472
473        # if this includes a project export request, construct a filter such
474        # that only the ABAC attributes mapped to that project are checked for
475        # access.
476        if 'service' in req:
477            ep = get_export_project(req['service'])
478            if ep: pf = lambda(a): a.value[0] == ep
479            else: pf = None
480        else:
481            ep = None
482            pf = None
483
484        if self.auth.import_credentials(
485                data_list=req.get('abac_credential', [])):
486            self.auth.save()
487
488        if self.auth_type == "legacy":
489            found, dyn, owners= self.legacy_lookup_access(req, fid)
490            proof = access_proof("me", fid, "create")
491        elif self.auth_type == 'abac':
492            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
493        else:
494            raise service_error(service_error.internal, 
495                    'Unknown auth_type: %s' % self.auth_type)
496        ap = None
497
498        # This only happens in legacy lookups, but if this user has access to
499        # the testbed but not the project to be exported, raise the error.
500        if ep and ep != found[0]:
501            raise service_error(service_error.access,
502                    "Cannot export %s" % ep)
503
504        if self.ssh_pubkey_file:
505            ap = self.do_project_allocation(dyn[1], found[0], found[1])
506        else:
507            raise service_error(service_error.internal, 
508                    "SSH access parameters required")
509        # keep track of what's been added
510        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
511        aid = unicode(allocID)
512
513        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
514
515        services, svc_state = self.export_services(req.get('service',[]),
516                pname, uname)
517        self.state_lock.acquire()
518        # Store services state in global state
519        for k, v in svc_state.items():
520            self.allocation[aid][k] = v
521        self.append_allocation_authorization(aid, 
522                set([(o, allocID) for o in owners]), state_attr='allocation')
523        self.write_state()
524        self.state_lock.release()
525        try:
526            f = open("%s/%s.pem" % (self.certdir, aid), "w")
527            print >>f, alloc_cert
528            f.close()
529        except EnvironmentError, e:
530            raise service_error(service_error.internal, 
531                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
532        resp = self.build_access_response({ 'fedid': allocID } ,
533                ap, services, proof)
534        return resp
535
536    def do_release_project(self, del_project, del_users, del_types):
537        """
538        If a project and users has to be deleted, make the call.
539        """
540        msg = { 'project': { }}
541        if del_project:
542            msg['project']['name']= {'localname': del_project}
543        users = [ ]
544        for u in del_users.keys():
545            users.append({ 'userID': { 'localname': u },\
546                'access' :  \
547                        [ {'sshPubkey' : s } for s in del_users[u]]\
548            })
549        if users: 
550            msg['project']['user'] = users
551        if len(del_types) > 0:
552            msg['resources'] = { 'node': \
553                    [ {'hardware': [ h ] } for h in del_types ]\
554                }
555        if self.allocate_project.release_project:
556            msg = { 'ReleaseProjectRequestBody' : msg}
557            self.allocate_project.release_project(msg)
558
559    def ReleaseAccess(self, req, fid):
560        # The dance to get into the request body
561        if req.has_key('ReleaseAccessRequestBody'):
562            req = req['ReleaseAccessRequestBody']
563        else:
564            raise service_error(service_error.req, "No request!?")
565
566        try:
567            if req['allocID'].has_key('localname'):
568                auth_attr = aid = req['allocID']['localname']
569            elif req['allocID'].has_key('fedid'):
570                aid = unicode(req['allocID']['fedid'])
571                auth_attr = req['allocID']['fedid']
572            else:
573                raise service_error(service_error.req,
574                        "Only localnames and fedids are understood")
575        except KeyError:
576            raise service_error(service_error.req, "Badly formed request")
577
578        self.log.debug("[access] deallocation requested for %s by %s" % \
579                (aid, fid))
580        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
581                with_proof=True)
582        if not access_ok:
583            self.log.debug("[access] deallocation denied for %s", aid)
584            raise service_error(service_error.access, "Access Denied")
585
586        # If we know this allocation, reduce the reference counts and
587        # remove the local allocations.  Otherwise report an error.  If
588        # there is an allocation to delete, del_users will be a dictonary
589        # of sets where the key is the user that owns the keys in the set.
590        # We use a set to avoid duplicates.  del_project is just the name
591        # of any dynamic project to delete.  We're somewhat lazy about
592        # deleting authorization attributes.  Having access to something
593        # that doesn't exist isn't harmful.
594        del_users = { }
595        del_project = None
596        del_types = set()
597
598        self.state_lock.acquire()
599        if aid in self.allocation:
600            self.log.debug("Found allocation for %s" %aid)
601            self.clear_allocation_authorization(aid, state_attr='allocation')
602            for k in self.allocation[aid]['keys']:
603                kk = "%s:%s" % k
604                self.keys[kk] -= 1
605                if self.keys[kk] == 0:
606                    if not del_users.has_key(k[0]):
607                        del_users[k[0]] = set()
608                    del_users[k[0]].add(k[1])
609                    del self.keys[kk]
610
611            if 'project' in self.allocation[aid]:
612                pname = self.allocation[aid]['project']
613                self.projects[pname] -= 1
614                if self.projects[pname] == 0:
615                    del_project = pname
616                    del self.projects[pname]
617
618            if 'types' in self.allocation[aid]:
619                for t in self.allocation[aid]['types']:
620                    self.types[t] -= 1
621                    if self.types[t] == 0:
622                        if not del_project: del_project = t[0]
623                        del_types.add(t[1])
624                        del self.types[t]
625
626            del self.allocation[aid]
627            self.write_state()
628            self.state_lock.release()
629            # If we actually have resources to deallocate, prepare the call.
630            if del_project or del_users:
631                self.do_release_project(del_project, del_users, del_types)
632            # And remove the access cert
633            cf = "%s/%s.pem" % (self.certdir, aid)
634            self.log.debug("Removing %s" % cf)
635            os.remove(cf)
636            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
637        else:
638            self.state_lock.release()
639            raise service_error(service_error.req, "No such allocation")
640
641    # These are subroutines for StartSegment
642    def generate_ns2(self, topo, expfn, softdir, connInfo):
643        """
644        Convert topo into an ns2 file, decorated with appropriate commands for
645        the particular testbed setup.  Convert all requests for software, etc
646        to point at the staged copies on this testbed and add the federation
647        startcommands.
648        """
649        class dragon_commands:
650            """
651            Functor to spit out approrpiate dragon commands for nodes listed in
652            the connectivity description.  The constructor makes a dict mapping
653            dragon nodes to their parameters and the __call__ checks each
654            element in turn for membership.
655            """
656            def __init__(self, map):
657                self.node_info = map
658
659            def __call__(self, e):
660                s = ""
661                if isinstance(e, topdl.Computer):
662                    if self.node_info.has_key(e.name):
663                        info = self.node_info[e.name]
664                        for ifname, vlan, type in info:
665                            for i in e.interface:
666                                if i.name == ifname:
667                                    addr = i.get_attribute('ip4_address')
668                                    subs = i.substrate[0]
669                                    break
670                            else:
671                                raise service_error(service_error.internal,
672                                        "No interface %s on element %s" % \
673                                                (ifname, e.name))
674                            # XXX: do netmask right
675                            if type =='link':
676                                s = ("tb-allow-external ${%s} " + \
677                                        "dragonportal ip %s vlan %s " + \
678                                        "netmask 255.255.255.0\n") % \
679                                        (topdl.to_tcl_name(e.name), addr, vlan)
680                            elif type =='lan':
681                                s = ("tb-allow-external ${%s} " + \
682                                        "dragonportal " + \
683                                        "ip %s vlan %s usurp %s\n") % \
684                                        (topdl.to_tcl_name(e.name), addr, 
685                                                vlan, subs)
686                            else:
687                                raise service_error(service_error_internal,
688                                        "Unknown DRAGON type %s" % type)
689                return s
690
691        class not_dragon:
692            """
693            Return true if a node is in the given map of dragon nodes.
694            """
695            def __init__(self, map):
696                self.nodes = set(map.keys())
697
698            def __call__(self, e):
699                return e.name not in self.nodes
700
701        # Main line of generate_ns2
702        t = topo.clone()
703
704        # Create the map of nodes that need direct connections (dragon
705        # connections) from the connInfo
706        dragon_map = { }
707        for i in [ i for i in connInfo if i['type'] == 'transit']:
708            for a in i.get('fedAttr', []):
709                if a['attribute'] == 'vlan_id':
710                    vlan = a['value']
711                    break
712            else:
713                raise service_error(service_error.internal, "No vlan tag")
714            members = i.get('member', [])
715            if len(members) > 1: type = 'lan'
716            else: type = 'link'
717
718            try:
719                for m in members:
720                    if m['element'] in dragon_map:
721                        dragon_map[m['element']].append(( m['interface'], 
722                            vlan, type))
723                    else:
724                        dragon_map[m['element']] = [( m['interface'], 
725                            vlan, type),]
726            except KeyError:
727                raise service_error(service_error.req,
728                        "Missing connectivity info")
729
730        # Weed out the things we aren't going to instantiate: Segments, portal
731        # substrates, and portal interfaces.  (The copy in the for loop allows
732        # us to delete from e.elements in side the for loop).  While we're
733        # touching all the elements, we also adjust paths from the original
734        # testbed to local testbed paths and put the federation commands into
735        # the start commands
736        for e in [e for e in t.elements]:
737            if isinstance(e, topdl.Segment):
738                t.elements.remove(e)
739            if isinstance(e, topdl.Computer):
740                self.add_kit(e, self.federation_software)
741                if e.get_attribute('portal') and self.portal_startcommand:
742                    # Add local portal support software
743                    self.add_kit(e, self.portal_software)
744                    # Portals never have a user-specified start command
745                    e.set_attribute('startup', self.portal_startcommand)
746                elif self.node_startcommand:
747                    if e.get_attribute('startup'):
748                        e.set_attribute('startup', "%s \\$USER '%s'" % \
749                                (self.node_startcommand, 
750                                    e.get_attribute('startup')))
751                    else:
752                        e.set_attribute('startup', self.node_startcommand)
753
754                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
755                # Remove portal interfaces that do not connect to DRAGON
756                e.interface = [i for i in e.interface \
757                        if not i.get_attribute('portal') or i.name in dinf ]
758            # Fix software paths
759            for s in getattr(e, 'software', []):
760                s.location = re.sub("^.*/", softdir, s.location)
761
762        t.substrates = [ s.clone() for s in t.substrates ]
763        t.incorporate_elements()
764
765        # Customize the ns2 output for local portal commands and images
766        filters = []
767
768        if self.dragon_endpoint:
769            add_filter = not_dragon(dragon_map)
770            filters.append(dragon_commands(dragon_map))
771        else:
772            add_filter = None
773
774        if self.portal_command:
775            filters.append(topdl.generate_portal_command_filter(
776                self.portal_command, add_filter=add_filter))
777
778        if self.portal_image:
779            filters.append(topdl.generate_portal_image_filter(
780                self.portal_image))
781
782        if self.portal_type:
783            filters.append(topdl.generate_portal_hardware_filter(
784                self.portal_type))
785
786        # Convert to ns and write it out
787        expfile = topdl.topology_to_ns2(t, filters)
788        try:
789            f = open(expfn, "w")
790            print >>f, expfile
791            f.close()
792        except EnvironmentError:
793            raise service_error(service_error.internal,
794                    "Cannot write experiment file %s: %s" % (expfn,e))
795
796    def export_store_info(self, cf, proj, ename, connInfo):
797        """
798        For the export requests in the connection info, install the peer names
799        at the experiment controller via SetValue calls.
800        """
801
802        for c in connInfo:
803            for p in [ p for p in c.get('parameter', []) \
804                    if p.get('type', '') == 'output']:
805
806                if p.get('name', '') == 'peer':
807                    k = p.get('key', None)
808                    surl = p.get('store', None)
809                    if surl and k and k.index('/') != -1:
810                        value = "%s.%s.%s%s" % \
811                                (k[k.index('/')+1:], ename, proj, self.domain)
812                        req = { 'name': k, 'value': value }
813                        self.log.debug("Setting %s to %s on %s" % \
814                                (k, value, surl))
815                        self.call_SetValue(surl, req, cf)
816                    else:
817                        self.log.error("Bad export request: %s" % p)
818                elif p.get('name', '') == 'ssh_port':
819                    k = p.get('key', None)
820                    surl = p.get('store', None)
821                    if surl and k:
822                        req = { 'name': k, 'value': self.ssh_port }
823                        self.log.debug("Setting %s to %s on %s" % \
824                                (k, self.ssh_port, surl))
825                        self.call_SetValue(surl, req, cf)
826                    else:
827                        self.log.error("Bad export request: %s" % p)
828                else:
829                    self.log.error("Unknown export parameter: %s" % \
830                            p.get('name'))
831                    continue
832
833    def add_seer_node(self, topo, name, startup):
834        """
835        Add a seer node to the given topology, with the startup command passed
836        in.  Used by configure seer_services.
837        """
838        c_node = topdl.Computer(
839                name=name, 
840                os= topdl.OperatingSystem(
841                    attribute=[
842                    { 'attribute': 'osid', 
843                        'value': self.local_seer_image },
844                    ]),
845                attribute=[
846                    { 'attribute': 'startup', 'value': startup },
847                    ]
848                )
849        self.add_kit(c_node, self.local_seer_software)
850        topo.elements.append(c_node)
851
852    def configure_seer_services(self, services, topo, softdir):
853        """
854        Make changes to the topology required for the seer requests being made.
855        Specifically, add any control or master nodes required and set up the
856        start commands on the nodes to interconnect them.
857        """
858        local_seer = False      # True if we need to add a control node
859        collect_seer = False    # True if there is a seer-master node
860        seer_master= False      # True if we need to add the seer-master
861        for s in services:
862            s_name = s.get('name', '')
863            s_vis = s.get('visibility','')
864
865            if s_name  == 'local_seer_control' and s_vis == 'export':
866                local_seer = True
867            elif s_name == 'seer_master':
868                if s_vis == 'import':
869                    collect_seer = True
870                elif s_vis == 'export':
871                    seer_master = True
872       
873        # We've got the whole picture now, so add nodes if needed and configure
874        # them to interconnect properly.
875        if local_seer or seer_master:
876            # Copy local seer control node software to the tempdir
877            for l, f in self.local_seer_software:
878                base = os.path.basename(f)
879                copy_file(f, "%s/%s" % (softdir, base))
880        # If we're collecting seers somewhere the controllers need to talk to
881        # the master.  In testbeds that export the service, that will be a
882        # local node that we'll add below.  Elsewhere it will be the control
883        # portal that will port forward to the exporting master.
884        if local_seer:
885            if collect_seer:
886                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
887            else:
888                startup = self.local_seer_start
889            self.add_seer_node(topo, 'control', startup)
890        # If this is the seer master, add that node, too.
891        if seer_master:
892            self.add_seer_node(topo, 'seer-master', 
893                    "%s -R -n -R seer-master -R -A -R sink" % \
894                            self.seer_master_start)
895
896    def retrieve_software(self, topo, certfile, softdir):
897        """
898        Collect the software that nodes in the topology need loaded and stage
899        it locally.  This implies retrieving it from the experiment_controller
900        and placing it into softdir.  Certfile is used to prove that this node
901        has access to that data (it's the allocation/segment fedid).  Finally
902        local portal and federation software is also copied to the same staging
903        directory for simplicity - all software needed for experiment creation
904        is in softdir.
905        """
906        sw = set()
907        for e in topo.elements:
908            for s in getattr(e, 'software', []):
909                sw.add(s.location)
910        for s in sw:
911            self.log.debug("Retrieving %s" % s)
912            try:
913                get_url(s, certfile, softdir)
914            except:
915                t, v, st = sys.exc_info()
916                raise service_error(service_error.internal,
917                        "Error retrieving %s: %s" % (s, v))
918
919        # Copy local federation and portal node software to the tempdir
920        for s in (self.federation_software, self.portal_software):
921            for l, f in s:
922                base = os.path.basename(f)
923                copy_file(f, "%s/%s" % (softdir, base))
924
925
926    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
927        """
928        Gather common configuration files, retrieve or create an experiment
929        name and project name, and return the ssh_key filenames.  Create an
930        allocation log bound to the state log variable as well.
931        """
932        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
933                'seer_ca_pem', 'seer_node_pem')
934        ename = None
935        pubkey_base = None
936        secretkey_base = None
937        proj = None
938        user = None
939        alloc_log = None
940        nonce_experiment = False
941        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
942
943        self.state_lock.acquire()
944        if aid in self.allocation:
945            proj = self.allocation[aid].get('project', None)
946            if not proj: 
947                proj = self.allocation[aid].get('sproject', None)
948        self.state_lock.release()
949
950        if not proj:
951            raise service_error(service_error.internal, 
952                    "Can't find project for %s" %aid)
953
954        for a in attrs:
955            if a['attribute'] in configs:
956                try:
957                    self.log.debug("Retrieving %s from %s" % \
958                            (a['attribute'], a['value']))
959                    get_url(a['value'], certfile, tmpdir)
960                except:
961                    t, v, st = sys.exc_info()
962                    raise service_error(service_error.internal,
963                            "Error retrieving %s: %s" % (a.get('value', ""), v))
964            if a['attribute'] == 'ssh_pubkey':
965                pubkey_base = a['value'].rpartition('/')[2]
966            if a['attribute'] == 'ssh_secretkey':
967                secretkey_base = a['value'].rpartition('/')[2]
968            if a['attribute'] == 'experiment_name':
969                ename = a['value']
970
971        # Names longer than the emulab max are discarded
972        if ename and len(ename) <= self.max_name_len:
973            # Clean up the experiment name so that emulab will accept it.
974            ename = re.sub(vchars_re, '-', ename)
975
976        else:
977            ename = ""
978            for i in range(0,5):
979                ename += random.choice(string.ascii_letters)
980            nonce_experiment = True
981            self.log.warn("No experiment name or suggestion too long: " + \
982                    "picked one randomly: %s" % ename)
983
984        if not pubkey_base:
985            raise service_error(service_error.req, 
986                    "No public key attribute")
987
988        if not secretkey_base:
989            raise service_error(service_error.req, 
990                    "No secret key attribute")
991
992        self.state_lock.acquire()
993        if aid in self.allocation:
994            user = self.allocation[aid].get('user', None)
995            self.allocation[aid]['experiment'] = ename
996            self.allocation[aid]['nonce'] = nonce_experiment
997            self.allocation[aid]['log'] = [ ]
998            # Create a logger that logs to the experiment's state object as
999            # well as to the main log file.
1000            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1001            h = logging.StreamHandler(
1002                    list_log.list_log(self.allocation[aid]['log']))
1003            # XXX: there should be a global one of these rather than
1004            # repeating the code.
1005            h.setFormatter(logging.Formatter(
1006                "%(asctime)s %(name)s %(message)s",
1007                        '%d %b %y %H:%M:%S'))
1008            alloc_log.addHandler(h)
1009            self.write_state()
1010        self.state_lock.release()
1011
1012        if not user:
1013            raise service_error(service_error.internal, 
1014                    "Can't find creation user for %s" %aid)
1015
1016        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1017
1018    def decorate_topology(self, info, t):
1019        """
1020        Copy the physical mapping and status onto the topology.  Used by
1021        StartSegment and InfoSegment
1022        """
1023        def add_new(ann, attr):
1024            for a in ann:
1025                if a not in attr: attr.append(a)
1026
1027        # Copy the assigned names into the return topology
1028        for e in t.elements:
1029            if isinstance(e, topdl.Computer):
1030                if not self.create_debug:
1031                    if e.name in info.node:
1032                        add_new(("%s%s" % (info.node[e.name][0], self.domain),),
1033                                e.localname)
1034                        e.status = info.node[e.name][1]
1035                else:
1036                    # Simple debugging assignment
1037                    add_new(("node%d%s" % (i, self.domain),), e.localname)
1038                    e.status = 'active'
1039                    add_new(('testop1', 'testop2'), e.operation)
1040                    i += 1
1041
1042
1043    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1044        """
1045        Store key bits of experiment state in the global repository, including
1046        the response that may need to be replayed, and return the response.
1047        """
1048        i = 0
1049        t = topo.clone()
1050        self.decorate_topology(starter, t)
1051        # Grab the log (this is some anal locking, but better safe than
1052        # sorry)
1053        self.state_lock.acquire()
1054        logv = "".join(self.allocation[aid]['log'])
1055        # It's possible that the StartSegment call gets retried (!).
1056        # if the 'started' key is in the allocation, we'll return it rather
1057        # than redo the setup.
1058        self.allocation[aid]['started'] = { 
1059                'allocID': alloc_id,
1060                'allocationLog': logv,
1061                'segmentdescription': { 
1062                    'topdldescription': t.clone().to_dict()
1063                    },
1064                'proof': proof.to_dict(),
1065                }
1066        self.allocation[aid]['topo'] = topo.clone()
1067        retval = copy.copy(self.allocation[aid]['started'])
1068        self.write_state()
1069        self.state_lock.release()
1070        return retval
1071   
1072    # End of StartSegment support routines
1073
1074    def StartSegment(self, req, fid):
1075        err = None  # Any service_error generated after tmpdir is created
1076        rv = None   # Return value from segment creation
1077
1078        try:
1079            req = req['StartSegmentRequestBody']
1080            auth_attr = req['allocID']['fedid']
1081            topref = req['segmentdescription']['topdldescription']
1082        except KeyError:
1083            raise service_error(server_error.req, "Badly formed request")
1084
1085        connInfo = req.get('connection', [])
1086        services = req.get('service', [])
1087        aid = "%s" % auth_attr
1088        attrs = req.get('fedAttr', [])
1089
1090        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1091                with_proof=True)
1092        if not access_ok:
1093            raise service_error(service_error.access, "Access denied")
1094        else:
1095            # See if this is a replay of an earlier succeeded StartSegment -
1096            # sometimes SSL kills 'em.  If so, replay the response rather than
1097            # redoing the allocation.
1098            self.state_lock.acquire()
1099            retval = self.allocation[aid].get('started', None)
1100            self.state_lock.release()
1101            if retval:
1102                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1103                        "replaying response")
1104                return retval
1105
1106        # A new request.  Do it.
1107
1108        if topref: topo = topdl.Topology(**topref)
1109        else:
1110            raise service_error(service_error.req, 
1111                    "Request missing segmentdescription'")
1112       
1113        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1114        try:
1115            tmpdir = tempfile.mkdtemp(prefix="access-")
1116            softdir = "%s/software" % tmpdir
1117            os.mkdir(softdir)
1118        except EnvironmentError:
1119            raise service_error(service_error.internal, "Cannot create tmp dir")
1120
1121        # Try block alllows us to clean up temporary files.
1122        try:
1123            self.retrieve_software(topo, certfile, softdir)
1124            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1125                    self.initialize_experiment_info(attrs, aid, 
1126                            certfile, tmpdir)
1127
1128            if '/' in proj: proj, gid = proj.split('/')
1129            else: gid = None
1130
1131
1132            # Set up userconf and seer if needed
1133            self.configure_userconf(services, tmpdir)
1134            self.configure_seer_services(services, topo, softdir)
1135            # Get and send synch store variables
1136            self.export_store_info(certfile, proj, ename, connInfo)
1137            self.import_store_info(certfile, connInfo)
1138
1139            expfile = "%s/experiment.tcl" % tmpdir
1140
1141            self.generate_portal_configs(topo, pubkey_base, 
1142                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1143            self.generate_ns2(topo, expfile, 
1144                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1145
1146            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1147                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1148                    cert=self.xmlrpc_cert)
1149            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1150        except service_error, e:
1151            err = e
1152        except:
1153            t, v, st = sys.exc_info()
1154            err = service_error(service_error.internal, "%s: %s" % \
1155                    (v, traceback.extract_tb(st)))
1156
1157        # Walk up tmpdir, deleting as we go
1158        if self.cleanup: self.remove_dirs(tmpdir)
1159        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1160
1161        if rv:
1162            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1163                    proof)
1164        elif err:
1165            raise service_error(service_error.federant,
1166                    "Swapin failed: %s" % err)
1167        else:
1168            raise service_error(service_error.federant, "Swapin failed")
1169
1170    def TerminateSegment(self, req, fid):
1171        try:
1172            req = req['TerminateSegmentRequestBody']
1173        except KeyError:
1174            raise service_error(server_error.req, "Badly formed request")
1175
1176        auth_attr = req['allocID']['fedid']
1177        aid = "%s" % auth_attr
1178        attrs = req.get('fedAttr', [])
1179
1180        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1181                with_proof=True)
1182        if not access_ok:
1183            raise service_error(service_error.access, "Access denied")
1184
1185        self.state_lock.acquire()
1186        if aid in self.allocation:
1187            proj = self.allocation[aid].get('project', None)
1188            if not proj: 
1189                proj = self.allocation[aid].get('sproject', None)
1190            user = self.allocation[aid].get('user', None)
1191            ename = self.allocation[aid].get('experiment', None)
1192            nonce = self.allocation[aid].get('nonce', False)
1193        else:
1194            proj = None
1195            user = None
1196            ename = None
1197            nonce = False
1198        self.state_lock.release()
1199
1200        if not proj:
1201            raise service_error(service_error.internal, 
1202                    "Can't find project for %s" % aid)
1203        else:
1204            if '/' in proj: proj, gid = proj.split('/')
1205            else: gid = None
1206
1207        if not user:
1208            raise service_error(service_error.internal, 
1209                    "Can't find creation user for %s" % aid)
1210        if not ename:
1211            raise service_error(service_error.internal, 
1212                    "Can't find experiment name for %s" % aid)
1213        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1214                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1215        stopper(self, user, proj, ename, gid, nonce)
1216        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1217
1218    def InfoSegment(self, req, fid):
1219        try:
1220            req = req['InfoSegmentRequestBody']
1221        except KeyError:
1222            raise service_error(server_error.req, "Badly formed request")
1223
1224        auth_attr = req['allocID']['fedid']
1225        aid = "%s" % auth_attr
1226
1227        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1228                with_proof=True)
1229        if not access_ok:
1230            raise service_error(service_error.access, "Access denied")
1231
1232        self.state_lock.acquire()
1233        if aid in self.allocation:
1234            topo = self.allocation[aid].get('topo', None)
1235            if topo: topo = topo.clone()
1236            proj = self.allocation[aid].get('project', None)
1237            if not proj: 
1238                proj = self.allocation[aid].get('sproject', None)
1239            user = self.allocation[aid].get('user', None)
1240            ename = self.allocation[aid].get('experiment', None)
1241            nonce = self.allocation[aid].get('nonce', False)
1242        else:
1243            proj = None
1244            user = None
1245            ename = None
1246            nonce = False
1247        self.state_lock.release()
1248
1249        if not proj:
1250            raise service_error(service_error.internal, 
1251                    "Can't find project for %s" % aid)
1252        else:
1253            if '/' in proj: proj, gid = proj.split('/')
1254            else: gid = None
1255
1256        if not user:
1257            raise service_error(service_error.internal, 
1258                    "Can't find creation user for %s" % aid)
1259        if not ename:
1260            raise service_error(service_error.internal, 
1261                    "Can't find experiment name for %s" % aid)
1262        info = self.info_segment(keyfile=self.ssh_privkey_file,
1263                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1264        info(self, user, proj, ename)
1265        self.decorate_topology(info, topo)
1266        return { 
1267                'allocID': req['allocID'], 
1268                'segmentdescription': 
1269                    { 'topdldescription' : topo.to_dict() },
1270                'proof': proof.to_dict(),
1271                }
Note: See TracBrowser for help on using the repository browser.