source: fedd/federation/emulab_access.py @ 1f6a573

axis_examplecompt_changesinfo-ops
Last change on this file since 1f6a573 was 1f6a573, checked in by Ted Faber <faber@…>, 13 years ago

Support for priorities and export projects

  • Property mode set to 100644
File size: 39.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        # XXX: this should go?
107        #if config.has_option("access", "accessdb"):
108        #    self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'legacy'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'legacy':
120            self.access = { }
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            self.access = [ ]
126            if accessdb:
127                self.read_access(accessdb, self.access_tuple)
128        else:
129            raise service_error(service_error.internal, 
130                    "Unknown auth_type: %s" % self.auth_type)
131
132        # read_state in the base_class
133        self.state_lock.acquire()
134        for a  in ('allocation', 'projects', 'keys', 'types'):
135            if a not in self.state:
136                self.state[a] = { }
137        self.allocation = self.state['allocation']
138        self.projects = self.state['projects']
139        self.keys = self.state['keys']
140        self.types = self.state['types']
141        if self.auth_type == "legacy": 
142            # Add the ownership attributes to the authorizer.  Note that the
143            # indices of the allocation dict are strings, but the attributes are
144            # fedids, so there is a conversion.
145            for k in self.allocation.keys():
146                for o in self.allocation[k].get('owners', []):
147                    self.auth.set_attribute(o, fedid(hexstr=k))
148                if self.allocation[k].has_key('userconfig'):
149                    sfid = self.allocation[k]['userconfig']
150                    fid = fedid(hexstr=sfid)
151                    self.auth.set_attribute(fid, "/%s" % sfid)
152        self.state_lock.release()
153        self.exports = {
154                'SMB': self.export_SMB,
155                'seer': self.export_seer,
156                'tmcd': self.export_tmcd,
157                'userconfig': self.export_userconfig,
158                'project_export': self.export_project_export,
159                'local_seer_control': self.export_local_seer,
160                'seer_master': self.export_seer_master,
161                'hide_hosts': self.export_hide_hosts,
162                }
163
164        if not self.local_seer_image or not self.local_seer_software or \
165                not self.local_seer_start:
166            if 'local_seer_control' in self.exports:
167                del self.exports['local_seer_control']
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.seer_master_start:
171            if 'seer_master' in self.exports:
172                del self.exports['seer_master']
173
174
175        self.soap_services = {\
176            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
177            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
178            'StartSegment': soap_handler("StartSegment", self.StartSegment),
179            'TerminateSegment': soap_handler("TerminateSegment",
180                self.TerminateSegment),
181            }
182        self.xmlrpc_services =  {\
183            'RequestAccess': xmlrpc_handler('RequestAccess',
184                self.RequestAccess),
185            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
186                self.ReleaseAccess),
187            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
188            'TerminateSegment': xmlrpc_handler('TerminateSegment',
189                self.TerminateSegment),
190            }
191
192        self.call_SetValue = service_caller('SetValue')
193        self.call_GetValue = service_caller('GetValue', log=self.log)
194
195        if not config.has_option("allocate", "uri"):
196            self.allocate_project = \
197                allocate_project_local(config, auth)
198        else:
199            self.allocate_project = \
200                allocate_project_remote(config, auth)
201
202
203        # If the project allocator exports services, put them in this object's
204        # maps so that classes that instantiate this can call the services.
205        self.soap_services.update(self.allocate_project.soap_services)
206        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
207
208    @staticmethod
209    def legacy_access_tuple(str):
210        """
211        Convert a string of the form (id[:resources:resouces], id, id) into a
212        tuple of the form (project, user, user) where users may be names or
213        fedids.  The resources strings are obsolete and ignored.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = first[0]
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def access_tuple(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            # The slice takes the parens off the string.
250            proj, user = str[1:-1].split(',')
251            return (proj.strip(), user.strip(), user.strip())
252        else:
253            raise self.parse_error(
254                    'Bad mapping (unbalanced parens or more than 1 comma)')
255
256
257    # RequestAccess support routines
258
259    def legacy_lookup_access(self, req, fid):
260        """
261        Look up the local access control information mapped to this fedid and
262        credentials.  In this case it is a (project, create_user, access_user)
263        triple, and a triple of three booleans indicating which, if any will
264        need to be dynamically created.  Finally a list of owners for that
265        allocation is returned.
266
267        lookup_access_base pulls the first triple out, and it is parsed by this
268        routine into the boolean map.  Owners is always the controlling fedid.
269        """
270        # Return values
271        rp = None
272        ru = None
273        # This maps a valid user to the Emulab projects and users to use
274        found, match = self.legacy_lookup_access_base(req, fid)
275        tb, project, user = match
276       
277        if found == None:
278            raise service_error(service_error.access,
279                    "Access denied - cannot map access")
280
281        # resolve <dynamic> and <same> in found
282        dyn_proj = False
283        dyn_create_user = False
284        dyn_service_user = False
285
286        if found[0] == "<same>":
287            if project != None:
288                rp = project
289            else : 
290                raise service_error(\
291                        service_error.server_config,
292                        "Project matched <same> when no project given")
293        elif found[0] == "<dynamic>":
294            rp = None
295            dyn_proj = True
296        else:
297            rp = found[0]
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def lookup_access(self, req, fid, filter=None, compare=None): 
331        """
332        Check all the attributes that this controller knows how to map and see
333        if the requester is allowed to use any of them.  If so return one.
334        Filter defined the objects to check - it's a function that returns true
335        for the objects to check - and cmp defines the order to check them in
336        as the cmp field of sorted().  If filter is None, all possibilities are
337        checked.  If cmp is None, the choices are sorted by priority.
338        """
339        # NB: comparison order reversed so numerically larger priorities are
340        # checked first.
341        def prio_cmp(a, b):
342            return cmp(b.priority, a.priority)
343
344
345        # Import request credentials into this (clone later??)
346        if self.auth.import_credentials(
347                data_list=req.get('abac_credential', [])):
348            self.auth.save()
349
350        c = compare or prio_cmp
351        if filter: f = filter
352        else: f = lambda(x): True
353
354        check = sorted([ a for a in self.access if f(a)], cmp=c)
355
356        # Check every attribute that we know how to map and take the first
357        # success.
358        for attr in check:
359            if self.auth.check_attribute(fid, attr.attr):
360                self.log.debug("Access succeeded for %s %s" % (attr.attr, fid))
361                # XXX: needs to deal with dynamics
362                return copy.copy(attr.value), (False, False, False), \
363                        [ fid ]
364            else:
365                self.log.debug("Access failed for %s %s" % (attr.attr, fid))
366        else:
367            raise service_error(service_error.access, "Access denied")
368
369
370    def do_project_allocation(self, dyn, project, user):
371        """
372        Call the project allocation routines and return the info.
373        """
374        if dyn: 
375            # Compose the dynamic project request
376            # (only dynamic, dynamic currently allowed)
377            preq = { 'AllocateProjectRequestBody': \
378                        { 'project' : {\
379                            'user': [ \
380                            { \
381                                'access': [ { 
382                                    'sshPubkey': self.ssh_pubkey_file } ],
383                                 'role': "serviceAccess",\
384                            }, \
385                            { \
386                                'access': [ { 
387                                    'sshPubkey': self.ssh_pubkey_file } ],
388                                 'role': "experimentCreation",\
389                            }, \
390                            ], \
391                            }\
392                        }\
393                    }
394            return self.allocate_project.dynamic_project(preq)
395        else:
396            preq = {'StaticProjectRequestBody' : \
397                    { 'project': \
398                        { 'name' : { 'localname' : project },\
399                          'user' : [ \
400                            {\
401                                'userID': { 'localname' : user }, \
402                                'access': [ { 
403                                    'sshPubkey': self.ssh_pubkey_file } ],
404                                'role': 'experimentCreation'\
405                            },\
406                            {\
407                                'userID': { 'localname' : user}, \
408                                'access': [ { 
409                                    'sshPubkey': self.ssh_pubkey_file } ],
410                                'role': 'serviceAccess'\
411                            },\
412                        ]}\
413                    }\
414            }
415            return self.allocate_project.static_project(preq)
416
417    def save_project_state(self, aid, ap, dyn, owners):
418        """
419        Parse out and save the information relevant to the project created for
420        this experiment.  That info is largely in ap and owners.  dyn indicates
421        that the project was created dynamically.  Return the user and project
422        names.
423        """
424        self.state_lock.acquire()
425        self.allocation[aid] = { }
426        try:
427            pname = ap['project']['name']['localname']
428        except KeyError:
429            pname = None
430
431        if dyn:
432            if not pname:
433                self.state_lock.release()
434                raise service_error(service_error.internal,
435                        "Misformed allocation response?")
436            if pname in self.projects: self.projects[pname] += 1
437            else: self.projects[pname] = 1
438            self.allocation[aid]['project'] = pname
439        else:
440            # sproject is a static project associated with this allocation.
441            self.allocation[aid]['sproject'] = pname
442
443        self.allocation[aid]['keys'] = [ ]
444
445        try:
446            for u in ap['project']['user']:
447                uname = u['userID']['localname']
448                if u['role'] == 'experimentCreation':
449                    self.allocation[aid]['user'] = uname
450                for k in [ k['sshPubkey'] for k in u['access'] \
451                        if k.has_key('sshPubkey') ]:
452                    kv = "%s:%s" % (uname, k)
453                    if self.keys.has_key(kv): self.keys[kv] += 1
454                    else: self.keys[kv] = 1
455                    self.allocation[aid]['keys'].append((uname, k))
456        except KeyError:
457            self.state_lock.release()
458            raise service_error(service_error.internal,
459                    "Misformed allocation response?")
460
461        self.allocation[aid]['owners'] = owners
462        self.write_state()
463        self.state_lock.release()
464        return (pname, uname)
465
466    # End of RequestAccess support routines
467
468    def RequestAccess(self, req, fid):
469        """
470        Handle the access request.  Proxy if not for us.
471
472        Parse out the fields and make the allocations or rejections if for us,
473        otherwise, assuming we're willing to proxy, proxy the request out.
474        """
475
476        def gateway_hardware(h):
477            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
478            else: return h
479
480        def get_export_project(svcs):
481            """
482            if the service requests includes one to export a project, return
483            that project.
484            """
485            rv = None
486            for s in svcs:
487                if s.get('name', '') == 'project_export' and \
488                        s.get('visibility', '') == 'export':
489                    if not rv: 
490                        for a in s.get('fedAttr', []):
491                            if a.get('attribute', '') == 'project' \
492                                    and 'value' in a:
493                                rv = a['value']
494                    else:
495                        raise service_error(service_error, access, 
496                                'Requesting multiple project exports is ' + \
497                                        'not supported');
498            return rv
499
500        # The dance to get into the request body
501        if req.has_key('RequestAccessRequestBody'):
502            req = req['RequestAccessRequestBody']
503        else:
504            raise service_error(service_error.req, "No request!?")
505
506        # if this includes a project export request, construct a filter such
507        # that only the ABAC attributes mapped to that project are checked for
508        # access.
509        if 'service' in req:
510            ep = get_export_project(req['service'])
511            pf = lambda(a): a.value[0] == ep
512        else:
513            ep = None
514            pf = None
515
516        if self.auth.import_credentials(
517                data_list=req.get('abac_credential', [])):
518            self.auth.save()
519
520        if self.auth_type == "legacy":
521            found, dyn, owners = self.legacy_lookup_access(req, fid)
522        elif self.auth_type == 'abac':
523            found, dyn, owners = self.lookup_access(req, fid, filter=pf)
524        else:
525            raise service_error(service_error.internal, 
526                    'Unknown auth_type: %s' % self.auth_type)
527        ap = None
528
529        # This only happens in legacy lookups, but if this user has access to
530        # the testbed but not the project to be exported, raise the error.
531        if ep and ep != found[0]:
532            raise service_error(service_error.access,
533                    "Cannot export %s" % ep)
534
535        if self.ssh_pubkey_file:
536            ap = self.do_project_allocation(dyn[1], found[0], found[1])
537        else:
538            raise service_error(service_error.internal, 
539                    "SSH access parameters required")
540        # keep track of what's been added
541        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
542        aid = unicode(allocID)
543
544        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
545
546        services, svc_state = self.export_services(req.get('service',[]),
547                pname, uname)
548        self.state_lock.acquire()
549        # Store services state in global state
550        for k, v in svc_state.items():
551            self.allocation[aid][k] = v
552        self.write_state()
553        self.state_lock.release()
554        # Give the owners the right to change this allocation
555        for o in owners:
556            self.auth.set_attribute(o, allocID)
557        self.auth.save()
558        try:
559            f = open("%s/%s.pem" % (self.certdir, aid), "w")
560            print >>f, alloc_cert
561            f.close()
562        except EnvironmentError, e:
563            raise service_error(service_error.internal, 
564                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
565        resp = self.build_access_response({ 'fedid': allocID } ,
566                ap, services)
567        return resp
568
569    def do_release_project(self, del_project, del_users, del_types):
570        """
571        If a project and users has to be deleted, make the call.
572        """
573        msg = { 'project': { }}
574        if del_project:
575            msg['project']['name']= {'localname': del_project}
576        users = [ ]
577        for u in del_users.keys():
578            users.append({ 'userID': { 'localname': u },\
579                'access' :  \
580                        [ {'sshPubkey' : s } for s in del_users[u]]\
581            })
582        if users: 
583            msg['project']['user'] = users
584        if len(del_types) > 0:
585            msg['resources'] = { 'node': \
586                    [ {'hardware': [ h ] } for h in del_types ]\
587                }
588        if self.allocate_project.release_project:
589            msg = { 'ReleaseProjectRequestBody' : msg}
590            self.allocate_project.release_project(msg)
591
592    def ReleaseAccess(self, req, fid):
593        # The dance to get into the request body
594        if req.has_key('ReleaseAccessRequestBody'):
595            req = req['ReleaseAccessRequestBody']
596        else:
597            raise service_error(service_error.req, "No request!?")
598
599        try:
600            if req['allocID'].has_key('localname'):
601                auth_attr = aid = req['allocID']['localname']
602            elif req['allocID'].has_key('fedid'):
603                aid = unicode(req['allocID']['fedid'])
604                auth_attr = req['allocID']['fedid']
605            else:
606                raise service_error(service_error.req,
607                        "Only localnames and fedids are understood")
608        except KeyError:
609            raise service_error(service_error.req, "Badly formed request")
610
611        self.log.debug("[access] deallocation requested for %s by %s" % \
612                (aid, fid))
613        if not self.auth.check_attribute(fid, auth_attr):
614            self.log.debug("[access] deallocation denied for %s", aid)
615            raise service_error(service_error.access, "Access Denied")
616
617        # If we know this allocation, reduce the reference counts and
618        # remove the local allocations.  Otherwise report an error.  If
619        # there is an allocation to delete, del_users will be a dictonary
620        # of sets where the key is the user that owns the keys in the set.
621        # We use a set to avoid duplicates.  del_project is just the name
622        # of any dynamic project to delete.  We're somewhat lazy about
623        # deleting authorization attributes.  Having access to something
624        # that doesn't exist isn't harmful.
625        del_users = { }
626        del_project = None
627        del_types = set()
628
629        self.state_lock.acquire()
630        if aid in self.allocation:
631            self.log.debug("Found allocation for %s" %aid)
632            for k in self.allocation[aid]['keys']:
633                kk = "%s:%s" % k
634                self.keys[kk] -= 1
635                if self.keys[kk] == 0:
636                    if not del_users.has_key(k[0]):
637                        del_users[k[0]] = set()
638                    del_users[k[0]].add(k[1])
639                    del self.keys[kk]
640
641            if 'project' in self.allocation[aid]:
642                pname = self.allocation[aid]['project']
643                self.projects[pname] -= 1
644                if self.projects[pname] == 0:
645                    del_project = pname
646                    del self.projects[pname]
647
648            if 'types' in self.allocation[aid]:
649                for t in self.allocation[aid]['types']:
650                    self.types[t] -= 1
651                    if self.types[t] == 0:
652                        if not del_project: del_project = t[0]
653                        del_types.add(t[1])
654                        del self.types[t]
655
656            del self.allocation[aid]
657            self.write_state()
658            self.state_lock.release()
659            # If we actually have resources to deallocate, prepare the call.
660            if del_project or del_users:
661                self.do_release_project(del_project, del_users, del_types)
662            # And remove the access cert
663            cf = "%s/%s.pem" % (self.certdir, aid)
664            self.log.debug("Removing %s" % cf)
665            os.remove(cf)
666            return { 'allocID': req['allocID'] } 
667        else:
668            self.state_lock.release()
669            raise service_error(service_error.req, "No such allocation")
670
671    # These are subroutines for StartSegment
672    def generate_ns2(self, topo, expfn, softdir, connInfo):
673        """
674        Convert topo into an ns2 file, decorated with appropriate commands for
675        the particular testbed setup.  Convert all requests for software, etc
676        to point at the staged copies on this testbed and add the federation
677        startcommands.
678        """
679        class dragon_commands:
680            """
681            Functor to spit out approrpiate dragon commands for nodes listed in
682            the connectivity description.  The constructor makes a dict mapping
683            dragon nodes to their parameters and the __call__ checks each
684            element in turn for membership.
685            """
686            def __init__(self, map):
687                self.node_info = map
688
689            def __call__(self, e):
690                s = ""
691                if isinstance(e, topdl.Computer):
692                    if self.node_info.has_key(e.name):
693                        info = self.node_info[e.name]
694                        for ifname, vlan, type in info:
695                            for i in e.interface:
696                                if i.name == ifname:
697                                    addr = i.get_attribute('ip4_address')
698                                    subs = i.substrate[0]
699                                    break
700                            else:
701                                raise service_error(service_error.internal,
702                                        "No interface %s on element %s" % \
703                                                (ifname, e.name))
704                            # XXX: do netmask right
705                            if type =='link':
706                                s = ("tb-allow-external ${%s} " + \
707                                        "dragonportal ip %s vlan %s " + \
708                                        "netmask 255.255.255.0\n") % \
709                                        (topdl.to_tcl_name(e.name), addr, vlan)
710                            elif type =='lan':
711                                s = ("tb-allow-external ${%s} " + \
712                                        "dragonportal " + \
713                                        "ip %s vlan %s usurp %s\n") % \
714                                        (topdl.to_tcl_name(e.name), addr, 
715                                                vlan, subs)
716                            else:
717                                raise service_error(service_error_internal,
718                                        "Unknown DRAGON type %s" % type)
719                return s
720
721        class not_dragon:
722            """
723            Return true if a node is in the given map of dragon nodes.
724            """
725            def __init__(self, map):
726                self.nodes = set(map.keys())
727
728            def __call__(self, e):
729                return e.name not in self.nodes
730
731        # Main line of generate_ns2
732        t = topo.clone()
733
734        # Create the map of nodes that need direct connections (dragon
735        # connections) from the connInfo
736        dragon_map = { }
737        for i in [ i for i in connInfo if i['type'] == 'transit']:
738            for a in i.get('fedAttr', []):
739                if a['attribute'] == 'vlan_id':
740                    vlan = a['value']
741                    break
742            else:
743                raise service_error(service_error.internal, "No vlan tag")
744            members = i.get('member', [])
745            if len(members) > 1: type = 'lan'
746            else: type = 'link'
747
748            try:
749                for m in members:
750                    if m['element'] in dragon_map:
751                        dragon_map[m['element']].append(( m['interface'], 
752                            vlan, type))
753                    else:
754                        dragon_map[m['element']] = [( m['interface'], 
755                            vlan, type),]
756            except KeyError:
757                raise service_error(service_error.req,
758                        "Missing connectivity info")
759
760        # Weed out the things we aren't going to instantiate: Segments, portal
761        # substrates, and portal interfaces.  (The copy in the for loop allows
762        # us to delete from e.elements in side the for loop).  While we're
763        # touching all the elements, we also adjust paths from the original
764        # testbed to local testbed paths and put the federation commands into
765        # the start commands
766        for e in [e for e in t.elements]:
767            if isinstance(e, topdl.Segment):
768                t.elements.remove(e)
769            if isinstance(e, topdl.Computer):
770                self.add_kit(e, self.federation_software)
771                if e.get_attribute('portal') and self.portal_startcommand:
772                    # Add local portal support software
773                    self.add_kit(e, self.portal_software)
774                    # Portals never have a user-specified start command
775                    e.set_attribute('startup', self.portal_startcommand)
776                elif self.node_startcommand:
777                    if e.get_attribute('startup'):
778                        e.set_attribute('startup', "%s \\$USER '%s'" % \
779                                (self.node_startcommand, 
780                                    e.get_attribute('startup')))
781                    else:
782                        e.set_attribute('startup', self.node_startcommand)
783
784                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
785                # Remove portal interfaces that do not connect to DRAGON
786                e.interface = [i for i in e.interface \
787                        if not i.get_attribute('portal') or i.name in dinf ]
788            # Fix software paths
789            for s in getattr(e, 'software', []):
790                s.location = re.sub("^.*/", softdir, s.location)
791
792        t.substrates = [ s.clone() for s in t.substrates ]
793        t.incorporate_elements()
794
795        # Customize the ns2 output for local portal commands and images
796        filters = []
797
798        if self.dragon_endpoint:
799            add_filter = not_dragon(dragon_map)
800            filters.append(dragon_commands(dragon_map))
801        else:
802            add_filter = None
803
804        if self.portal_command:
805            filters.append(topdl.generate_portal_command_filter(
806                self.portal_command, add_filter=add_filter))
807
808        if self.portal_image:
809            filters.append(topdl.generate_portal_image_filter(
810                self.portal_image))
811
812        if self.portal_type:
813            filters.append(topdl.generate_portal_hardware_filter(
814                self.portal_type))
815
816        # Convert to ns and write it out
817        expfile = topdl.topology_to_ns2(t, filters)
818        try:
819            f = open(expfn, "w")
820            print >>f, expfile
821            f.close()
822        except EnvironmentError:
823            raise service_error(service_error.internal,
824                    "Cannot write experiment file %s: %s" % (expfn,e))
825
826    def export_store_info(self, cf, proj, ename, connInfo):
827        """
828        For the export requests in the connection info, install the peer names
829        at the experiment controller via SetValue calls.
830        """
831
832        for c in connInfo:
833            for p in [ p for p in c.get('parameter', []) \
834                    if p.get('type', '') == 'output']:
835
836                if p.get('name', '') == 'peer':
837                    k = p.get('key', None)
838                    surl = p.get('store', None)
839                    if surl and k and k.index('/') != -1:
840                        value = "%s.%s.%s%s" % \
841                                (k[k.index('/')+1:], ename, proj, self.domain)
842                        req = { 'name': k, 'value': value }
843                        self.log.debug("Setting %s to %s on %s" % \
844                                (k, value, surl))
845                        self.call_SetValue(surl, req, cf)
846                    else:
847                        self.log.error("Bad export request: %s" % p)
848                elif p.get('name', '') == 'ssh_port':
849                    k = p.get('key', None)
850                    surl = p.get('store', None)
851                    if surl and k:
852                        req = { 'name': k, 'value': self.ssh_port }
853                        self.log.debug("Setting %s to %s on %s" % \
854                                (k, self.ssh_port, surl))
855                        self.call_SetValue(surl, req, cf)
856                    else:
857                        self.log.error("Bad export request: %s" % p)
858                else:
859                    self.log.error("Unknown export parameter: %s" % \
860                            p.get('name'))
861                    continue
862
863    def add_seer_node(self, topo, name, startup):
864        """
865        Add a seer node to the given topology, with the startup command passed
866        in.  Used by configure seer_services.
867        """
868        c_node = topdl.Computer(
869                name=name, 
870                os= topdl.OperatingSystem(
871                    attribute=[
872                    { 'attribute': 'osid', 
873                        'value': self.local_seer_image },
874                    ]),
875                attribute=[
876                    { 'attribute': 'startup', 'value': startup },
877                    ]
878                )
879        self.add_kit(c_node, self.local_seer_software)
880        topo.elements.append(c_node)
881
882    def configure_seer_services(self, services, topo, softdir):
883        """
884        Make changes to the topology required for the seer requests being made.
885        Specifically, add any control or master nodes required and set up the
886        start commands on the nodes to interconnect them.
887        """
888        local_seer = False      # True if we need to add a control node
889        collect_seer = False    # True if there is a seer-master node
890        seer_master= False      # True if we need to add the seer-master
891        for s in services:
892            s_name = s.get('name', '')
893            s_vis = s.get('visibility','')
894
895            if s_name  == 'local_seer_control' and s_vis == 'export':
896                local_seer = True
897            elif s_name == 'seer_master':
898                if s_vis == 'import':
899                    collect_seer = True
900                elif s_vis == 'export':
901                    seer_master = True
902       
903        # We've got the whole picture now, so add nodes if needed and configure
904        # them to interconnect properly.
905        if local_seer or seer_master:
906            # Copy local seer control node software to the tempdir
907            for l, f in self.local_seer_software:
908                base = os.path.basename(f)
909                copy_file(f, "%s/%s" % (softdir, base))
910        # If we're collecting seers somewhere the controllers need to talk to
911        # the master.  In testbeds that export the service, that will be a
912        # local node that we'll add below.  Elsewhere it will be the control
913        # portal that will port forward to the exporting master.
914        if local_seer:
915            if collect_seer:
916                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
917            else:
918                startup = self.local_seer_start
919            self.add_seer_node(topo, 'control', startup)
920        # If this is the seer master, add that node, too.
921        if seer_master:
922            self.add_seer_node(topo, 'seer-master', 
923                    "%s -R -n -R seer-master -R -A -R sink" % \
924                            self.seer_master_start)
925
926    def retrieve_software(self, topo, certfile, softdir):
927        """
928        Collect the software that nodes in the topology need loaded and stage
929        it locally.  This implies retrieving it from the experiment_controller
930        and placing it into softdir.  Certfile is used to prove that this node
931        has access to that data (it's the allocation/segment fedid).  Finally
932        local portal and federation software is also copied to the same staging
933        directory for simplicity - all software needed for experiment creation
934        is in softdir.
935        """
936        sw = set()
937        for e in topo.elements:
938            for s in getattr(e, 'software', []):
939                sw.add(s.location)
940        for s in sw:
941            self.log.debug("Retrieving %s" % s)
942            try:
943                get_url(s, certfile, softdir)
944            except:
945                t, v, st = sys.exc_info()
946                raise service_error(service_error.internal,
947                        "Error retrieving %s: %s" % (s, v))
948
949        # Copy local federation and portal node software to the tempdir
950        for s in (self.federation_software, self.portal_software):
951            for l, f in s:
952                base = os.path.basename(f)
953                copy_file(f, "%s/%s" % (softdir, base))
954
955
956    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
957        """
958        Gather common configuration files, retrieve or create an experiment
959        name and project name, and return the ssh_key filenames.  Create an
960        allocation log bound to the state log variable as well.
961        """
962        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
963        ename = None
964        pubkey_base = None
965        secretkey_base = None
966        proj = None
967        user = None
968        alloc_log = None
969
970        for a in attrs:
971            if a['attribute'] in configs:
972                try:
973                    self.log.debug("Retrieving %s from %s" % \
974                            (a['attribute'], a['value']))
975                    get_url(a['value'], certfile, tmpdir)
976                except:
977                    t, v, st = sys.exc_info()
978                    raise service_error(service_error.internal,
979                            "Error retrieving %s: %s" % (a.get('value', ""), v))
980            if a['attribute'] == 'ssh_pubkey':
981                pubkey_base = a['value'].rpartition('/')[2]
982            if a['attribute'] == 'ssh_secretkey':
983                secretkey_base = a['value'].rpartition('/')[2]
984            if a['attribute'] == 'experiment_name':
985                ename = a['value']
986
987        if not ename:
988            ename = ""
989            for i in range(0,5):
990                ename += random.choice(string.ascii_letters)
991            self.log.warn("No experiment name: picked one randomly: %s" \
992                    % ename)
993
994        if not pubkey_base:
995            raise service_error(service_error.req, 
996                    "No public key attribute")
997
998        if not secretkey_base:
999            raise service_error(service_error.req, 
1000                    "No secret key attribute")
1001
1002        self.state_lock.acquire()
1003        if aid in self.allocation:
1004            proj = self.allocation[aid].get('project', None)
1005            if not proj: 
1006                proj = self.allocation[aid].get('sproject', None)
1007            user = self.allocation[aid].get('user', None)
1008            self.allocation[aid]['experiment'] = ename
1009            self.allocation[aid]['log'] = [ ]
1010            # Create a logger that logs to the experiment's state object as
1011            # well as to the main log file.
1012            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1013            h = logging.StreamHandler(
1014                    list_log.list_log(self.allocation[aid]['log']))
1015            # XXX: there should be a global one of these rather than
1016            # repeating the code.
1017            h.setFormatter(logging.Formatter(
1018                "%(asctime)s %(name)s %(message)s",
1019                        '%d %b %y %H:%M:%S'))
1020            alloc_log.addHandler(h)
1021            self.write_state()
1022        self.state_lock.release()
1023
1024        if not proj:
1025            raise service_error(service_error.internal, 
1026                    "Can't find project for %s" %aid)
1027
1028        if not user:
1029            raise service_error(service_error.internal, 
1030                    "Can't find creation user for %s" %aid)
1031
1032        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1033
1034    def finalize_experiment(self, starter, topo, aid, alloc_id):
1035        """
1036        Store key bits of experiment state in the global repository, including
1037        the response that may need to be replayed, and return the response.
1038        """
1039        # Copy the assigned names into the return topology
1040        embedding = [ ]
1041        for n in starter.node:
1042            embedding.append({ 
1043                'toponame': n,
1044                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1045                })
1046        # Grab the log (this is some anal locking, but better safe than
1047        # sorry)
1048        self.state_lock.acquire()
1049        logv = "".join(self.allocation[aid]['log'])
1050        # It's possible that the StartSegment call gets retried (!).
1051        # if the 'started' key is in the allocation, we'll return it rather
1052        # than redo the setup.
1053        self.allocation[aid]['started'] = { 
1054                'allocID': alloc_id,
1055                'allocationLog': logv,
1056                'segmentdescription': { 
1057                    'topdldescription': topo.clone().to_dict()
1058                    },
1059                'embedding': embedding
1060                }
1061        retval = copy.copy(self.allocation[aid]['started'])
1062        self.write_state()
1063        self.state_lock.release()
1064        return retval
1065   
1066    # End of StartSegment support routines
1067
1068    def StartSegment(self, req, fid):
1069        err = None  # Any service_error generated after tmpdir is created
1070        rv = None   # Return value from segment creation
1071
1072        try:
1073            req = req['StartSegmentRequestBody']
1074            auth_attr = req['allocID']['fedid']
1075            topref = req['segmentdescription']['topdldescription']
1076        except KeyError:
1077            raise service_error(server_error.req, "Badly formed request")
1078
1079        connInfo = req.get('connection', [])
1080        services = req.get('service', [])
1081        aid = "%s" % auth_attr
1082        attrs = req.get('fedAttr', [])
1083        if not self.auth.check_attribute(fid, auth_attr):
1084            raise service_error(service_error.access, "Access denied")
1085        else:
1086            # See if this is a replay of an earlier succeeded StartSegment -
1087            # sometimes SSL kills 'em.  If so, replay the response rather than
1088            # redoing the allocation.
1089            self.state_lock.acquire()
1090            retval = self.allocation[aid].get('started', None)
1091            self.state_lock.release()
1092            if retval:
1093                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1094                        "replaying response")
1095                return retval
1096
1097        # A new request.  Do it.
1098
1099        if topref: topo = topdl.Topology(**topref)
1100        else:
1101            raise service_error(service_error.req, 
1102                    "Request missing segmentdescription'")
1103       
1104        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1105        try:
1106            tmpdir = tempfile.mkdtemp(prefix="access-")
1107            softdir = "%s/software" % tmpdir
1108            os.mkdir(softdir)
1109        except EnvironmentError:
1110            raise service_error(service_error.internal, "Cannot create tmp dir")
1111
1112        # Try block alllows us to clean up temporary files.
1113        try:
1114            self.retrieve_software(topo, certfile, softdir)
1115            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1116                    self.initialize_experiment_info(attrs, aid, 
1117                            certfile, tmpdir)
1118
1119            # Set up userconf and seer if needed
1120            self.configure_userconf(services, tmpdir)
1121            self.configure_seer_services(services, topo, softdir)
1122            # Get and send synch store variables
1123            self.export_store_info(certfile, proj, ename, connInfo)
1124            self.import_store_info(certfile, connInfo)
1125
1126            expfile = "%s/experiment.tcl" % tmpdir
1127
1128            self.generate_portal_configs(topo, pubkey_base, 
1129                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1130            self.generate_ns2(topo, expfile, 
1131                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1132
1133            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1134                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1135                    cert=self.xmlrpc_cert)
1136            rv = starter(self, ename, proj, user, expfile, tmpdir)
1137        except service_error, e:
1138            err = e
1139        except:
1140            t, v, st = sys.exc_info()
1141            err = service_error(service_error.internal, "%s: %s" % \
1142                    (v, traceback.extract_tb(st)))
1143
1144        # Walk up tmpdir, deleting as we go
1145        if self.cleanup: self.remove_dirs(tmpdir)
1146        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1147
1148        if rv:
1149            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1150        elif err:
1151            raise service_error(service_error.federant,
1152                    "Swapin failed: %s" % err)
1153        else:
1154            raise service_error(service_error.federant, "Swapin failed")
1155
1156    def TerminateSegment(self, req, fid):
1157        try:
1158            req = req['TerminateSegmentRequestBody']
1159        except KeyError:
1160            raise service_error(server_error.req, "Badly formed request")
1161
1162        auth_attr = req['allocID']['fedid']
1163        aid = "%s" % auth_attr
1164        attrs = req.get('fedAttr', [])
1165        if not self.auth.check_attribute(fid, auth_attr):
1166            raise service_error(service_error.access, "Access denied")
1167
1168        self.state_lock.acquire()
1169        if aid in self.allocation:
1170            proj = self.allocation[aid].get('project', None)
1171            if not proj: 
1172                proj = self.allocation[aid].get('sproject', None)
1173            user = self.allocation[aid].get('user', None)
1174            ename = self.allocation[aid].get('experiment', None)
1175        else:
1176            proj = None
1177            user = None
1178            ename = None
1179        self.state_lock.release()
1180
1181        if not proj:
1182            raise service_error(service_error.internal, 
1183                    "Can't find project for %s" % aid)
1184
1185        if not user:
1186            raise service_error(service_error.internal, 
1187                    "Can't find creation user for %s" % aid)
1188        if not ename:
1189            raise service_error(service_error.internal, 
1190                    "Can't find experiment name for %s" % aid)
1191        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1192                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1193        stopper(self, user, proj, ename)
1194        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.