source: fedd/federation/emulab_access.py @ c002cb2

axis_examplecompt_changesinfo-ops
Last change on this file since c002cb2 was c002cb2, checked in by Ted Faber <faber@…>, 13 years ago

Structure for priority and filtering of ABAC attributes at access check time

  • Property mode set to 100644
File size: 39.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        # XXX: this should go?
107        #if config.has_option("access", "accessdb"):
108        #    self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'legacy'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'legacy':
120            self.access = { }
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            self.access = [ ]
126            if accessdb:
127                self.read_access(accessdb, self.access_tuple)
128        else:
129            raise service_error(service_error.internal, 
130                    "Unknown auth_type: %s" % self.auth_type)
131
132        # read_state in the base_class
133        self.state_lock.acquire()
134        for a  in ('allocation', 'projects', 'keys', 'types'):
135            if a not in self.state:
136                self.state[a] = { }
137        self.allocation = self.state['allocation']
138        self.projects = self.state['projects']
139        self.keys = self.state['keys']
140        self.types = self.state['types']
141        if self.auth_type == "legacy": 
142            # Add the ownership attributes to the authorizer.  Note that the
143            # indices of the allocation dict are strings, but the attributes are
144            # fedids, so there is a conversion.
145            for k in self.allocation.keys():
146                for o in self.allocation[k].get('owners', []):
147                    self.auth.set_attribute(o, fedid(hexstr=k))
148                if self.allocation[k].has_key('userconfig'):
149                    sfid = self.allocation[k]['userconfig']
150                    fid = fedid(hexstr=sfid)
151                    self.auth.set_attribute(fid, "/%s" % sfid)
152        self.state_lock.release()
153        self.exports = {
154                'SMB': self.export_SMB,
155                'seer': self.export_seer,
156                'tmcd': self.export_tmcd,
157                'userconfig': self.export_userconfig,
158                'project_export': self.export_project_export,
159                'local_seer_control': self.export_local_seer,
160                'seer_master': self.export_seer_master,
161                'hide_hosts': self.export_hide_hosts,
162                }
163
164        if not self.local_seer_image or not self.local_seer_software or \
165                not self.local_seer_start:
166            if 'local_seer_control' in self.exports:
167                del self.exports['local_seer_control']
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.seer_master_start:
171            if 'seer_master' in self.exports:
172                del self.exports['seer_master']
173
174
175        self.soap_services = {\
176            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
177            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
178            'StartSegment': soap_handler("StartSegment", self.StartSegment),
179            'TerminateSegment': soap_handler("TerminateSegment",
180                self.TerminateSegment),
181            }
182        self.xmlrpc_services =  {\
183            'RequestAccess': xmlrpc_handler('RequestAccess',
184                self.RequestAccess),
185            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
186                self.ReleaseAccess),
187            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
188            'TerminateSegment': xmlrpc_handler('TerminateSegment',
189                self.TerminateSegment),
190            }
191
192        self.call_SetValue = service_caller('SetValue')
193        self.call_GetValue = service_caller('GetValue', log=self.log)
194
195        if not config.has_option("allocate", "uri"):
196            self.allocate_project = \
197                allocate_project_local(config, auth)
198        else:
199            self.allocate_project = \
200                allocate_project_remote(config, auth)
201
202
203        # If the project allocator exports services, put them in this object's
204        # maps so that classes that instantiate this can call the services.
205        self.soap_services.update(self.allocate_project.soap_services)
206        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
207
208    @staticmethod
209    def legacy_access_tuple(str):
210        """
211        Convert a string of the form (id[:resources:resouces], id, id) into a
212        tuple of the form (project, user, user) where users may be names or
213        fedids.  The resources strings are obsolete and ignored.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = first[0]
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def access_tuple(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            # The slice takes the parens off the string.
250            proj, user = str[1:-1].split(',')
251            return (proj.strip(), user.strip(), user.strip())
252        else:
253            raise self.parse_error(
254                    'Bad mapping (unbalanced parens or more than 1 comma)')
255
256
257    # RequestAccess support routines
258
259    def legacy_lookup_access(self, req, fid):
260        """
261        Look up the local access control information mapped to this fedid and
262        credentials.  In this case it is a (project, create_user, access_user)
263        triple, and a triple of three booleans indicating which, if any will
264        need to be dynamically created.  Finally a list of owners for that
265        allocation is returned.
266
267        lookup_access_base pulls the first triple out, and it is parsed by this
268        routine into the boolean map.  Owners is always the controlling fedid.
269        """
270        # Return values
271        rp = None
272        ru = None
273        # This maps a valid user to the Emulab projects and users to use
274        found, match = self.legacy_lookup_access_base(req, fid)
275        tb, project, user = match
276       
277        if found == None:
278            raise service_error(service_error.access,
279                    "Access denied - cannot map access")
280
281        # resolve <dynamic> and <same> in found
282        dyn_proj = False
283        dyn_create_user = False
284        dyn_service_user = False
285
286        if found[0] == "<same>":
287            if project != None:
288                rp = project
289            else : 
290                raise service_error(\
291                        service_error.server_config,
292                        "Project matched <same> when no project given")
293        elif found[0] == "<dynamic>":
294            rp = None
295            dyn_proj = True
296        else:
297            rp = found[0]
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def lookup_access(self, req, fid, filter=None, compare=None): 
331        """
332        Check all the attributes that this controller knows how to map and see
333        if the requester is allowed to use any of them.  If so return one.
334        Filter defined the objects to check - it's a function that returns true
335        for the objects to check - and cmp defines the order to check them in
336        as the cmp field of sorted().  If filter is None, all possibilities are
337        checked.  If cmp is None, the choices are sorted by priority.
338        """
339        # NB: comparison order reversed so numerically larger priorities are
340        # checked first.
341        def prio_cmp(a, b):
342            return cmp(b.priority, a.priority)
343
344
345        # Import request credentials into this (clone later??)
346        if self.auth.import_credentials(
347                data_list=req.get('abac_credential', [])):
348            self.auth.save()
349
350        c = compare or prio_cmp
351        if filter: f = filter
352        else: f = lambda(x): True
353
354        check = sorted([ a for a in self.access if f(a)], cmp=c)
355
356        # Check every attribute that we know how to map and take the first
357        # success.
358        for attr in check:
359            if self.auth.check_attribute(fid, attr.attr):
360                self.log.debug("Access succeeded for %s %s" % (attr.attr, fid))
361                # XXX: needs to deal with dynamics
362                return copy.copy(attr.value), (False, False, False), \
363                        [ fid ]
364            else:
365                self.log.debug("Access failed for %s %s" % (attr.attr, fid))
366        else:
367            raise service_error(service_error.access, "Access denied")
368
369
370    def do_project_allocation(self, dyn, project, user):
371        """
372        Call the project allocation routines and return the info.
373        """
374        if dyn: 
375            # Compose the dynamic project request
376            # (only dynamic, dynamic currently allowed)
377            preq = { 'AllocateProjectRequestBody': \
378                        { 'project' : {\
379                            'user': [ \
380                            { \
381                                'access': [ { 
382                                    'sshPubkey': self.ssh_pubkey_file } ],
383                                 'role': "serviceAccess",\
384                            }, \
385                            { \
386                                'access': [ { 
387                                    'sshPubkey': self.ssh_pubkey_file } ],
388                                 'role': "experimentCreation",\
389                            }, \
390                            ], \
391                            }\
392                        }\
393                    }
394            return self.allocate_project.dynamic_project(preq)
395        else:
396            preq = {'StaticProjectRequestBody' : \
397                    { 'project': \
398                        { 'name' : { 'localname' : project },\
399                          'user' : [ \
400                            {\
401                                'userID': { 'localname' : user }, \
402                                'access': [ { 
403                                    'sshPubkey': self.ssh_pubkey_file } ],
404                                'role': 'experimentCreation'\
405                            },\
406                            {\
407                                'userID': { 'localname' : user}, \
408                                'access': [ { 
409                                    'sshPubkey': self.ssh_pubkey_file } ],
410                                'role': 'serviceAccess'\
411                            },\
412                        ]}\
413                    }\
414            }
415            return self.allocate_project.static_project(preq)
416
417    def save_project_state(self, aid, ap, dyn, owners):
418        """
419        Parse out and save the information relevant to the project created for
420        this experiment.  That info is largely in ap and owners.  dyn indicates
421        that the project was created dynamically.  Return the user and project
422        names.
423        """
424        self.state_lock.acquire()
425        self.allocation[aid] = { }
426        try:
427            pname = ap['project']['name']['localname']
428        except KeyError:
429            pname = None
430
431        if dyn:
432            if not pname:
433                self.state_lock.release()
434                raise service_error(service_error.internal,
435                        "Misformed allocation response?")
436            if pname in self.projects: self.projects[pname] += 1
437            else: self.projects[pname] = 1
438            self.allocation[aid]['project'] = pname
439        else:
440            # sproject is a static project associated with this allocation.
441            self.allocation[aid]['sproject'] = pname
442
443        self.allocation[aid]['keys'] = [ ]
444
445        try:
446            for u in ap['project']['user']:
447                uname = u['userID']['localname']
448                if u['role'] == 'experimentCreation':
449                    self.allocation[aid]['user'] = uname
450                for k in [ k['sshPubkey'] for k in u['access'] \
451                        if k.has_key('sshPubkey') ]:
452                    kv = "%s:%s" % (uname, k)
453                    if self.keys.has_key(kv): self.keys[kv] += 1
454                    else: self.keys[kv] = 1
455                    self.allocation[aid]['keys'].append((uname, k))
456        except KeyError:
457            self.state_lock.release()
458            raise service_error(service_error.internal,
459                    "Misformed allocation response?")
460
461        self.allocation[aid]['owners'] = owners
462        self.write_state()
463        self.state_lock.release()
464        return (pname, uname)
465
466    # End of RequestAccess support routines
467
468    def RequestAccess(self, req, fid):
469        """
470        Handle the access request.  Proxy if not for us.
471
472        Parse out the fields and make the allocations or rejections if for us,
473        otherwise, assuming we're willing to proxy, proxy the request out.
474        """
475
476        def gateway_hardware(h):
477            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
478            else: return h
479
480        def get_export_project(svcs):
481            """
482            if the service requests includes one to export a project, return
483            that project.
484            """
485            rv = None
486            for s in svcs:
487                if s.get('name', '') == 'project_export' and \
488                        s.get('visibility', '') == 'export':
489                    if not rv: 
490                        for a in s.get('feddAttr', []):
491                            if a.get('attribute', '') == 'project' \
492                                    and 'value' in a:
493                                rv = a['value']
494                    else:
495                        raise service_error(service_error, access, 
496                                'Requesting multiple project exports is ' + \
497                                        'not supported');
498            return rv
499
500        # The dance to get into the request body
501        if req.has_key('RequestAccessRequestBody'):
502            req = req['RequestAccessRequestBody']
503        else:
504            raise service_error(service_error.req, "No request!?")
505
506        alog = open("./auth.log", 'w')
507        print >>alog, self.auth
508        print >> alog, "after"
509        if self.auth.import_credentials(
510                data_list=req.get('abac_credential', [])):
511            self.auth.save()
512        print >>alog, self.auth
513        alog.close()
514
515        if self.auth_type == "legacy":
516            found, dyn, owners = self.legacy_lookup_access(req, fid)
517        elif self.auth_type == 'abac':
518            found, dyn, owners = self.lookup_access(req, fid)
519        else:
520            raise service_error(service_error.internal, 
521                    'Unknown auth_type: %s' % self.auth_type)
522        ap = None
523
524        # if this includes a project export request and the exported
525        # project is not the access project, access denied.
526        if 'service' in req:
527            ep = get_export_project(req['service'])
528            if ep and ep != found[0]:
529                raise service_error(service_error.access,
530                        "Cannot export %s" % ep)
531
532        if self.ssh_pubkey_file:
533            ap = self.do_project_allocation(dyn[1], found[0], found[1])
534        else:
535            raise service_error(service_error.internal, 
536                    "SSH access parameters required")
537        # keep track of what's been added
538        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
539        aid = unicode(allocID)
540
541        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
542
543        services, svc_state = self.export_services(req.get('service',[]),
544                pname, uname)
545        self.state_lock.acquire()
546        # Store services state in global state
547        for k, v in svc_state.items():
548            self.allocation[aid][k] = v
549        self.write_state()
550        self.state_lock.release()
551        # Give the owners the right to change this allocation
552        for o in owners:
553            self.auth.set_attribute(o, allocID)
554        self.auth.save()
555        try:
556            f = open("%s/%s.pem" % (self.certdir, aid), "w")
557            print >>f, alloc_cert
558            f.close()
559        except EnvironmentError, e:
560            raise service_error(service_error.internal, 
561                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
562        resp = self.build_access_response({ 'fedid': allocID } ,
563                ap, services)
564        return resp
565
566    def do_release_project(self, del_project, del_users, del_types):
567        """
568        If a project and users has to be deleted, make the call.
569        """
570        msg = { 'project': { }}
571        if del_project:
572            msg['project']['name']= {'localname': del_project}
573        users = [ ]
574        for u in del_users.keys():
575            users.append({ 'userID': { 'localname': u },\
576                'access' :  \
577                        [ {'sshPubkey' : s } for s in del_users[u]]\
578            })
579        if users: 
580            msg['project']['user'] = users
581        if len(del_types) > 0:
582            msg['resources'] = { 'node': \
583                    [ {'hardware': [ h ] } for h in del_types ]\
584                }
585        if self.allocate_project.release_project:
586            msg = { 'ReleaseProjectRequestBody' : msg}
587            self.allocate_project.release_project(msg)
588
589    def ReleaseAccess(self, req, fid):
590        # The dance to get into the request body
591        if req.has_key('ReleaseAccessRequestBody'):
592            req = req['ReleaseAccessRequestBody']
593        else:
594            raise service_error(service_error.req, "No request!?")
595
596        try:
597            if req['allocID'].has_key('localname'):
598                auth_attr = aid = req['allocID']['localname']
599            elif req['allocID'].has_key('fedid'):
600                aid = unicode(req['allocID']['fedid'])
601                auth_attr = req['allocID']['fedid']
602            else:
603                raise service_error(service_error.req,
604                        "Only localnames and fedids are understood")
605        except KeyError:
606            raise service_error(service_error.req, "Badly formed request")
607
608        self.log.debug("[access] deallocation requested for %s by %s" % \
609                (aid, fid))
610        if not self.auth.check_attribute(fid, auth_attr):
611            self.log.debug("[access] deallocation denied for %s", aid)
612            raise service_error(service_error.access, "Access Denied")
613
614        # If we know this allocation, reduce the reference counts and
615        # remove the local allocations.  Otherwise report an error.  If
616        # there is an allocation to delete, del_users will be a dictonary
617        # of sets where the key is the user that owns the keys in the set.
618        # We use a set to avoid duplicates.  del_project is just the name
619        # of any dynamic project to delete.  We're somewhat lazy about
620        # deleting authorization attributes.  Having access to something
621        # that doesn't exist isn't harmful.
622        del_users = { }
623        del_project = None
624        del_types = set()
625
626        self.state_lock.acquire()
627        if aid in self.allocation:
628            self.log.debug("Found allocation for %s" %aid)
629            for k in self.allocation[aid]['keys']:
630                kk = "%s:%s" % k
631                self.keys[kk] -= 1
632                if self.keys[kk] == 0:
633                    if not del_users.has_key(k[0]):
634                        del_users[k[0]] = set()
635                    del_users[k[0]].add(k[1])
636                    del self.keys[kk]
637
638            if 'project' in self.allocation[aid]:
639                pname = self.allocation[aid]['project']
640                self.projects[pname] -= 1
641                if self.projects[pname] == 0:
642                    del_project = pname
643                    del self.projects[pname]
644
645            if 'types' in self.allocation[aid]:
646                for t in self.allocation[aid]['types']:
647                    self.types[t] -= 1
648                    if self.types[t] == 0:
649                        if not del_project: del_project = t[0]
650                        del_types.add(t[1])
651                        del self.types[t]
652
653            del self.allocation[aid]
654            self.write_state()
655            self.state_lock.release()
656            # If we actually have resources to deallocate, prepare the call.
657            if del_project or del_users:
658                self.do_release_project(del_project, del_users, del_types)
659            # And remove the access cert
660            cf = "%s/%s.pem" % (self.certdir, aid)
661            self.log.debug("Removing %s" % cf)
662            os.remove(cf)
663            return { 'allocID': req['allocID'] } 
664        else:
665            self.state_lock.release()
666            raise service_error(service_error.req, "No such allocation")
667
668    # These are subroutines for StartSegment
669    def generate_ns2(self, topo, expfn, softdir, connInfo):
670        """
671        Convert topo into an ns2 file, decorated with appropriate commands for
672        the particular testbed setup.  Convert all requests for software, etc
673        to point at the staged copies on this testbed and add the federation
674        startcommands.
675        """
676        class dragon_commands:
677            """
678            Functor to spit out approrpiate dragon commands for nodes listed in
679            the connectivity description.  The constructor makes a dict mapping
680            dragon nodes to their parameters and the __call__ checks each
681            element in turn for membership.
682            """
683            def __init__(self, map):
684                self.node_info = map
685
686            def __call__(self, e):
687                s = ""
688                if isinstance(e, topdl.Computer):
689                    if self.node_info.has_key(e.name):
690                        info = self.node_info[e.name]
691                        for ifname, vlan, type in info:
692                            for i in e.interface:
693                                if i.name == ifname:
694                                    addr = i.get_attribute('ip4_address')
695                                    subs = i.substrate[0]
696                                    break
697                            else:
698                                raise service_error(service_error.internal,
699                                        "No interface %s on element %s" % \
700                                                (ifname, e.name))
701                            # XXX: do netmask right
702                            if type =='link':
703                                s = ("tb-allow-external ${%s} " + \
704                                        "dragonportal ip %s vlan %s " + \
705                                        "netmask 255.255.255.0\n") % \
706                                        (topdl.to_tcl_name(e.name), addr, vlan)
707                            elif type =='lan':
708                                s = ("tb-allow-external ${%s} " + \
709                                        "dragonportal " + \
710                                        "ip %s vlan %s usurp %s\n") % \
711                                        (topdl.to_tcl_name(e.name), addr, 
712                                                vlan, subs)
713                            else:
714                                raise service_error(service_error_internal,
715                                        "Unknown DRAGON type %s" % type)
716                return s
717
718        class not_dragon:
719            """
720            Return true if a node is in the given map of dragon nodes.
721            """
722            def __init__(self, map):
723                self.nodes = set(map.keys())
724
725            def __call__(self, e):
726                return e.name not in self.nodes
727
728        # Main line of generate_ns2
729        t = topo.clone()
730
731        # Create the map of nodes that need direct connections (dragon
732        # connections) from the connInfo
733        dragon_map = { }
734        for i in [ i for i in connInfo if i['type'] == 'transit']:
735            for a in i.get('fedAttr', []):
736                if a['attribute'] == 'vlan_id':
737                    vlan = a['value']
738                    break
739            else:
740                raise service_error(service_error.internal, "No vlan tag")
741            members = i.get('member', [])
742            if len(members) > 1: type = 'lan'
743            else: type = 'link'
744
745            try:
746                for m in members:
747                    if m['element'] in dragon_map:
748                        dragon_map[m['element']].append(( m['interface'], 
749                            vlan, type))
750                    else:
751                        dragon_map[m['element']] = [( m['interface'], 
752                            vlan, type),]
753            except KeyError:
754                raise service_error(service_error.req,
755                        "Missing connectivity info")
756
757        # Weed out the things we aren't going to instantiate: Segments, portal
758        # substrates, and portal interfaces.  (The copy in the for loop allows
759        # us to delete from e.elements in side the for loop).  While we're
760        # touching all the elements, we also adjust paths from the original
761        # testbed to local testbed paths and put the federation commands into
762        # the start commands
763        for e in [e for e in t.elements]:
764            if isinstance(e, topdl.Segment):
765                t.elements.remove(e)
766            if isinstance(e, topdl.Computer):
767                self.add_kit(e, self.federation_software)
768                if e.get_attribute('portal') and self.portal_startcommand:
769                    # Add local portal support software
770                    self.add_kit(e, self.portal_software)
771                    # Portals never have a user-specified start command
772                    e.set_attribute('startup', self.portal_startcommand)
773                elif self.node_startcommand:
774                    if e.get_attribute('startup'):
775                        e.set_attribute('startup', "%s \\$USER '%s'" % \
776                                (self.node_startcommand, 
777                                    e.get_attribute('startup')))
778                    else:
779                        e.set_attribute('startup', self.node_startcommand)
780
781                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
782                # Remove portal interfaces that do not connect to DRAGON
783                e.interface = [i for i in e.interface \
784                        if not i.get_attribute('portal') or i.name in dinf ]
785            # Fix software paths
786            for s in getattr(e, 'software', []):
787                s.location = re.sub("^.*/", softdir, s.location)
788
789        t.substrates = [ s.clone() for s in t.substrates ]
790        t.incorporate_elements()
791
792        # Customize the ns2 output for local portal commands and images
793        filters = []
794
795        if self.dragon_endpoint:
796            add_filter = not_dragon(dragon_map)
797            filters.append(dragon_commands(dragon_map))
798        else:
799            add_filter = None
800
801        if self.portal_command:
802            filters.append(topdl.generate_portal_command_filter(
803                self.portal_command, add_filter=add_filter))
804
805        if self.portal_image:
806            filters.append(topdl.generate_portal_image_filter(
807                self.portal_image))
808
809        if self.portal_type:
810            filters.append(topdl.generate_portal_hardware_filter(
811                self.portal_type))
812
813        # Convert to ns and write it out
814        expfile = topdl.topology_to_ns2(t, filters)
815        try:
816            f = open(expfn, "w")
817            print >>f, expfile
818            f.close()
819        except EnvironmentError:
820            raise service_error(service_error.internal,
821                    "Cannot write experiment file %s: %s" % (expfn,e))
822
823    def export_store_info(self, cf, proj, ename, connInfo):
824        """
825        For the export requests in the connection info, install the peer names
826        at the experiment controller via SetValue calls.
827        """
828
829        for c in connInfo:
830            for p in [ p for p in c.get('parameter', []) \
831                    if p.get('type', '') == 'output']:
832
833                if p.get('name', '') == 'peer':
834                    k = p.get('key', None)
835                    surl = p.get('store', None)
836                    if surl and k and k.index('/') != -1:
837                        value = "%s.%s.%s%s" % \
838                                (k[k.index('/')+1:], ename, proj, self.domain)
839                        req = { 'name': k, 'value': value }
840                        self.log.debug("Setting %s to %s on %s" % \
841                                (k, value, surl))
842                        self.call_SetValue(surl, req, cf)
843                    else:
844                        self.log.error("Bad export request: %s" % p)
845                elif p.get('name', '') == 'ssh_port':
846                    k = p.get('key', None)
847                    surl = p.get('store', None)
848                    if surl and k:
849                        req = { 'name': k, 'value': self.ssh_port }
850                        self.log.debug("Setting %s to %s on %s" % \
851                                (k, self.ssh_port, surl))
852                        self.call_SetValue(surl, req, cf)
853                    else:
854                        self.log.error("Bad export request: %s" % p)
855                else:
856                    self.log.error("Unknown export parameter: %s" % \
857                            p.get('name'))
858                    continue
859
860    def add_seer_node(self, topo, name, startup):
861        """
862        Add a seer node to the given topology, with the startup command passed
863        in.  Used by configure seer_services.
864        """
865        c_node = topdl.Computer(
866                name=name, 
867                os= topdl.OperatingSystem(
868                    attribute=[
869                    { 'attribute': 'osid', 
870                        'value': self.local_seer_image },
871                    ]),
872                attribute=[
873                    { 'attribute': 'startup', 'value': startup },
874                    ]
875                )
876        self.add_kit(c_node, self.local_seer_software)
877        topo.elements.append(c_node)
878
879    def configure_seer_services(self, services, topo, softdir):
880        """
881        Make changes to the topology required for the seer requests being made.
882        Specifically, add any control or master nodes required and set up the
883        start commands on the nodes to interconnect them.
884        """
885        local_seer = False      # True if we need to add a control node
886        collect_seer = False    # True if there is a seer-master node
887        seer_master= False      # True if we need to add the seer-master
888        for s in services:
889            s_name = s.get('name', '')
890            s_vis = s.get('visibility','')
891
892            if s_name  == 'local_seer_control' and s_vis == 'export':
893                local_seer = True
894            elif s_name == 'seer_master':
895                if s_vis == 'import':
896                    collect_seer = True
897                elif s_vis == 'export':
898                    seer_master = True
899       
900        # We've got the whole picture now, so add nodes if needed and configure
901        # them to interconnect properly.
902        if local_seer or seer_master:
903            # Copy local seer control node software to the tempdir
904            for l, f in self.local_seer_software:
905                base = os.path.basename(f)
906                copy_file(f, "%s/%s" % (softdir, base))
907        # If we're collecting seers somewhere the controllers need to talk to
908        # the master.  In testbeds that export the service, that will be a
909        # local node that we'll add below.  Elsewhere it will be the control
910        # portal that will port forward to the exporting master.
911        if local_seer:
912            if collect_seer:
913                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
914            else:
915                startup = self.local_seer_start
916            self.add_seer_node(topo, 'control', startup)
917        # If this is the seer master, add that node, too.
918        if seer_master:
919            self.add_seer_node(topo, 'seer-master', 
920                    "%s -R -n -R seer-master -R -A -R sink" % \
921                            self.seer_master_start)
922
923    def retrieve_software(self, topo, certfile, softdir):
924        """
925        Collect the software that nodes in the topology need loaded and stage
926        it locally.  This implies retrieving it from the experiment_controller
927        and placing it into softdir.  Certfile is used to prove that this node
928        has access to that data (it's the allocation/segment fedid).  Finally
929        local portal and federation software is also copied to the same staging
930        directory for simplicity - all software needed for experiment creation
931        is in softdir.
932        """
933        sw = set()
934        for e in topo.elements:
935            for s in getattr(e, 'software', []):
936                sw.add(s.location)
937        for s in sw:
938            self.log.debug("Retrieving %s" % s)
939            try:
940                get_url(s, certfile, softdir)
941            except:
942                t, v, st = sys.exc_info()
943                raise service_error(service_error.internal,
944                        "Error retrieving %s: %s" % (s, v))
945
946        # Copy local federation and portal node software to the tempdir
947        for s in (self.federation_software, self.portal_software):
948            for l, f in s:
949                base = os.path.basename(f)
950                copy_file(f, "%s/%s" % (softdir, base))
951
952
953    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
954        """
955        Gather common configuration files, retrieve or create an experiment
956        name and project name, and return the ssh_key filenames.  Create an
957        allocation log bound to the state log variable as well.
958        """
959        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
960        ename = None
961        pubkey_base = None
962        secretkey_base = None
963        proj = None
964        user = None
965        alloc_log = None
966
967        for a in attrs:
968            if a['attribute'] in configs:
969                try:
970                    self.log.debug("Retrieving %s from %s" % \
971                            (a['attribute'], a['value']))
972                    get_url(a['value'], certfile, tmpdir)
973                except:
974                    t, v, st = sys.exc_info()
975                    raise service_error(service_error.internal,
976                            "Error retrieving %s: %s" % (a.get('value', ""), v))
977            if a['attribute'] == 'ssh_pubkey':
978                pubkey_base = a['value'].rpartition('/')[2]
979            if a['attribute'] == 'ssh_secretkey':
980                secretkey_base = a['value'].rpartition('/')[2]
981            if a['attribute'] == 'experiment_name':
982                ename = a['value']
983
984        if not ename:
985            ename = ""
986            for i in range(0,5):
987                ename += random.choice(string.ascii_letters)
988            self.log.warn("No experiment name: picked one randomly: %s" \
989                    % ename)
990
991        if not pubkey_base:
992            raise service_error(service_error.req, 
993                    "No public key attribute")
994
995        if not secretkey_base:
996            raise service_error(service_error.req, 
997                    "No secret key attribute")
998
999        self.state_lock.acquire()
1000        if aid in self.allocation:
1001            proj = self.allocation[aid].get('project', None)
1002            if not proj: 
1003                proj = self.allocation[aid].get('sproject', None)
1004            user = self.allocation[aid].get('user', None)
1005            self.allocation[aid]['experiment'] = ename
1006            self.allocation[aid]['log'] = [ ]
1007            # Create a logger that logs to the experiment's state object as
1008            # well as to the main log file.
1009            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1010            h = logging.StreamHandler(
1011                    list_log.list_log(self.allocation[aid]['log']))
1012            # XXX: there should be a global one of these rather than
1013            # repeating the code.
1014            h.setFormatter(logging.Formatter(
1015                "%(asctime)s %(name)s %(message)s",
1016                        '%d %b %y %H:%M:%S'))
1017            alloc_log.addHandler(h)
1018            self.write_state()
1019        self.state_lock.release()
1020
1021        if not proj:
1022            raise service_error(service_error.internal, 
1023                    "Can't find project for %s" %aid)
1024
1025        if not user:
1026            raise service_error(service_error.internal, 
1027                    "Can't find creation user for %s" %aid)
1028
1029        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1030
1031    def finalize_experiment(self, starter, topo, aid, alloc_id):
1032        """
1033        Store key bits of experiment state in the global repository, including
1034        the response that may need to be replayed, and return the response.
1035        """
1036        # Copy the assigned names into the return topology
1037        embedding = [ ]
1038        for n in starter.node:
1039            embedding.append({ 
1040                'toponame': n,
1041                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1042                })
1043        # Grab the log (this is some anal locking, but better safe than
1044        # sorry)
1045        self.state_lock.acquire()
1046        logv = "".join(self.allocation[aid]['log'])
1047        # It's possible that the StartSegment call gets retried (!).
1048        # if the 'started' key is in the allocation, we'll return it rather
1049        # than redo the setup.
1050        self.allocation[aid]['started'] = { 
1051                'allocID': alloc_id,
1052                'allocationLog': logv,
1053                'segmentdescription': { 
1054                    'topdldescription': topo.clone().to_dict()
1055                    },
1056                'embedding': embedding
1057                }
1058        retval = copy.copy(self.allocation[aid]['started'])
1059        self.write_state()
1060        self.state_lock.release()
1061        return retval
1062   
1063    # End of StartSegment support routines
1064
1065    def StartSegment(self, req, fid):
1066        err = None  # Any service_error generated after tmpdir is created
1067        rv = None   # Return value from segment creation
1068
1069        try:
1070            req = req['StartSegmentRequestBody']
1071            auth_attr = req['allocID']['fedid']
1072            topref = req['segmentdescription']['topdldescription']
1073        except KeyError:
1074            raise service_error(server_error.req, "Badly formed request")
1075
1076        connInfo = req.get('connection', [])
1077        services = req.get('service', [])
1078        aid = "%s" % auth_attr
1079        attrs = req.get('fedAttr', [])
1080        if not self.auth.check_attribute(fid, auth_attr):
1081            raise service_error(service_error.access, "Access denied")
1082        else:
1083            # See if this is a replay of an earlier succeeded StartSegment -
1084            # sometimes SSL kills 'em.  If so, replay the response rather than
1085            # redoing the allocation.
1086            self.state_lock.acquire()
1087            retval = self.allocation[aid].get('started', None)
1088            self.state_lock.release()
1089            if retval:
1090                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1091                        "replaying response")
1092                return retval
1093
1094        # A new request.  Do it.
1095
1096        if topref: topo = topdl.Topology(**topref)
1097        else:
1098            raise service_error(service_error.req, 
1099                    "Request missing segmentdescription'")
1100       
1101        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1102        try:
1103            tmpdir = tempfile.mkdtemp(prefix="access-")
1104            softdir = "%s/software" % tmpdir
1105            os.mkdir(softdir)
1106        except EnvironmentError:
1107            raise service_error(service_error.internal, "Cannot create tmp dir")
1108
1109        # Try block alllows us to clean up temporary files.
1110        try:
1111            self.retrieve_software(topo, certfile, softdir)
1112            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1113                    self.initialize_experiment_info(attrs, aid, 
1114                            certfile, tmpdir)
1115
1116            # Set up userconf and seer if needed
1117            self.configure_userconf(services, tmpdir)
1118            self.configure_seer_services(services, topo, softdir)
1119            # Get and send synch store variables
1120            self.export_store_info(certfile, proj, ename, connInfo)
1121            self.import_store_info(certfile, connInfo)
1122
1123            expfile = "%s/experiment.tcl" % tmpdir
1124
1125            self.generate_portal_configs(topo, pubkey_base, 
1126                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1127            self.generate_ns2(topo, expfile, 
1128                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1129
1130            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1131                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1132                    cert=self.xmlrpc_cert)
1133            rv = starter(self, ename, proj, user, expfile, tmpdir)
1134        except service_error, e:
1135            err = e
1136        except:
1137            t, v, st = sys.exc_info()
1138            err = service_error(service_error.internal, "%s: %s" % \
1139                    (v, traceback.extract_tb(st)))
1140
1141        # Walk up tmpdir, deleting as we go
1142        if self.cleanup: self.remove_dirs(tmpdir)
1143        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1144
1145        if rv:
1146            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1147        elif err:
1148            raise service_error(service_error.federant,
1149                    "Swapin failed: %s" % err)
1150        else:
1151            raise service_error(service_error.federant, "Swapin failed")
1152
1153    def TerminateSegment(self, req, fid):
1154        try:
1155            req = req['TerminateSegmentRequestBody']
1156        except KeyError:
1157            raise service_error(server_error.req, "Badly formed request")
1158
1159        auth_attr = req['allocID']['fedid']
1160        aid = "%s" % auth_attr
1161        attrs = req.get('fedAttr', [])
1162        if not self.auth.check_attribute(fid, auth_attr):
1163            raise service_error(service_error.access, "Access denied")
1164
1165        self.state_lock.acquire()
1166        if aid in self.allocation:
1167            proj = self.allocation[aid].get('project', None)
1168            if not proj: 
1169                proj = self.allocation[aid].get('sproject', None)
1170            user = self.allocation[aid].get('user', None)
1171            ename = self.allocation[aid].get('experiment', None)
1172        else:
1173            proj = None
1174            user = None
1175            ename = None
1176        self.state_lock.release()
1177
1178        if not proj:
1179            raise service_error(service_error.internal, 
1180                    "Can't find project for %s" % aid)
1181
1182        if not user:
1183            raise service_error(service_error.internal, 
1184                    "Can't find creation user for %s" % aid)
1185        if not ename:
1186            raise service_error(service_error.internal, 
1187                    "Can't find experiment name for %s" % aid)
1188        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1189                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1190        stopper(self, user, proj, ename)
1191        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.