source: fedd/federation/emulab_access.py @ de2ef5c

compt_changesinfo-ops
Last change on this file since de2ef5c was e83f2f2, checked in by Ted Faber <faber@…>, 14 years ago

Move proofs around. Lots of changes, including fault handling.

  • Property mode set to 100644
File size: 38.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        access_base.__init__(self, config, auth)
59
60        self.allow_proxy = config.getboolean("access", "allow_proxy")
61
62        self.domain = config.get("access", "domain")
63        self.userconfdir = config.get("access","userconfdir")
64        self.userconfcmd = config.get("access","userconfcmd")
65        self.userconfurl = config.get("access","userconfurl")
66        self.federation_software = config.get("access", "federation_software")
67        self.portal_software = config.get("access", "portal_software")
68        self.local_seer_software = config.get("access", "local_seer_software")
69        self.local_seer_image = config.get("access", "local_seer_image")
70        self.local_seer_start = config.get("access", "local_seer_start")
71        self.seer_master_start = config.get("access", "seer_master_start")
72        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
73        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
74        self.ssh_port = config.get("access","ssh_port") or "22"
75        self.boss = config.get("access", "boss")
76        self.ops = config.get("access", "ops")
77        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
78        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
79
80        self.dragon_endpoint = config.get("access", "dragon")
81        self.dragon_vlans = config.get("access", "dragon_vlans")
82        self.deter_internal = config.get("access", "deter_internal")
83
84        self.tunnel_config = config.getboolean("access", "tunnel_config")
85        self.portal_command = config.get("access", "portal_command")
86        self.portal_image = config.get("access", "portal_image")
87        self.portal_type = config.get("access", "portal_type") or "pc"
88        self.portal_startcommand = config.get("access", "portal_startcommand")
89        self.node_startcommand = config.get("access", "node_startcommand")
90
91        self.federation_software = self.software_list(self.federation_software)
92        self.portal_software = self.software_list(self.portal_software)
93        self.local_seer_software = self.software_list(self.local_seer_software)
94
95        self.access_type = self.access_type.lower()
96        if self.access_type == 'remote_emulab':
97            self.start_segment = proxy_emulab_segment.start_segment
98            self.stop_segment = proxy_emulab_segment.stop_segment
99        elif self.access_type == 'local_emulab':
100            self.start_segment = local_emulab_segment.start_segment
101            self.stop_segment = local_emulab_segment.stop_segment
102        else:
103            self.start_segment = None
104            self.stop_segment = None
105
106        self.restricted = [ ]
107        # XXX: this should go?
108        #if config.has_option("access", "accessdb"):
109        #    self.read_access(config.get("access", "accessdb"))
110        tb = config.get('access', 'testbed')
111        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
112        else: self.testbed = [ ]
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            self.access = { }
122            if accessdb:
123                self.legacy_read_access(accessdb, self.legacy_access_tuple)
124        elif self.auth_type == 'abac':
125            self.auth = abac_authorizer(load=self.auth_dir)
126            self.access = [ ]
127            if accessdb:
128                self.read_access(accessdb, self.access_tuple)
129        else:
130            raise service_error(service_error.internal, 
131                    "Unknown auth_type: %s" % self.auth_type)
132
133        # read_state in the base_class
134        self.state_lock.acquire()
135        for a  in ('allocation', 'projects', 'keys', 'types'):
136            if a not in self.state:
137                self.state[a] = { }
138        self.allocation = self.state['allocation']
139        self.projects = self.state['projects']
140        self.keys = self.state['keys']
141        self.types = self.state['types']
142        if self.auth_type == "legacy": 
143            # Add the ownership attributes to the authorizer.  Note that the
144            # indices of the allocation dict are strings, but the attributes are
145            # fedids, so there is a conversion.
146            for k in self.allocation.keys():
147                for o in self.allocation[k].get('owners', []):
148                    self.auth.set_attribute(o, fedid(hexstr=k))
149                if self.allocation[k].has_key('userconfig'):
150                    sfid = self.allocation[k]['userconfig']
151                    fid = fedid(hexstr=sfid)
152                    self.auth.set_attribute(fid, "/%s" % sfid)
153        self.state_lock.release()
154        self.exports = {
155                'SMB': self.export_SMB,
156                'seer': self.export_seer,
157                'tmcd': self.export_tmcd,
158                'userconfig': self.export_userconfig,
159                'project_export': self.export_project_export,
160                'local_seer_control': self.export_local_seer,
161                'seer_master': self.export_seer_master,
162                'hide_hosts': self.export_hide_hosts,
163                }
164
165        if not self.local_seer_image or not self.local_seer_software or \
166                not self.local_seer_start:
167            if 'local_seer_control' in self.exports:
168                del self.exports['local_seer_control']
169
170        if not self.local_seer_image or not self.local_seer_software or \
171                not self.seer_master_start:
172            if 'seer_master' in self.exports:
173                del self.exports['seer_master']
174
175
176        self.soap_services = {\
177            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
178            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
179            'StartSegment': soap_handler("StartSegment", self.StartSegment),
180            'TerminateSegment': soap_handler("TerminateSegment",
181                self.TerminateSegment),
182            }
183        self.xmlrpc_services =  {\
184            'RequestAccess': xmlrpc_handler('RequestAccess',
185                self.RequestAccess),
186            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
187                self.ReleaseAccess),
188            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
189            'TerminateSegment': xmlrpc_handler('TerminateSegment',
190                self.TerminateSegment),
191            }
192
193        self.call_SetValue = service_caller('SetValue')
194        self.call_GetValue = service_caller('GetValue', log=self.log)
195
196        if not config.has_option("allocate", "uri"):
197            self.allocate_project = \
198                allocate_project_local(config, auth)
199        else:
200            self.allocate_project = \
201                allocate_project_remote(config, auth)
202
203
204        # If the project allocator exports services, put them in this object's
205        # maps so that classes that instantiate this can call the services.
206        self.soap_services.update(self.allocate_project.soap_services)
207        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
208
209    @staticmethod
210    def legacy_access_tuple(str):
211        """
212        Convert a string of the form (id[:resources:resouces], id, id) into a
213        tuple of the form (project, user, user) where users may be names or
214        fedids.  The resources strings are obsolete and ignored.
215        """
216        def parse_name(n):
217            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
218            else: return n
219
220        str = str.strip()
221        if str.startswith('(') and str.endswith(')'):
222            str = str[1:-1]
223            names = [ s.strip() for s in str.split(",")]
224            if len(names) > 3:
225                raise self.parse_error("More than three fields in name")
226            first = names[0].split(":")
227            if first == 'fedid:':
228                del first[0]
229                first[0] = fedid(hexstr=first[0])
230            names[0] = first[0]
231
232            for i in range(1,2):
233                names[i] = parse_name(names[i])
234
235            return tuple(names)
236        else:
237            raise self.parse_error('Bad mapping (unbalanced parens)')
238
239    @staticmethod
240    def access_tuple(str):
241        """
242        Convert a string of the form (id, id) into an access_project.  This is
243        called by read_access to convert to local attributes.  It returns
244        a tuple of the form (project, user, user) where the two users are
245        always the same.
246        """
247
248        str = str.strip()
249        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
250            # The slice takes the parens off the string.
251            proj, user = str[1:-1].split(',')
252            return (proj.strip(), user.strip(), user.strip())
253        else:
254            raise self.parse_error(
255                    'Bad mapping (unbalanced parens or more than 1 comma)')
256
257    # RequestAccess support routines
258
259    def legacy_lookup_access(self, req, fid):
260        """
261        Look up the local access control information mapped to this fedid and
262        credentials.  In this case it is a (project, create_user, access_user)
263        triple, and a triple of three booleans indicating which, if any will
264        need to be dynamically created.  Finally a list of owners for that
265        allocation is returned.
266
267        lookup_access_base pulls the first triple out, and it is parsed by this
268        routine into the boolean map.  Owners is always the controlling fedid.
269        """
270        # Return values
271        rp = None
272        ru = None
273        # This maps a valid user to the Emulab projects and users to use
274        found, match = self.legacy_lookup_access_base(req, fid)
275        tb, project, user = match
276       
277        if found == None:
278            raise service_error(service_error.access,
279                    "Access denied - cannot map access")
280
281        # resolve <dynamic> and <same> in found
282        dyn_proj = False
283        dyn_create_user = False
284        dyn_service_user = False
285
286        if found[0] == "<same>":
287            if project != None:
288                rp = project
289            else : 
290                raise service_error(\
291                        service_error.server_config,
292                        "Project matched <same> when no project given")
293        elif found[0] == "<dynamic>":
294            rp = None
295            dyn_proj = True
296        else:
297            rp = found[0]
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def do_project_allocation(self, dyn, project, user):
331        """
332        Call the project allocation routines and return the info.
333        """
334        if dyn: 
335            # Compose the dynamic project request
336            # (only dynamic, dynamic currently allowed)
337            preq = { 'AllocateProjectRequestBody': \
338                        { 'project' : {\
339                            'user': [ \
340                            { \
341                                'access': [ { 
342                                    'sshPubkey': self.ssh_pubkey_file } ],
343                                 'role': "serviceAccess",\
344                            }, \
345                            { \
346                                'access': [ { 
347                                    'sshPubkey': self.ssh_pubkey_file } ],
348                                 'role': "experimentCreation",\
349                            }, \
350                            ], \
351                            }\
352                        }\
353                    }
354            return self.allocate_project.dynamic_project(preq)
355        else:
356            preq = {'StaticProjectRequestBody' : \
357                    { 'project': \
358                        { 'name' : { 'localname' : project },\
359                          'user' : [ \
360                            {\
361                                'userID': { 'localname' : user }, \
362                                'access': [ { 
363                                    'sshPubkey': self.ssh_pubkey_file } ],
364                                'role': 'experimentCreation'\
365                            },\
366                            {\
367                                'userID': { 'localname' : user}, \
368                                'access': [ { 
369                                    'sshPubkey': self.ssh_pubkey_file } ],
370                                'role': 'serviceAccess'\
371                            },\
372                        ]}\
373                    }\
374            }
375            return self.allocate_project.static_project(preq)
376
377    def save_project_state(self, aid, ap, dyn, owners):
378        """
379        Parse out and save the information relevant to the project created for
380        this experiment.  That info is largely in ap and owners.  dyn indicates
381        that the project was created dynamically.  Return the user and project
382        names.
383        """
384        self.state_lock.acquire()
385        self.allocation[aid] = { }
386        self.allocation[aid]['auth'] = set()
387        try:
388            pname = ap['project']['name']['localname']
389        except KeyError:
390            pname = None
391
392        if dyn:
393            if not pname:
394                self.state_lock.release()
395                raise service_error(service_error.internal,
396                        "Misformed allocation response?")
397            if pname in self.projects: self.projects[pname] += 1
398            else: self.projects[pname] = 1
399            self.allocation[aid]['project'] = pname
400        else:
401            # sproject is a static project associated with this allocation.
402            self.allocation[aid]['sproject'] = pname
403
404        self.allocation[aid]['keys'] = [ ]
405
406        try:
407            for u in ap['project']['user']:
408                uname = u['userID']['localname']
409                if u['role'] == 'experimentCreation':
410                    self.allocation[aid]['user'] = uname
411                for k in [ k['sshPubkey'] for k in u['access'] \
412                        if k.has_key('sshPubkey') ]:
413                    kv = "%s:%s" % (uname, k)
414                    if self.keys.has_key(kv): self.keys[kv] += 1
415                    else: self.keys[kv] = 1
416                    self.allocation[aid]['keys'].append((uname, k))
417        except KeyError:
418            self.state_lock.release()
419            raise service_error(service_error.internal,
420                    "Misformed allocation response?")
421
422        self.allocation[aid]['owners'] = owners
423        self.write_state()
424        self.state_lock.release()
425        return (pname, uname)
426
427    # End of RequestAccess support routines
428
429    def RequestAccess(self, req, fid):
430        """
431        Handle the access request.  Proxy if not for us.
432
433        Parse out the fields and make the allocations or rejections if for us,
434        otherwise, assuming we're willing to proxy, proxy the request out.
435        """
436
437        def gateway_hardware(h):
438            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
439            else: return h
440
441        def get_export_project(svcs):
442            """
443            if the service requests includes one to export a project, return
444            that project.
445            """
446            rv = None
447            for s in svcs:
448                if s.get('name', '') == 'project_export' and \
449                        s.get('visibility', '') == 'export':
450                    if not rv: 
451                        for a in s.get('fedAttr', []):
452                            if a.get('attribute', '') == 'project' \
453                                    and 'value' in a:
454                                rv = a['value']
455                    else:
456                        raise service_error(service_error, access, 
457                                'Requesting multiple project exports is ' + \
458                                        'not supported');
459            return rv
460
461        # The dance to get into the request body
462        if req.has_key('RequestAccessRequestBody'):
463            req = req['RequestAccessRequestBody']
464        else:
465            raise service_error(service_error.req, "No request!?")
466
467        # if this includes a project export request, construct a filter such
468        # that only the ABAC attributes mapped to that project are checked for
469        # access.
470        if 'service' in req:
471            ep = get_export_project(req['service'])
472            if ep: pf = lambda(a): a.value[0] == ep
473            else: pf = None
474        else:
475            ep = None
476            pf = None
477
478        if self.auth.import_credentials(
479                data_list=req.get('abac_credential', [])):
480            self.auth.save()
481
482        if self.auth_type == "legacy":
483            found, dyn, owners= self.legacy_lookup_access(req, fid)
484            proof = access_proof("me", fid, "create")
485        elif self.auth_type == 'abac':
486            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
487        else:
488            raise service_error(service_error.internal, 
489                    'Unknown auth_type: %s' % self.auth_type)
490        ap = None
491
492        # This only happens in legacy lookups, but if this user has access to
493        # the testbed but not the project to be exported, raise the error.
494        if ep and ep != found[0]:
495            raise service_error(service_error.access,
496                    "Cannot export %s" % ep)
497
498        if self.ssh_pubkey_file:
499            ap = self.do_project_allocation(dyn[1], found[0], found[1])
500        else:
501            raise service_error(service_error.internal, 
502                    "SSH access parameters required")
503        # keep track of what's been added
504        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
505        aid = unicode(allocID)
506
507        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
508
509        services, svc_state = self.export_services(req.get('service',[]),
510                pname, uname)
511        self.state_lock.acquire()
512        # Store services state in global state
513        for k, v in svc_state.items():
514            self.allocation[aid][k] = v
515        self.append_allocation_authorization(aid, 
516                set([(o, allocID) for o in owners]), state_attr='allocation')
517        self.write_state()
518        self.state_lock.release()
519        try:
520            f = open("%s/%s.pem" % (self.certdir, aid), "w")
521            print >>f, alloc_cert
522            f.close()
523        except EnvironmentError, e:
524            raise service_error(service_error.internal, 
525                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
526        resp = self.build_access_response({ 'fedid': allocID } ,
527                ap, services, proof)
528        return resp
529
530    def do_release_project(self, del_project, del_users, del_types):
531        """
532        If a project and users has to be deleted, make the call.
533        """
534        msg = { 'project': { }}
535        if del_project:
536            msg['project']['name']= {'localname': del_project}
537        users = [ ]
538        for u in del_users.keys():
539            users.append({ 'userID': { 'localname': u },\
540                'access' :  \
541                        [ {'sshPubkey' : s } for s in del_users[u]]\
542            })
543        if users: 
544            msg['project']['user'] = users
545        if len(del_types) > 0:
546            msg['resources'] = { 'node': \
547                    [ {'hardware': [ h ] } for h in del_types ]\
548                }
549        if self.allocate_project.release_project:
550            msg = { 'ReleaseProjectRequestBody' : msg}
551            self.allocate_project.release_project(msg)
552
553    def ReleaseAccess(self, req, fid):
554        # The dance to get into the request body
555        if req.has_key('ReleaseAccessRequestBody'):
556            req = req['ReleaseAccessRequestBody']
557        else:
558            raise service_error(service_error.req, "No request!?")
559
560        try:
561            if req['allocID'].has_key('localname'):
562                auth_attr = aid = req['allocID']['localname']
563            elif req['allocID'].has_key('fedid'):
564                aid = unicode(req['allocID']['fedid'])
565                auth_attr = req['allocID']['fedid']
566            else:
567                raise service_error(service_error.req,
568                        "Only localnames and fedids are understood")
569        except KeyError:
570            raise service_error(service_error.req, "Badly formed request")
571
572        self.log.debug("[access] deallocation requested for %s by %s" % \
573                (aid, fid))
574        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
575                with_proof=True)
576        if not access_ok:
577            self.log.debug("[access] deallocation denied for %s", aid)
578            raise service_error(service_error.access, "Access Denied")
579
580        # If we know this allocation, reduce the reference counts and
581        # remove the local allocations.  Otherwise report an error.  If
582        # there is an allocation to delete, del_users will be a dictonary
583        # of sets where the key is the user that owns the keys in the set.
584        # We use a set to avoid duplicates.  del_project is just the name
585        # of any dynamic project to delete.  We're somewhat lazy about
586        # deleting authorization attributes.  Having access to something
587        # that doesn't exist isn't harmful.
588        del_users = { }
589        del_project = None
590        del_types = set()
591
592        self.state_lock.acquire()
593        if aid in self.allocation:
594            self.log.debug("Found allocation for %s" %aid)
595            self.clear_allocation_authorization(aid, state_attr='allocation')
596            for k in self.allocation[aid]['keys']:
597                kk = "%s:%s" % k
598                self.keys[kk] -= 1
599                if self.keys[kk] == 0:
600                    if not del_users.has_key(k[0]):
601                        del_users[k[0]] = set()
602                    del_users[k[0]].add(k[1])
603                    del self.keys[kk]
604
605            if 'project' in self.allocation[aid]:
606                pname = self.allocation[aid]['project']
607                self.projects[pname] -= 1
608                if self.projects[pname] == 0:
609                    del_project = pname
610                    del self.projects[pname]
611
612            if 'types' in self.allocation[aid]:
613                for t in self.allocation[aid]['types']:
614                    self.types[t] -= 1
615                    if self.types[t] == 0:
616                        if not del_project: del_project = t[0]
617                        del_types.add(t[1])
618                        del self.types[t]
619
620            del self.allocation[aid]
621            self.write_state()
622            self.state_lock.release()
623            # If we actually have resources to deallocate, prepare the call.
624            if del_project or del_users:
625                self.do_release_project(del_project, del_users, del_types)
626            # And remove the access cert
627            cf = "%s/%s.pem" % (self.certdir, aid)
628            self.log.debug("Removing %s" % cf)
629            os.remove(cf)
630            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
631        else:
632            self.state_lock.release()
633            raise service_error(service_error.req, "No such allocation")
634
635    # These are subroutines for StartSegment
636    def generate_ns2(self, topo, expfn, softdir, connInfo):
637        """
638        Convert topo into an ns2 file, decorated with appropriate commands for
639        the particular testbed setup.  Convert all requests for software, etc
640        to point at the staged copies on this testbed and add the federation
641        startcommands.
642        """
643        class dragon_commands:
644            """
645            Functor to spit out approrpiate dragon commands for nodes listed in
646            the connectivity description.  The constructor makes a dict mapping
647            dragon nodes to their parameters and the __call__ checks each
648            element in turn for membership.
649            """
650            def __init__(self, map):
651                self.node_info = map
652
653            def __call__(self, e):
654                s = ""
655                if isinstance(e, topdl.Computer):
656                    if self.node_info.has_key(e.name):
657                        info = self.node_info[e.name]
658                        for ifname, vlan, type in info:
659                            for i in e.interface:
660                                if i.name == ifname:
661                                    addr = i.get_attribute('ip4_address')
662                                    subs = i.substrate[0]
663                                    break
664                            else:
665                                raise service_error(service_error.internal,
666                                        "No interface %s on element %s" % \
667                                                (ifname, e.name))
668                            # XXX: do netmask right
669                            if type =='link':
670                                s = ("tb-allow-external ${%s} " + \
671                                        "dragonportal ip %s vlan %s " + \
672                                        "netmask 255.255.255.0\n") % \
673                                        (topdl.to_tcl_name(e.name), addr, vlan)
674                            elif type =='lan':
675                                s = ("tb-allow-external ${%s} " + \
676                                        "dragonportal " + \
677                                        "ip %s vlan %s usurp %s\n") % \
678                                        (topdl.to_tcl_name(e.name), addr, 
679                                                vlan, subs)
680                            else:
681                                raise service_error(service_error_internal,
682                                        "Unknown DRAGON type %s" % type)
683                return s
684
685        class not_dragon:
686            """
687            Return true if a node is in the given map of dragon nodes.
688            """
689            def __init__(self, map):
690                self.nodes = set(map.keys())
691
692            def __call__(self, e):
693                return e.name not in self.nodes
694
695        # Main line of generate_ns2
696        t = topo.clone()
697
698        # Create the map of nodes that need direct connections (dragon
699        # connections) from the connInfo
700        dragon_map = { }
701        for i in [ i for i in connInfo if i['type'] == 'transit']:
702            for a in i.get('fedAttr', []):
703                if a['attribute'] == 'vlan_id':
704                    vlan = a['value']
705                    break
706            else:
707                raise service_error(service_error.internal, "No vlan tag")
708            members = i.get('member', [])
709            if len(members) > 1: type = 'lan'
710            else: type = 'link'
711
712            try:
713                for m in members:
714                    if m['element'] in dragon_map:
715                        dragon_map[m['element']].append(( m['interface'], 
716                            vlan, type))
717                    else:
718                        dragon_map[m['element']] = [( m['interface'], 
719                            vlan, type),]
720            except KeyError:
721                raise service_error(service_error.req,
722                        "Missing connectivity info")
723
724        # Weed out the things we aren't going to instantiate: Segments, portal
725        # substrates, and portal interfaces.  (The copy in the for loop allows
726        # us to delete from e.elements in side the for loop).  While we're
727        # touching all the elements, we also adjust paths from the original
728        # testbed to local testbed paths and put the federation commands into
729        # the start commands
730        for e in [e for e in t.elements]:
731            if isinstance(e, topdl.Segment):
732                t.elements.remove(e)
733            if isinstance(e, topdl.Computer):
734                self.add_kit(e, self.federation_software)
735                if e.get_attribute('portal') and self.portal_startcommand:
736                    # Add local portal support software
737                    self.add_kit(e, self.portal_software)
738                    # Portals never have a user-specified start command
739                    e.set_attribute('startup', self.portal_startcommand)
740                elif self.node_startcommand:
741                    if e.get_attribute('startup'):
742                        e.set_attribute('startup', "%s \\$USER '%s'" % \
743                                (self.node_startcommand, 
744                                    e.get_attribute('startup')))
745                    else:
746                        e.set_attribute('startup', self.node_startcommand)
747
748                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
749                # Remove portal interfaces that do not connect to DRAGON
750                e.interface = [i for i in e.interface \
751                        if not i.get_attribute('portal') or i.name in dinf ]
752            # Fix software paths
753            for s in getattr(e, 'software', []):
754                s.location = re.sub("^.*/", softdir, s.location)
755
756        t.substrates = [ s.clone() for s in t.substrates ]
757        t.incorporate_elements()
758
759        # Customize the ns2 output for local portal commands and images
760        filters = []
761
762        if self.dragon_endpoint:
763            add_filter = not_dragon(dragon_map)
764            filters.append(dragon_commands(dragon_map))
765        else:
766            add_filter = None
767
768        if self.portal_command:
769            filters.append(topdl.generate_portal_command_filter(
770                self.portal_command, add_filter=add_filter))
771
772        if self.portal_image:
773            filters.append(topdl.generate_portal_image_filter(
774                self.portal_image))
775
776        if self.portal_type:
777            filters.append(topdl.generate_portal_hardware_filter(
778                self.portal_type))
779
780        # Convert to ns and write it out
781        expfile = topdl.topology_to_ns2(t, filters)
782        try:
783            f = open(expfn, "w")
784            print >>f, expfile
785            f.close()
786        except EnvironmentError:
787            raise service_error(service_error.internal,
788                    "Cannot write experiment file %s: %s" % (expfn,e))
789
790    def export_store_info(self, cf, proj, ename, connInfo):
791        """
792        For the export requests in the connection info, install the peer names
793        at the experiment controller via SetValue calls.
794        """
795
796        for c in connInfo:
797            for p in [ p for p in c.get('parameter', []) \
798                    if p.get('type', '') == 'output']:
799
800                if p.get('name', '') == 'peer':
801                    k = p.get('key', None)
802                    surl = p.get('store', None)
803                    if surl and k and k.index('/') != -1:
804                        value = "%s.%s.%s%s" % \
805                                (k[k.index('/')+1:], ename, proj, self.domain)
806                        req = { 'name': k, 'value': value }
807                        self.log.debug("Setting %s to %s on %s" % \
808                                (k, value, surl))
809                        self.call_SetValue(surl, req, cf)
810                    else:
811                        self.log.error("Bad export request: %s" % p)
812                elif p.get('name', '') == 'ssh_port':
813                    k = p.get('key', None)
814                    surl = p.get('store', None)
815                    if surl and k:
816                        req = { 'name': k, 'value': self.ssh_port }
817                        self.log.debug("Setting %s to %s on %s" % \
818                                (k, self.ssh_port, surl))
819                        self.call_SetValue(surl, req, cf)
820                    else:
821                        self.log.error("Bad export request: %s" % p)
822                else:
823                    self.log.error("Unknown export parameter: %s" % \
824                            p.get('name'))
825                    continue
826
827    def add_seer_node(self, topo, name, startup):
828        """
829        Add a seer node to the given topology, with the startup command passed
830        in.  Used by configure seer_services.
831        """
832        c_node = topdl.Computer(
833                name=name, 
834                os= topdl.OperatingSystem(
835                    attribute=[
836                    { 'attribute': 'osid', 
837                        'value': self.local_seer_image },
838                    ]),
839                attribute=[
840                    { 'attribute': 'startup', 'value': startup },
841                    ]
842                )
843        self.add_kit(c_node, self.local_seer_software)
844        topo.elements.append(c_node)
845
846    def configure_seer_services(self, services, topo, softdir):
847        """
848        Make changes to the topology required for the seer requests being made.
849        Specifically, add any control or master nodes required and set up the
850        start commands on the nodes to interconnect them.
851        """
852        local_seer = False      # True if we need to add a control node
853        collect_seer = False    # True if there is a seer-master node
854        seer_master= False      # True if we need to add the seer-master
855        for s in services:
856            s_name = s.get('name', '')
857            s_vis = s.get('visibility','')
858
859            if s_name  == 'local_seer_control' and s_vis == 'export':
860                local_seer = True
861            elif s_name == 'seer_master':
862                if s_vis == 'import':
863                    collect_seer = True
864                elif s_vis == 'export':
865                    seer_master = True
866       
867        # We've got the whole picture now, so add nodes if needed and configure
868        # them to interconnect properly.
869        if local_seer or seer_master:
870            # Copy local seer control node software to the tempdir
871            for l, f in self.local_seer_software:
872                base = os.path.basename(f)
873                copy_file(f, "%s/%s" % (softdir, base))
874        # If we're collecting seers somewhere the controllers need to talk to
875        # the master.  In testbeds that export the service, that will be a
876        # local node that we'll add below.  Elsewhere it will be the control
877        # portal that will port forward to the exporting master.
878        if local_seer:
879            if collect_seer:
880                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
881            else:
882                startup = self.local_seer_start
883            self.add_seer_node(topo, 'control', startup)
884        # If this is the seer master, add that node, too.
885        if seer_master:
886            self.add_seer_node(topo, 'seer-master', 
887                    "%s -R -n -R seer-master -R -A -R sink" % \
888                            self.seer_master_start)
889
890    def retrieve_software(self, topo, certfile, softdir):
891        """
892        Collect the software that nodes in the topology need loaded and stage
893        it locally.  This implies retrieving it from the experiment_controller
894        and placing it into softdir.  Certfile is used to prove that this node
895        has access to that data (it's the allocation/segment fedid).  Finally
896        local portal and federation software is also copied to the same staging
897        directory for simplicity - all software needed for experiment creation
898        is in softdir.
899        """
900        sw = set()
901        for e in topo.elements:
902            for s in getattr(e, 'software', []):
903                sw.add(s.location)
904        for s in sw:
905            self.log.debug("Retrieving %s" % s)
906            try:
907                get_url(s, certfile, softdir)
908            except:
909                t, v, st = sys.exc_info()
910                raise service_error(service_error.internal,
911                        "Error retrieving %s: %s" % (s, v))
912
913        # Copy local federation and portal node software to the tempdir
914        for s in (self.federation_software, self.portal_software):
915            for l, f in s:
916                base = os.path.basename(f)
917                copy_file(f, "%s/%s" % (softdir, base))
918
919
920    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
921        """
922        Gather common configuration files, retrieve or create an experiment
923        name and project name, and return the ssh_key filenames.  Create an
924        allocation log bound to the state log variable as well.
925        """
926        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
927        ename = None
928        pubkey_base = None
929        secretkey_base = None
930        proj = None
931        user = None
932        alloc_log = None
933        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
934
935        for a in attrs:
936            if a['attribute'] in configs:
937                try:
938                    self.log.debug("Retrieving %s from %s" % \
939                            (a['attribute'], a['value']))
940                    get_url(a['value'], certfile, tmpdir)
941                except:
942                    t, v, st = sys.exc_info()
943                    raise service_error(service_error.internal,
944                            "Error retrieving %s: %s" % (a.get('value', ""), v))
945            if a['attribute'] == 'ssh_pubkey':
946                pubkey_base = a['value'].rpartition('/')[2]
947            if a['attribute'] == 'ssh_secretkey':
948                secretkey_base = a['value'].rpartition('/')[2]
949            if a['attribute'] == 'experiment_name':
950                ename = a['value']
951
952        if ename:
953            # Clean up the experiment name so that emulab will accept it.
954            ename = re.sub(vchars_re, '-', ename)
955
956        else:
957            ename = ""
958            for i in range(0,5):
959                ename += random.choice(string.ascii_letters)
960            self.log.warn("No experiment name: picked one randomly: %s" \
961                    % ename)
962
963        if not pubkey_base:
964            raise service_error(service_error.req, 
965                    "No public key attribute")
966
967        if not secretkey_base:
968            raise service_error(service_error.req, 
969                    "No secret key attribute")
970
971        self.state_lock.acquire()
972        if aid in self.allocation:
973            proj = self.allocation[aid].get('project', None)
974            if not proj: 
975                proj = self.allocation[aid].get('sproject', None)
976            user = self.allocation[aid].get('user', None)
977            self.allocation[aid]['experiment'] = ename
978            self.allocation[aid]['log'] = [ ]
979            # Create a logger that logs to the experiment's state object as
980            # well as to the main log file.
981            alloc_log = logging.getLogger('fedd.access.%s' % ename)
982            h = logging.StreamHandler(
983                    list_log.list_log(self.allocation[aid]['log']))
984            # XXX: there should be a global one of these rather than
985            # repeating the code.
986            h.setFormatter(logging.Formatter(
987                "%(asctime)s %(name)s %(message)s",
988                        '%d %b %y %H:%M:%S'))
989            alloc_log.addHandler(h)
990            self.write_state()
991        self.state_lock.release()
992
993        if not proj:
994            raise service_error(service_error.internal, 
995                    "Can't find project for %s" %aid)
996
997        if not user:
998            raise service_error(service_error.internal, 
999                    "Can't find creation user for %s" %aid)
1000
1001        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1002
1003    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1004        """
1005        Store key bits of experiment state in the global repository, including
1006        the response that may need to be replayed, and return the response.
1007        """
1008        # Copy the assigned names into the return topology
1009        embedding = [ ]
1010        for n in starter.node:
1011            embedding.append({ 
1012                'toponame': n,
1013                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1014                })
1015        # Grab the log (this is some anal locking, but better safe than
1016        # sorry)
1017        self.state_lock.acquire()
1018        logv = "".join(self.allocation[aid]['log'])
1019        # It's possible that the StartSegment call gets retried (!).
1020        # if the 'started' key is in the allocation, we'll return it rather
1021        # than redo the setup.
1022        self.allocation[aid]['started'] = { 
1023                'allocID': alloc_id,
1024                'allocationLog': logv,
1025                'segmentdescription': { 
1026                    'topdldescription': topo.clone().to_dict()
1027                    },
1028                'embedding': embedding,
1029                'proof': proof.to_dict(),
1030                }
1031        retval = copy.copy(self.allocation[aid]['started'])
1032        self.write_state()
1033        self.state_lock.release()
1034        return retval
1035   
1036    # End of StartSegment support routines
1037
1038    def StartSegment(self, req, fid):
1039        err = None  # Any service_error generated after tmpdir is created
1040        rv = None   # Return value from segment creation
1041
1042        try:
1043            req = req['StartSegmentRequestBody']
1044            auth_attr = req['allocID']['fedid']
1045            topref = req['segmentdescription']['topdldescription']
1046        except KeyError:
1047            raise service_error(server_error.req, "Badly formed request")
1048
1049        connInfo = req.get('connection', [])
1050        services = req.get('service', [])
1051        aid = "%s" % auth_attr
1052        attrs = req.get('fedAttr', [])
1053
1054        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1055                with_proof=True)
1056        if not access_ok:
1057            raise service_error(service_error.access, "Access denied")
1058        else:
1059            # See if this is a replay of an earlier succeeded StartSegment -
1060            # sometimes SSL kills 'em.  If so, replay the response rather than
1061            # redoing the allocation.
1062            self.state_lock.acquire()
1063            retval = self.allocation[aid].get('started', None)
1064            self.state_lock.release()
1065            if retval:
1066                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1067                        "replaying response")
1068                return retval
1069
1070        # A new request.  Do it.
1071
1072        if topref: topo = topdl.Topology(**topref)
1073        else:
1074            raise service_error(service_error.req, 
1075                    "Request missing segmentdescription'")
1076       
1077        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1078        try:
1079            tmpdir = tempfile.mkdtemp(prefix="access-")
1080            softdir = "%s/software" % tmpdir
1081            os.mkdir(softdir)
1082        except EnvironmentError:
1083            raise service_error(service_error.internal, "Cannot create tmp dir")
1084
1085        # Try block alllows us to clean up temporary files.
1086        try:
1087            self.retrieve_software(topo, certfile, softdir)
1088            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1089                    self.initialize_experiment_info(attrs, aid, 
1090                            certfile, tmpdir)
1091
1092            # Set up userconf and seer if needed
1093            self.configure_userconf(services, tmpdir)
1094            self.configure_seer_services(services, topo, softdir)
1095            # Get and send synch store variables
1096            self.export_store_info(certfile, proj, ename, connInfo)
1097            self.import_store_info(certfile, connInfo)
1098
1099            expfile = "%s/experiment.tcl" % tmpdir
1100
1101            self.generate_portal_configs(topo, pubkey_base, 
1102                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1103            self.generate_ns2(topo, expfile, 
1104                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1105
1106            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1107                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1108                    cert=self.xmlrpc_cert)
1109            rv = starter(self, ename, proj, user, expfile, tmpdir)
1110        except service_error, e:
1111            err = e
1112        except:
1113            t, v, st = sys.exc_info()
1114            err = service_error(service_error.internal, "%s: %s" % \
1115                    (v, traceback.extract_tb(st)))
1116
1117        # Walk up tmpdir, deleting as we go
1118        if self.cleanup: self.remove_dirs(tmpdir)
1119        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1120
1121        if rv:
1122            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1123                    proof)
1124        elif err:
1125            raise service_error(service_error.federant,
1126                    "Swapin failed: %s" % err)
1127        else:
1128            raise service_error(service_error.federant, "Swapin failed")
1129
1130    def TerminateSegment(self, req, fid):
1131        try:
1132            req = req['TerminateSegmentRequestBody']
1133        except KeyError:
1134            raise service_error(server_error.req, "Badly formed request")
1135
1136        auth_attr = req['allocID']['fedid']
1137        aid = "%s" % auth_attr
1138        attrs = req.get('fedAttr', [])
1139
1140        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1141                with_proof=True)
1142        if not access_ok:
1143            raise service_error(service_error.access, "Access denied")
1144
1145        self.state_lock.acquire()
1146        if aid in self.allocation:
1147            proj = self.allocation[aid].get('project', None)
1148            if not proj: 
1149                proj = self.allocation[aid].get('sproject', None)
1150            user = self.allocation[aid].get('user', None)
1151            ename = self.allocation[aid].get('experiment', None)
1152        else:
1153            proj = None
1154            user = None
1155            ename = None
1156        self.state_lock.release()
1157
1158        if not proj:
1159            raise service_error(service_error.internal, 
1160                    "Can't find project for %s" % aid)
1161
1162        if not user:
1163            raise service_error(service_error.internal, 
1164                    "Can't find creation user for %s" % aid)
1165        if not ename:
1166            raise service_error(service_error.internal, 
1167                    "Can't find experiment name for %s" % aid)
1168        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1169                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1170        stopper(self, user, proj, ename)
1171        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.