source: fedd/federation/emulab_access.py @ 913dc7a

axis_examplecompt_changesinfo-ops
Last change on this file since 913dc7a was 262328f, checked in by Ted Faber <faber@…>, 14 years ago

Clean up user proposed experiment names that DETER/Emulab don't like.

Fixes #8

  • Property mode set to 100644
File size: 38.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        # XXX: this should go?
107        #if config.has_option("access", "accessdb"):
108        #    self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'legacy'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'legacy':
120            self.access = { }
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            self.access = [ ]
126            if accessdb:
127                self.read_access(accessdb, self.access_tuple)
128        else:
129            raise service_error(service_error.internal, 
130                    "Unknown auth_type: %s" % self.auth_type)
131
132        # read_state in the base_class
133        self.state_lock.acquire()
134        for a  in ('allocation', 'projects', 'keys', 'types'):
135            if a not in self.state:
136                self.state[a] = { }
137        self.allocation = self.state['allocation']
138        self.projects = self.state['projects']
139        self.keys = self.state['keys']
140        self.types = self.state['types']
141        if self.auth_type == "legacy": 
142            # Add the ownership attributes to the authorizer.  Note that the
143            # indices of the allocation dict are strings, but the attributes are
144            # fedids, so there is a conversion.
145            for k in self.allocation.keys():
146                for o in self.allocation[k].get('owners', []):
147                    self.auth.set_attribute(o, fedid(hexstr=k))
148                if self.allocation[k].has_key('userconfig'):
149                    sfid = self.allocation[k]['userconfig']
150                    fid = fedid(hexstr=sfid)
151                    self.auth.set_attribute(fid, "/%s" % sfid)
152        self.state_lock.release()
153        self.exports = {
154                'SMB': self.export_SMB,
155                'seer': self.export_seer,
156                'tmcd': self.export_tmcd,
157                'userconfig': self.export_userconfig,
158                'project_export': self.export_project_export,
159                'local_seer_control': self.export_local_seer,
160                'seer_master': self.export_seer_master,
161                'hide_hosts': self.export_hide_hosts,
162                }
163
164        if not self.local_seer_image or not self.local_seer_software or \
165                not self.local_seer_start:
166            if 'local_seer_control' in self.exports:
167                del self.exports['local_seer_control']
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.seer_master_start:
171            if 'seer_master' in self.exports:
172                del self.exports['seer_master']
173
174
175        self.soap_services = {\
176            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
177            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
178            'StartSegment': soap_handler("StartSegment", self.StartSegment),
179            'TerminateSegment': soap_handler("TerminateSegment",
180                self.TerminateSegment),
181            }
182        self.xmlrpc_services =  {\
183            'RequestAccess': xmlrpc_handler('RequestAccess',
184                self.RequestAccess),
185            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
186                self.ReleaseAccess),
187            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
188            'TerminateSegment': xmlrpc_handler('TerminateSegment',
189                self.TerminateSegment),
190            }
191
192        self.call_SetValue = service_caller('SetValue')
193        self.call_GetValue = service_caller('GetValue', log=self.log)
194
195        if not config.has_option("allocate", "uri"):
196            self.allocate_project = \
197                allocate_project_local(config, auth)
198        else:
199            self.allocate_project = \
200                allocate_project_remote(config, auth)
201
202
203        # If the project allocator exports services, put them in this object's
204        # maps so that classes that instantiate this can call the services.
205        self.soap_services.update(self.allocate_project.soap_services)
206        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
207
208    @staticmethod
209    def legacy_access_tuple(str):
210        """
211        Convert a string of the form (id[:resources:resouces], id, id) into a
212        tuple of the form (project, user, user) where users may be names or
213        fedids.  The resources strings are obsolete and ignored.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = first[0]
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def access_tuple(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            # The slice takes the parens off the string.
250            proj, user = str[1:-1].split(',')
251            return (proj.strip(), user.strip(), user.strip())
252        else:
253            raise self.parse_error(
254                    'Bad mapping (unbalanced parens or more than 1 comma)')
255
256
257    # RequestAccess support routines
258
259    def legacy_lookup_access(self, req, fid):
260        """
261        Look up the local access control information mapped to this fedid and
262        credentials.  In this case it is a (project, create_user, access_user)
263        triple, and a triple of three booleans indicating which, if any will
264        need to be dynamically created.  Finally a list of owners for that
265        allocation is returned.
266
267        lookup_access_base pulls the first triple out, and it is parsed by this
268        routine into the boolean map.  Owners is always the controlling fedid.
269        """
270        # Return values
271        rp = None
272        ru = None
273        # This maps a valid user to the Emulab projects and users to use
274        found, match = self.legacy_lookup_access_base(req, fid)
275        tb, project, user = match
276       
277        if found == None:
278            raise service_error(service_error.access,
279                    "Access denied - cannot map access")
280
281        # resolve <dynamic> and <same> in found
282        dyn_proj = False
283        dyn_create_user = False
284        dyn_service_user = False
285
286        if found[0] == "<same>":
287            if project != None:
288                rp = project
289            else : 
290                raise service_error(\
291                        service_error.server_config,
292                        "Project matched <same> when no project given")
293        elif found[0] == "<dynamic>":
294            rp = None
295            dyn_proj = True
296        else:
297            rp = found[0]
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def do_project_allocation(self, dyn, project, user):
331        """
332        Call the project allocation routines and return the info.
333        """
334        if dyn: 
335            # Compose the dynamic project request
336            # (only dynamic, dynamic currently allowed)
337            preq = { 'AllocateProjectRequestBody': \
338                        { 'project' : {\
339                            'user': [ \
340                            { \
341                                'access': [ { 
342                                    'sshPubkey': self.ssh_pubkey_file } ],
343                                 'role': "serviceAccess",\
344                            }, \
345                            { \
346                                'access': [ { 
347                                    'sshPubkey': self.ssh_pubkey_file } ],
348                                 'role': "experimentCreation",\
349                            }, \
350                            ], \
351                            }\
352                        }\
353                    }
354            return self.allocate_project.dynamic_project(preq)
355        else:
356            preq = {'StaticProjectRequestBody' : \
357                    { 'project': \
358                        { 'name' : { 'localname' : project },\
359                          'user' : [ \
360                            {\
361                                'userID': { 'localname' : user }, \
362                                'access': [ { 
363                                    'sshPubkey': self.ssh_pubkey_file } ],
364                                'role': 'experimentCreation'\
365                            },\
366                            {\
367                                'userID': { 'localname' : user}, \
368                                'access': [ { 
369                                    'sshPubkey': self.ssh_pubkey_file } ],
370                                'role': 'serviceAccess'\
371                            },\
372                        ]}\
373                    }\
374            }
375            return self.allocate_project.static_project(preq)
376
377    def save_project_state(self, aid, ap, dyn, owners):
378        """
379        Parse out and save the information relevant to the project created for
380        this experiment.  That info is largely in ap and owners.  dyn indicates
381        that the project was created dynamically.  Return the user and project
382        names.
383        """
384        self.state_lock.acquire()
385        self.allocation[aid] = { }
386        try:
387            pname = ap['project']['name']['localname']
388        except KeyError:
389            pname = None
390
391        if dyn:
392            if not pname:
393                self.state_lock.release()
394                raise service_error(service_error.internal,
395                        "Misformed allocation response?")
396            if pname in self.projects: self.projects[pname] += 1
397            else: self.projects[pname] = 1
398            self.allocation[aid]['project'] = pname
399        else:
400            # sproject is a static project associated with this allocation.
401            self.allocation[aid]['sproject'] = pname
402
403        self.allocation[aid]['keys'] = [ ]
404
405        try:
406            for u in ap['project']['user']:
407                uname = u['userID']['localname']
408                if u['role'] == 'experimentCreation':
409                    self.allocation[aid]['user'] = uname
410                for k in [ k['sshPubkey'] for k in u['access'] \
411                        if k.has_key('sshPubkey') ]:
412                    kv = "%s:%s" % (uname, k)
413                    if self.keys.has_key(kv): self.keys[kv] += 1
414                    else: self.keys[kv] = 1
415                    self.allocation[aid]['keys'].append((uname, k))
416        except KeyError:
417            self.state_lock.release()
418            raise service_error(service_error.internal,
419                    "Misformed allocation response?")
420
421        self.allocation[aid]['owners'] = owners
422        self.write_state()
423        self.state_lock.release()
424        return (pname, uname)
425
426    # End of RequestAccess support routines
427
428    def RequestAccess(self, req, fid):
429        """
430        Handle the access request.  Proxy if not for us.
431
432        Parse out the fields and make the allocations or rejections if for us,
433        otherwise, assuming we're willing to proxy, proxy the request out.
434        """
435
436        def gateway_hardware(h):
437            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
438            else: return h
439
440        def get_export_project(svcs):
441            """
442            if the service requests includes one to export a project, return
443            that project.
444            """
445            rv = None
446            for s in svcs:
447                if s.get('name', '') == 'project_export' and \
448                        s.get('visibility', '') == 'export':
449                    if not rv: 
450                        for a in s.get('fedAttr', []):
451                            if a.get('attribute', '') == 'project' \
452                                    and 'value' in a:
453                                rv = a['value']
454                    else:
455                        raise service_error(service_error, access, 
456                                'Requesting multiple project exports is ' + \
457                                        'not supported');
458            return rv
459
460        # The dance to get into the request body
461        if req.has_key('RequestAccessRequestBody'):
462            req = req['RequestAccessRequestBody']
463        else:
464            raise service_error(service_error.req, "No request!?")
465
466        # if this includes a project export request, construct a filter such
467        # that only the ABAC attributes mapped to that project are checked for
468        # access.
469        if 'service' in req:
470            ep = get_export_project(req['service'])
471            if ep: pf = lambda(a): a.value[0] == ep
472            else: pf = None
473        else:
474            ep = None
475            pf = None
476
477        if self.auth.import_credentials(
478                data_list=req.get('abac_credential', [])):
479            self.auth.save()
480
481        if self.auth_type == "legacy":
482            found, dyn, owners = self.legacy_lookup_access(req, fid)
483        elif self.auth_type == 'abac':
484            found, dyn, owners = self.lookup_access(req, fid, filter=pf)
485        else:
486            raise service_error(service_error.internal, 
487                    'Unknown auth_type: %s' % self.auth_type)
488        ap = None
489
490        # This only happens in legacy lookups, but if this user has access to
491        # the testbed but not the project to be exported, raise the error.
492        if ep and ep != found[0]:
493            raise service_error(service_error.access,
494                    "Cannot export %s" % ep)
495
496        if self.ssh_pubkey_file:
497            ap = self.do_project_allocation(dyn[1], found[0], found[1])
498        else:
499            raise service_error(service_error.internal, 
500                    "SSH access parameters required")
501        # keep track of what's been added
502        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
503        aid = unicode(allocID)
504
505        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
506
507        services, svc_state = self.export_services(req.get('service',[]),
508                pname, uname)
509        self.state_lock.acquire()
510        # Store services state in global state
511        for k, v in svc_state.items():
512            self.allocation[aid][k] = v
513        self.write_state()
514        self.state_lock.release()
515        # Give the owners the right to change this allocation
516        for o in owners:
517            self.auth.set_attribute(o, allocID)
518        self.auth.save()
519        try:
520            f = open("%s/%s.pem" % (self.certdir, aid), "w")
521            print >>f, alloc_cert
522            f.close()
523        except EnvironmentError, e:
524            raise service_error(service_error.internal, 
525                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
526        resp = self.build_access_response({ 'fedid': allocID } ,
527                ap, services)
528        return resp
529
530    def do_release_project(self, del_project, del_users, del_types):
531        """
532        If a project and users has to be deleted, make the call.
533        """
534        msg = { 'project': { }}
535        if del_project:
536            msg['project']['name']= {'localname': del_project}
537        users = [ ]
538        for u in del_users.keys():
539            users.append({ 'userID': { 'localname': u },\
540                'access' :  \
541                        [ {'sshPubkey' : s } for s in del_users[u]]\
542            })
543        if users: 
544            msg['project']['user'] = users
545        if len(del_types) > 0:
546            msg['resources'] = { 'node': \
547                    [ {'hardware': [ h ] } for h in del_types ]\
548                }
549        if self.allocate_project.release_project:
550            msg = { 'ReleaseProjectRequestBody' : msg}
551            self.allocate_project.release_project(msg)
552
553    def ReleaseAccess(self, req, fid):
554        # The dance to get into the request body
555        if req.has_key('ReleaseAccessRequestBody'):
556            req = req['ReleaseAccessRequestBody']
557        else:
558            raise service_error(service_error.req, "No request!?")
559
560        try:
561            if req['allocID'].has_key('localname'):
562                auth_attr = aid = req['allocID']['localname']
563            elif req['allocID'].has_key('fedid'):
564                aid = unicode(req['allocID']['fedid'])
565                auth_attr = req['allocID']['fedid']
566            else:
567                raise service_error(service_error.req,
568                        "Only localnames and fedids are understood")
569        except KeyError:
570            raise service_error(service_error.req, "Badly formed request")
571
572        self.log.debug("[access] deallocation requested for %s by %s" % \
573                (aid, fid))
574        if not self.auth.check_attribute(fid, auth_attr):
575            self.log.debug("[access] deallocation denied for %s", aid)
576            raise service_error(service_error.access, "Access Denied")
577
578        # If we know this allocation, reduce the reference counts and
579        # remove the local allocations.  Otherwise report an error.  If
580        # there is an allocation to delete, del_users will be a dictonary
581        # of sets where the key is the user that owns the keys in the set.
582        # We use a set to avoid duplicates.  del_project is just the name
583        # of any dynamic project to delete.  We're somewhat lazy about
584        # deleting authorization attributes.  Having access to something
585        # that doesn't exist isn't harmful.
586        del_users = { }
587        del_project = None
588        del_types = set()
589
590        self.state_lock.acquire()
591        if aid in self.allocation:
592            self.log.debug("Found allocation for %s" %aid)
593            for k in self.allocation[aid]['keys']:
594                kk = "%s:%s" % k
595                self.keys[kk] -= 1
596                if self.keys[kk] == 0:
597                    if not del_users.has_key(k[0]):
598                        del_users[k[0]] = set()
599                    del_users[k[0]].add(k[1])
600                    del self.keys[kk]
601
602            if 'project' in self.allocation[aid]:
603                pname = self.allocation[aid]['project']
604                self.projects[pname] -= 1
605                if self.projects[pname] == 0:
606                    del_project = pname
607                    del self.projects[pname]
608
609            if 'types' in self.allocation[aid]:
610                for t in self.allocation[aid]['types']:
611                    self.types[t] -= 1
612                    if self.types[t] == 0:
613                        if not del_project: del_project = t[0]
614                        del_types.add(t[1])
615                        del self.types[t]
616
617            del self.allocation[aid]
618            self.write_state()
619            self.state_lock.release()
620            # If we actually have resources to deallocate, prepare the call.
621            if del_project or del_users:
622                self.do_release_project(del_project, del_users, del_types)
623            # And remove the access cert
624            cf = "%s/%s.pem" % (self.certdir, aid)
625            self.log.debug("Removing %s" % cf)
626            os.remove(cf)
627            return { 'allocID': req['allocID'] } 
628        else:
629            self.state_lock.release()
630            raise service_error(service_error.req, "No such allocation")
631
632    # These are subroutines for StartSegment
633    def generate_ns2(self, topo, expfn, softdir, connInfo):
634        """
635        Convert topo into an ns2 file, decorated with appropriate commands for
636        the particular testbed setup.  Convert all requests for software, etc
637        to point at the staged copies on this testbed and add the federation
638        startcommands.
639        """
640        class dragon_commands:
641            """
642            Functor to spit out approrpiate dragon commands for nodes listed in
643            the connectivity description.  The constructor makes a dict mapping
644            dragon nodes to their parameters and the __call__ checks each
645            element in turn for membership.
646            """
647            def __init__(self, map):
648                self.node_info = map
649
650            def __call__(self, e):
651                s = ""
652                if isinstance(e, topdl.Computer):
653                    if self.node_info.has_key(e.name):
654                        info = self.node_info[e.name]
655                        for ifname, vlan, type in info:
656                            for i in e.interface:
657                                if i.name == ifname:
658                                    addr = i.get_attribute('ip4_address')
659                                    subs = i.substrate[0]
660                                    break
661                            else:
662                                raise service_error(service_error.internal,
663                                        "No interface %s on element %s" % \
664                                                (ifname, e.name))
665                            # XXX: do netmask right
666                            if type =='link':
667                                s = ("tb-allow-external ${%s} " + \
668                                        "dragonportal ip %s vlan %s " + \
669                                        "netmask 255.255.255.0\n") % \
670                                        (topdl.to_tcl_name(e.name), addr, vlan)
671                            elif type =='lan':
672                                s = ("tb-allow-external ${%s} " + \
673                                        "dragonportal " + \
674                                        "ip %s vlan %s usurp %s\n") % \
675                                        (topdl.to_tcl_name(e.name), addr, 
676                                                vlan, subs)
677                            else:
678                                raise service_error(service_error_internal,
679                                        "Unknown DRAGON type %s" % type)
680                return s
681
682        class not_dragon:
683            """
684            Return true if a node is in the given map of dragon nodes.
685            """
686            def __init__(self, map):
687                self.nodes = set(map.keys())
688
689            def __call__(self, e):
690                return e.name not in self.nodes
691
692        # Main line of generate_ns2
693        t = topo.clone()
694
695        # Create the map of nodes that need direct connections (dragon
696        # connections) from the connInfo
697        dragon_map = { }
698        for i in [ i for i in connInfo if i['type'] == 'transit']:
699            for a in i.get('fedAttr', []):
700                if a['attribute'] == 'vlan_id':
701                    vlan = a['value']
702                    break
703            else:
704                raise service_error(service_error.internal, "No vlan tag")
705            members = i.get('member', [])
706            if len(members) > 1: type = 'lan'
707            else: type = 'link'
708
709            try:
710                for m in members:
711                    if m['element'] in dragon_map:
712                        dragon_map[m['element']].append(( m['interface'], 
713                            vlan, type))
714                    else:
715                        dragon_map[m['element']] = [( m['interface'], 
716                            vlan, type),]
717            except KeyError:
718                raise service_error(service_error.req,
719                        "Missing connectivity info")
720
721        # Weed out the things we aren't going to instantiate: Segments, portal
722        # substrates, and portal interfaces.  (The copy in the for loop allows
723        # us to delete from e.elements in side the for loop).  While we're
724        # touching all the elements, we also adjust paths from the original
725        # testbed to local testbed paths and put the federation commands into
726        # the start commands
727        for e in [e for e in t.elements]:
728            if isinstance(e, topdl.Segment):
729                t.elements.remove(e)
730            if isinstance(e, topdl.Computer):
731                self.add_kit(e, self.federation_software)
732                if e.get_attribute('portal') and self.portal_startcommand:
733                    # Add local portal support software
734                    self.add_kit(e, self.portal_software)
735                    # Portals never have a user-specified start command
736                    e.set_attribute('startup', self.portal_startcommand)
737                elif self.node_startcommand:
738                    if e.get_attribute('startup'):
739                        e.set_attribute('startup', "%s \\$USER '%s'" % \
740                                (self.node_startcommand, 
741                                    e.get_attribute('startup')))
742                    else:
743                        e.set_attribute('startup', self.node_startcommand)
744
745                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
746                # Remove portal interfaces that do not connect to DRAGON
747                e.interface = [i for i in e.interface \
748                        if not i.get_attribute('portal') or i.name in dinf ]
749            # Fix software paths
750            for s in getattr(e, 'software', []):
751                s.location = re.sub("^.*/", softdir, s.location)
752
753        t.substrates = [ s.clone() for s in t.substrates ]
754        t.incorporate_elements()
755
756        # Customize the ns2 output for local portal commands and images
757        filters = []
758
759        if self.dragon_endpoint:
760            add_filter = not_dragon(dragon_map)
761            filters.append(dragon_commands(dragon_map))
762        else:
763            add_filter = None
764
765        if self.portal_command:
766            filters.append(topdl.generate_portal_command_filter(
767                self.portal_command, add_filter=add_filter))
768
769        if self.portal_image:
770            filters.append(topdl.generate_portal_image_filter(
771                self.portal_image))
772
773        if self.portal_type:
774            filters.append(topdl.generate_portal_hardware_filter(
775                self.portal_type))
776
777        # Convert to ns and write it out
778        expfile = topdl.topology_to_ns2(t, filters)
779        try:
780            f = open(expfn, "w")
781            print >>f, expfile
782            f.close()
783        except EnvironmentError:
784            raise service_error(service_error.internal,
785                    "Cannot write experiment file %s: %s" % (expfn,e))
786
787    def export_store_info(self, cf, proj, ename, connInfo):
788        """
789        For the export requests in the connection info, install the peer names
790        at the experiment controller via SetValue calls.
791        """
792
793        for c in connInfo:
794            for p in [ p for p in c.get('parameter', []) \
795                    if p.get('type', '') == 'output']:
796
797                if p.get('name', '') == 'peer':
798                    k = p.get('key', None)
799                    surl = p.get('store', None)
800                    if surl and k and k.index('/') != -1:
801                        value = "%s.%s.%s%s" % \
802                                (k[k.index('/')+1:], ename, proj, self.domain)
803                        req = { 'name': k, 'value': value }
804                        self.log.debug("Setting %s to %s on %s" % \
805                                (k, value, surl))
806                        self.call_SetValue(surl, req, cf)
807                    else:
808                        self.log.error("Bad export request: %s" % p)
809                elif p.get('name', '') == 'ssh_port':
810                    k = p.get('key', None)
811                    surl = p.get('store', None)
812                    if surl and k:
813                        req = { 'name': k, 'value': self.ssh_port }
814                        self.log.debug("Setting %s to %s on %s" % \
815                                (k, self.ssh_port, surl))
816                        self.call_SetValue(surl, req, cf)
817                    else:
818                        self.log.error("Bad export request: %s" % p)
819                else:
820                    self.log.error("Unknown export parameter: %s" % \
821                            p.get('name'))
822                    continue
823
824    def add_seer_node(self, topo, name, startup):
825        """
826        Add a seer node to the given topology, with the startup command passed
827        in.  Used by configure seer_services.
828        """
829        c_node = topdl.Computer(
830                name=name, 
831                os= topdl.OperatingSystem(
832                    attribute=[
833                    { 'attribute': 'osid', 
834                        'value': self.local_seer_image },
835                    ]),
836                attribute=[
837                    { 'attribute': 'startup', 'value': startup },
838                    ]
839                )
840        self.add_kit(c_node, self.local_seer_software)
841        topo.elements.append(c_node)
842
843    def configure_seer_services(self, services, topo, softdir):
844        """
845        Make changes to the topology required for the seer requests being made.
846        Specifically, add any control or master nodes required and set up the
847        start commands on the nodes to interconnect them.
848        """
849        local_seer = False      # True if we need to add a control node
850        collect_seer = False    # True if there is a seer-master node
851        seer_master= False      # True if we need to add the seer-master
852        for s in services:
853            s_name = s.get('name', '')
854            s_vis = s.get('visibility','')
855
856            if s_name  == 'local_seer_control' and s_vis == 'export':
857                local_seer = True
858            elif s_name == 'seer_master':
859                if s_vis == 'import':
860                    collect_seer = True
861                elif s_vis == 'export':
862                    seer_master = True
863       
864        # We've got the whole picture now, so add nodes if needed and configure
865        # them to interconnect properly.
866        if local_seer or seer_master:
867            # Copy local seer control node software to the tempdir
868            for l, f in self.local_seer_software:
869                base = os.path.basename(f)
870                copy_file(f, "%s/%s" % (softdir, base))
871        # If we're collecting seers somewhere the controllers need to talk to
872        # the master.  In testbeds that export the service, that will be a
873        # local node that we'll add below.  Elsewhere it will be the control
874        # portal that will port forward to the exporting master.
875        if local_seer:
876            if collect_seer:
877                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
878            else:
879                startup = self.local_seer_start
880            self.add_seer_node(topo, 'control', startup)
881        # If this is the seer master, add that node, too.
882        if seer_master:
883            self.add_seer_node(topo, 'seer-master', 
884                    "%s -R -n -R seer-master -R -A -R sink" % \
885                            self.seer_master_start)
886
887    def retrieve_software(self, topo, certfile, softdir):
888        """
889        Collect the software that nodes in the topology need loaded and stage
890        it locally.  This implies retrieving it from the experiment_controller
891        and placing it into softdir.  Certfile is used to prove that this node
892        has access to that data (it's the allocation/segment fedid).  Finally
893        local portal and federation software is also copied to the same staging
894        directory for simplicity - all software needed for experiment creation
895        is in softdir.
896        """
897        sw = set()
898        for e in topo.elements:
899            for s in getattr(e, 'software', []):
900                sw.add(s.location)
901        for s in sw:
902            self.log.debug("Retrieving %s" % s)
903            try:
904                get_url(s, certfile, softdir)
905            except:
906                t, v, st = sys.exc_info()
907                raise service_error(service_error.internal,
908                        "Error retrieving %s: %s" % (s, v))
909
910        # Copy local federation and portal node software to the tempdir
911        for s in (self.federation_software, self.portal_software):
912            for l, f in s:
913                base = os.path.basename(f)
914                copy_file(f, "%s/%s" % (softdir, base))
915
916
917    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
918        """
919        Gather common configuration files, retrieve or create an experiment
920        name and project name, and return the ssh_key filenames.  Create an
921        allocation log bound to the state log variable as well.
922        """
923        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
924        ename = None
925        pubkey_base = None
926        secretkey_base = None
927        proj = None
928        user = None
929        alloc_log = None
930        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
931
932        for a in attrs:
933            if a['attribute'] in configs:
934                try:
935                    self.log.debug("Retrieving %s from %s" % \
936                            (a['attribute'], a['value']))
937                    get_url(a['value'], certfile, tmpdir)
938                except:
939                    t, v, st = sys.exc_info()
940                    raise service_error(service_error.internal,
941                            "Error retrieving %s: %s" % (a.get('value', ""), v))
942            if a['attribute'] == 'ssh_pubkey':
943                pubkey_base = a['value'].rpartition('/')[2]
944            if a['attribute'] == 'ssh_secretkey':
945                secretkey_base = a['value'].rpartition('/')[2]
946            if a['attribute'] == 'experiment_name':
947                ename = a['value']
948
949        if ename:
950            # Clean up the experiment name so that emulab will accept it.
951            ename = re.sub(vchars_re, '-', ename)
952
953        else:
954            ename = ""
955            for i in range(0,5):
956                ename += random.choice(string.ascii_letters)
957            self.log.warn("No experiment name: picked one randomly: %s" \
958                    % ename)
959
960        if not pubkey_base:
961            raise service_error(service_error.req, 
962                    "No public key attribute")
963
964        if not secretkey_base:
965            raise service_error(service_error.req, 
966                    "No secret key attribute")
967
968        self.state_lock.acquire()
969        if aid in self.allocation:
970            proj = self.allocation[aid].get('project', None)
971            if not proj: 
972                proj = self.allocation[aid].get('sproject', None)
973            user = self.allocation[aid].get('user', None)
974            self.allocation[aid]['experiment'] = ename
975            self.allocation[aid]['log'] = [ ]
976            # Create a logger that logs to the experiment's state object as
977            # well as to the main log file.
978            alloc_log = logging.getLogger('fedd.access.%s' % ename)
979            h = logging.StreamHandler(
980                    list_log.list_log(self.allocation[aid]['log']))
981            # XXX: there should be a global one of these rather than
982            # repeating the code.
983            h.setFormatter(logging.Formatter(
984                "%(asctime)s %(name)s %(message)s",
985                        '%d %b %y %H:%M:%S'))
986            alloc_log.addHandler(h)
987            self.write_state()
988        self.state_lock.release()
989
990        if not proj:
991            raise service_error(service_error.internal, 
992                    "Can't find project for %s" %aid)
993
994        if not user:
995            raise service_error(service_error.internal, 
996                    "Can't find creation user for %s" %aid)
997
998        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
999
1000    def finalize_experiment(self, starter, topo, aid, alloc_id):
1001        """
1002        Store key bits of experiment state in the global repository, including
1003        the response that may need to be replayed, and return the response.
1004        """
1005        # Copy the assigned names into the return topology
1006        embedding = [ ]
1007        for n in starter.node:
1008            embedding.append({ 
1009                'toponame': n,
1010                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1011                })
1012        # Grab the log (this is some anal locking, but better safe than
1013        # sorry)
1014        self.state_lock.acquire()
1015        logv = "".join(self.allocation[aid]['log'])
1016        # It's possible that the StartSegment call gets retried (!).
1017        # if the 'started' key is in the allocation, we'll return it rather
1018        # than redo the setup.
1019        self.allocation[aid]['started'] = { 
1020                'allocID': alloc_id,
1021                'allocationLog': logv,
1022                'segmentdescription': { 
1023                    'topdldescription': topo.clone().to_dict()
1024                    },
1025                'embedding': embedding
1026                }
1027        retval = copy.copy(self.allocation[aid]['started'])
1028        self.write_state()
1029        self.state_lock.release()
1030        return retval
1031   
1032    # End of StartSegment support routines
1033
1034    def StartSegment(self, req, fid):
1035        err = None  # Any service_error generated after tmpdir is created
1036        rv = None   # Return value from segment creation
1037
1038        try:
1039            req = req['StartSegmentRequestBody']
1040            auth_attr = req['allocID']['fedid']
1041            topref = req['segmentdescription']['topdldescription']
1042        except KeyError:
1043            raise service_error(server_error.req, "Badly formed request")
1044
1045        connInfo = req.get('connection', [])
1046        services = req.get('service', [])
1047        aid = "%s" % auth_attr
1048        attrs = req.get('fedAttr', [])
1049        if not self.auth.check_attribute(fid, auth_attr):
1050            raise service_error(service_error.access, "Access denied")
1051        else:
1052            # See if this is a replay of an earlier succeeded StartSegment -
1053            # sometimes SSL kills 'em.  If so, replay the response rather than
1054            # redoing the allocation.
1055            self.state_lock.acquire()
1056            retval = self.allocation[aid].get('started', None)
1057            self.state_lock.release()
1058            if retval:
1059                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1060                        "replaying response")
1061                return retval
1062
1063        # A new request.  Do it.
1064
1065        if topref: topo = topdl.Topology(**topref)
1066        else:
1067            raise service_error(service_error.req, 
1068                    "Request missing segmentdescription'")
1069       
1070        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1071        try:
1072            tmpdir = tempfile.mkdtemp(prefix="access-")
1073            softdir = "%s/software" % tmpdir
1074            os.mkdir(softdir)
1075        except EnvironmentError:
1076            raise service_error(service_error.internal, "Cannot create tmp dir")
1077
1078        # Try block alllows us to clean up temporary files.
1079        try:
1080            self.retrieve_software(topo, certfile, softdir)
1081            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1082                    self.initialize_experiment_info(attrs, aid, 
1083                            certfile, tmpdir)
1084
1085            # Set up userconf and seer if needed
1086            self.configure_userconf(services, tmpdir)
1087            self.configure_seer_services(services, topo, softdir)
1088            # Get and send synch store variables
1089            self.export_store_info(certfile, proj, ename, connInfo)
1090            self.import_store_info(certfile, connInfo)
1091
1092            expfile = "%s/experiment.tcl" % tmpdir
1093
1094            self.generate_portal_configs(topo, pubkey_base, 
1095                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1096            self.generate_ns2(topo, expfile, 
1097                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1098
1099            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1100                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1101                    cert=self.xmlrpc_cert)
1102            rv = starter(self, ename, proj, user, expfile, tmpdir)
1103        except service_error, e:
1104            err = e
1105        except:
1106            t, v, st = sys.exc_info()
1107            err = service_error(service_error.internal, "%s: %s" % \
1108                    (v, traceback.extract_tb(st)))
1109
1110        # Walk up tmpdir, deleting as we go
1111        if self.cleanup: self.remove_dirs(tmpdir)
1112        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1113
1114        if rv:
1115            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1116        elif err:
1117            raise service_error(service_error.federant,
1118                    "Swapin failed: %s" % err)
1119        else:
1120            raise service_error(service_error.federant, "Swapin failed")
1121
1122    def TerminateSegment(self, req, fid):
1123        try:
1124            req = req['TerminateSegmentRequestBody']
1125        except KeyError:
1126            raise service_error(server_error.req, "Badly formed request")
1127
1128        auth_attr = req['allocID']['fedid']
1129        aid = "%s" % auth_attr
1130        attrs = req.get('fedAttr', [])
1131        if not self.auth.check_attribute(fid, auth_attr):
1132            raise service_error(service_error.access, "Access denied")
1133
1134        self.state_lock.acquire()
1135        if aid in self.allocation:
1136            proj = self.allocation[aid].get('project', None)
1137            if not proj: 
1138                proj = self.allocation[aid].get('sproject', None)
1139            user = self.allocation[aid].get('user', None)
1140            ename = self.allocation[aid].get('experiment', None)
1141        else:
1142            proj = None
1143            user = None
1144            ename = None
1145        self.state_lock.release()
1146
1147        if not proj:
1148            raise service_error(service_error.internal, 
1149                    "Can't find project for %s" % aid)
1150
1151        if not user:
1152            raise service_error(service_error.internal, 
1153                    "Can't find creation user for %s" % aid)
1154        if not ename:
1155            raise service_error(service_error.internal, 
1156                    "Can't find experiment name for %s" % aid)
1157        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1158                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1159        stopper(self, user, proj, ename)
1160        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.