source: fedd/federation/emulab_access.py @ 4692a16

axis_examplecompt_changesinfo-ops
Last change on this file since 4692a16 was 4692a16, checked in by Ted Faber <faber@…>, 13 years ago

Gratuitious use of lambda

  • Property mode set to 100644
File size: 39.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        # XXX: this should go?
107        #if config.has_option("access", "accessdb"):
108        #    self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'legacy'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'legacy':
120            self.access = { }
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            self.access = [ ]
126            if accessdb:
127                self.read_access(accessdb, self.access_tuple)
128        else:
129            raise service_error(service_error.internal, 
130                    "Unknown auth_type: %s" % self.auth_type)
131
132        # read_state in the base_class
133        self.state_lock.acquire()
134        for a  in ('allocation', 'projects', 'keys', 'types'):
135            if a not in self.state:
136                self.state[a] = { }
137        self.allocation = self.state['allocation']
138        self.projects = self.state['projects']
139        self.keys = self.state['keys']
140        self.types = self.state['types']
141        if self.auth_type == "legacy": 
142            # Add the ownership attributes to the authorizer.  Note that the
143            # indices of the allocation dict are strings, but the attributes are
144            # fedids, so there is a conversion.
145            for k in self.allocation.keys():
146                for o in self.allocation[k].get('owners', []):
147                    self.auth.set_attribute(o, fedid(hexstr=k))
148                if self.allocation[k].has_key('userconfig'):
149                    sfid = self.allocation[k]['userconfig']
150                    fid = fedid(hexstr=sfid)
151                    self.auth.set_attribute(fid, "/%s" % sfid)
152        self.state_lock.release()
153        self.exports = {
154                'SMB': self.export_SMB,
155                'seer': self.export_seer,
156                'tmcd': self.export_tmcd,
157                'userconfig': self.export_userconfig,
158                'project_export': self.export_project_export,
159                'local_seer_control': self.export_local_seer,
160                'seer_master': self.export_seer_master,
161                'hide_hosts': self.export_hide_hosts,
162                }
163
164        if not self.local_seer_image or not self.local_seer_software or \
165                not self.local_seer_start:
166            if 'local_seer_control' in self.exports:
167                del self.exports['local_seer_control']
168
169        if not self.local_seer_image or not self.local_seer_software or \
170                not self.seer_master_start:
171            if 'seer_master' in self.exports:
172                del self.exports['seer_master']
173
174
175        self.soap_services = {\
176            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
177            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
178            'StartSegment': soap_handler("StartSegment", self.StartSegment),
179            'TerminateSegment': soap_handler("TerminateSegment",
180                self.TerminateSegment),
181            }
182        self.xmlrpc_services =  {\
183            'RequestAccess': xmlrpc_handler('RequestAccess',
184                self.RequestAccess),
185            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
186                self.ReleaseAccess),
187            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
188            'TerminateSegment': xmlrpc_handler('TerminateSegment',
189                self.TerminateSegment),
190            }
191
192        self.call_SetValue = service_caller('SetValue')
193        self.call_GetValue = service_caller('GetValue', log=self.log)
194
195        if not config.has_option("allocate", "uri"):
196            self.allocate_project = \
197                allocate_project_local(config, auth)
198        else:
199            self.allocate_project = \
200                allocate_project_remote(config, auth)
201
202
203        # If the project allocator exports services, put them in this object's
204        # maps so that classes that instantiate this can call the services.
205        self.soap_services.update(self.allocate_project.soap_services)
206        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
207
208    @staticmethod
209    def legacy_access_tuple(str):
210        """
211        Convert a string of the form (id[:resources:resouces], id, id) into a
212        tuple of the form (project, user, user) where users may be names or
213        fedids.  The resources strings are obsolete and ignored.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = first[0]
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def access_tuple(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            # The slice takes the parens off the string.
250            proj, user = str[1:-1].split(',')
251            return (proj.strip(), user.strip(), user.strip())
252        else:
253            raise self.parse_error(
254                    'Bad mapping (unbalanced parens or more than 1 comma)')
255
256
257    # RequestAccess support routines
258
259    def legacy_lookup_access(self, req, fid):
260        """
261        Look up the local access control information mapped to this fedid and
262        credentials.  In this case it is a (project, create_user, access_user)
263        triple, and a triple of three booleans indicating which, if any will
264        need to be dynamically created.  Finally a list of owners for that
265        allocation is returned.
266
267        lookup_access_base pulls the first triple out, and it is parsed by this
268        routine into the boolean map.  Owners is always the controlling fedid.
269        """
270        # Return values
271        rp = None
272        ru = None
273        # This maps a valid user to the Emulab projects and users to use
274        found, match = self.legacy_lookup_access_base(req, fid)
275        tb, project, user = match
276       
277        if found == None:
278            raise service_error(service_error.access,
279                    "Access denied - cannot map access")
280
281        # resolve <dynamic> and <same> in found
282        dyn_proj = False
283        dyn_create_user = False
284        dyn_service_user = False
285
286        if found[0] == "<same>":
287            if project != None:
288                rp = project
289            else : 
290                raise service_error(\
291                        service_error.server_config,
292                        "Project matched <same> when no project given")
293        elif found[0] == "<dynamic>":
294            rp = None
295            dyn_proj = True
296        else:
297            rp = found[0]
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def lookup_access(self, req, fid, filter=None, compare=None): 
331        """
332        Check all the attributes that this controller knows how to map and see
333        if the requester is allowed to use any of them.  If so return one.
334        Filter defined the objects to check - it's a function that returns true
335        for the objects to check - and cmp defines the order to check them in
336        as the cmp field of sorted().  If filter is None, all possibilities are
337        checked.  If cmp is None, the choices are sorted by priority.
338        """
339
340        # Import request credentials into this (clone later??)
341        if self.auth.import_credentials(
342                data_list=req.get('abac_credential', [])):
343            self.auth.save()
344
345        # NB: in the default case (the else), the comparison order is reversed
346        # so numerically larger priorities are checked first.
347        if compare: c = compare
348        else: c = lambda(a, b): cmp(b,a)
349
350        if filter: f = filter
351        else: f = lambda(x): True
352
353        check = sorted([ a for a in self.access if f(a)], cmp=c)
354
355        # Check every attribute that we know how to map and take the first
356        # success.
357        for attr in check:
358            if self.auth.check_attribute(fid, attr.attr):
359                self.log.debug("Access succeeded for %s %s" % (attr.attr, fid))
360                # XXX: needs to deal with dynamics
361                return copy.copy(attr.value), (False, False, False), \
362                        [ fid ]
363            else:
364                self.log.debug("Access failed for %s %s" % (attr.attr, fid))
365        else:
366            raise service_error(service_error.access, "Access denied")
367
368
369    def do_project_allocation(self, dyn, project, user):
370        """
371        Call the project allocation routines and return the info.
372        """
373        if dyn: 
374            # Compose the dynamic project request
375            # (only dynamic, dynamic currently allowed)
376            preq = { 'AllocateProjectRequestBody': \
377                        { 'project' : {\
378                            'user': [ \
379                            { \
380                                'access': [ { 
381                                    'sshPubkey': self.ssh_pubkey_file } ],
382                                 'role': "serviceAccess",\
383                            }, \
384                            { \
385                                'access': [ { 
386                                    'sshPubkey': self.ssh_pubkey_file } ],
387                                 'role': "experimentCreation",\
388                            }, \
389                            ], \
390                            }\
391                        }\
392                    }
393            return self.allocate_project.dynamic_project(preq)
394        else:
395            preq = {'StaticProjectRequestBody' : \
396                    { 'project': \
397                        { 'name' : { 'localname' : project },\
398                          'user' : [ \
399                            {\
400                                'userID': { 'localname' : user }, \
401                                'access': [ { 
402                                    'sshPubkey': self.ssh_pubkey_file } ],
403                                'role': 'experimentCreation'\
404                            },\
405                            {\
406                                'userID': { 'localname' : user}, \
407                                'access': [ { 
408                                    'sshPubkey': self.ssh_pubkey_file } ],
409                                'role': 'serviceAccess'\
410                            },\
411                        ]}\
412                    }\
413            }
414            return self.allocate_project.static_project(preq)
415
416    def save_project_state(self, aid, ap, dyn, owners):
417        """
418        Parse out and save the information relevant to the project created for
419        this experiment.  That info is largely in ap and owners.  dyn indicates
420        that the project was created dynamically.  Return the user and project
421        names.
422        """
423        self.state_lock.acquire()
424        self.allocation[aid] = { }
425        try:
426            pname = ap['project']['name']['localname']
427        except KeyError:
428            pname = None
429
430        if dyn:
431            if not pname:
432                self.state_lock.release()
433                raise service_error(service_error.internal,
434                        "Misformed allocation response?")
435            if pname in self.projects: self.projects[pname] += 1
436            else: self.projects[pname] = 1
437            self.allocation[aid]['project'] = pname
438        else:
439            # sproject is a static project associated with this allocation.
440            self.allocation[aid]['sproject'] = pname
441
442        self.allocation[aid]['keys'] = [ ]
443
444        try:
445            for u in ap['project']['user']:
446                uname = u['userID']['localname']
447                if u['role'] == 'experimentCreation':
448                    self.allocation[aid]['user'] = uname
449                for k in [ k['sshPubkey'] for k in u['access'] \
450                        if k.has_key('sshPubkey') ]:
451                    kv = "%s:%s" % (uname, k)
452                    if self.keys.has_key(kv): self.keys[kv] += 1
453                    else: self.keys[kv] = 1
454                    self.allocation[aid]['keys'].append((uname, k))
455        except KeyError:
456            self.state_lock.release()
457            raise service_error(service_error.internal,
458                    "Misformed allocation response?")
459
460        self.allocation[aid]['owners'] = owners
461        self.write_state()
462        self.state_lock.release()
463        return (pname, uname)
464
465    # End of RequestAccess support routines
466
467    def RequestAccess(self, req, fid):
468        """
469        Handle the access request.  Proxy if not for us.
470
471        Parse out the fields and make the allocations or rejections if for us,
472        otherwise, assuming we're willing to proxy, proxy the request out.
473        """
474
475        def gateway_hardware(h):
476            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
477            else: return h
478
479        def get_export_project(svcs):
480            """
481            if the service requests includes one to export a project, return
482            that project.
483            """
484            rv = None
485            for s in svcs:
486                if s.get('name', '') == 'project_export' and \
487                        s.get('visibility', '') == 'export':
488                    if not rv: 
489                        for a in s.get('fedAttr', []):
490                            if a.get('attribute', '') == 'project' \
491                                    and 'value' in a:
492                                rv = a['value']
493                    else:
494                        raise service_error(service_error, access, 
495                                'Requesting multiple project exports is ' + \
496                                        'not supported');
497            return rv
498
499        # The dance to get into the request body
500        if req.has_key('RequestAccessRequestBody'):
501            req = req['RequestAccessRequestBody']
502        else:
503            raise service_error(service_error.req, "No request!?")
504
505        # if this includes a project export request, construct a filter such
506        # that only the ABAC attributes mapped to that project are checked for
507        # access.
508        if 'service' in req:
509            ep = get_export_project(req['service'])
510            pf = lambda(a): a.value[0] == ep
511        else:
512            ep = None
513            pf = None
514
515        if self.auth.import_credentials(
516                data_list=req.get('abac_credential', [])):
517            self.auth.save()
518
519        if self.auth_type == "legacy":
520            found, dyn, owners = self.legacy_lookup_access(req, fid)
521        elif self.auth_type == 'abac':
522            found, dyn, owners = self.lookup_access(req, fid, filter=pf)
523        else:
524            raise service_error(service_error.internal, 
525                    'Unknown auth_type: %s' % self.auth_type)
526        ap = None
527
528        # This only happens in legacy lookups, but if this user has access to
529        # the testbed but not the project to be exported, raise the error.
530        if ep and ep != found[0]:
531            raise service_error(service_error.access,
532                    "Cannot export %s" % ep)
533
534        if self.ssh_pubkey_file:
535            ap = self.do_project_allocation(dyn[1], found[0], found[1])
536        else:
537            raise service_error(service_error.internal, 
538                    "SSH access parameters required")
539        # keep track of what's been added
540        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
541        aid = unicode(allocID)
542
543        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
544
545        services, svc_state = self.export_services(req.get('service',[]),
546                pname, uname)
547        self.state_lock.acquire()
548        # Store services state in global state
549        for k, v in svc_state.items():
550            self.allocation[aid][k] = v
551        self.write_state()
552        self.state_lock.release()
553        # Give the owners the right to change this allocation
554        for o in owners:
555            self.auth.set_attribute(o, allocID)
556        self.auth.save()
557        try:
558            f = open("%s/%s.pem" % (self.certdir, aid), "w")
559            print >>f, alloc_cert
560            f.close()
561        except EnvironmentError, e:
562            raise service_error(service_error.internal, 
563                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
564        resp = self.build_access_response({ 'fedid': allocID } ,
565                ap, services)
566        return resp
567
568    def do_release_project(self, del_project, del_users, del_types):
569        """
570        If a project and users has to be deleted, make the call.
571        """
572        msg = { 'project': { }}
573        if del_project:
574            msg['project']['name']= {'localname': del_project}
575        users = [ ]
576        for u in del_users.keys():
577            users.append({ 'userID': { 'localname': u },\
578                'access' :  \
579                        [ {'sshPubkey' : s } for s in del_users[u]]\
580            })
581        if users: 
582            msg['project']['user'] = users
583        if len(del_types) > 0:
584            msg['resources'] = { 'node': \
585                    [ {'hardware': [ h ] } for h in del_types ]\
586                }
587        if self.allocate_project.release_project:
588            msg = { 'ReleaseProjectRequestBody' : msg}
589            self.allocate_project.release_project(msg)
590
591    def ReleaseAccess(self, req, fid):
592        # The dance to get into the request body
593        if req.has_key('ReleaseAccessRequestBody'):
594            req = req['ReleaseAccessRequestBody']
595        else:
596            raise service_error(service_error.req, "No request!?")
597
598        try:
599            if req['allocID'].has_key('localname'):
600                auth_attr = aid = req['allocID']['localname']
601            elif req['allocID'].has_key('fedid'):
602                aid = unicode(req['allocID']['fedid'])
603                auth_attr = req['allocID']['fedid']
604            else:
605                raise service_error(service_error.req,
606                        "Only localnames and fedids are understood")
607        except KeyError:
608            raise service_error(service_error.req, "Badly formed request")
609
610        self.log.debug("[access] deallocation requested for %s by %s" % \
611                (aid, fid))
612        if not self.auth.check_attribute(fid, auth_attr):
613            self.log.debug("[access] deallocation denied for %s", aid)
614            raise service_error(service_error.access, "Access Denied")
615
616        # If we know this allocation, reduce the reference counts and
617        # remove the local allocations.  Otherwise report an error.  If
618        # there is an allocation to delete, del_users will be a dictonary
619        # of sets where the key is the user that owns the keys in the set.
620        # We use a set to avoid duplicates.  del_project is just the name
621        # of any dynamic project to delete.  We're somewhat lazy about
622        # deleting authorization attributes.  Having access to something
623        # that doesn't exist isn't harmful.
624        del_users = { }
625        del_project = None
626        del_types = set()
627
628        self.state_lock.acquire()
629        if aid in self.allocation:
630            self.log.debug("Found allocation for %s" %aid)
631            for k in self.allocation[aid]['keys']:
632                kk = "%s:%s" % k
633                self.keys[kk] -= 1
634                if self.keys[kk] == 0:
635                    if not del_users.has_key(k[0]):
636                        del_users[k[0]] = set()
637                    del_users[k[0]].add(k[1])
638                    del self.keys[kk]
639
640            if 'project' in self.allocation[aid]:
641                pname = self.allocation[aid]['project']
642                self.projects[pname] -= 1
643                if self.projects[pname] == 0:
644                    del_project = pname
645                    del self.projects[pname]
646
647            if 'types' in self.allocation[aid]:
648                for t in self.allocation[aid]['types']:
649                    self.types[t] -= 1
650                    if self.types[t] == 0:
651                        if not del_project: del_project = t[0]
652                        del_types.add(t[1])
653                        del self.types[t]
654
655            del self.allocation[aid]
656            self.write_state()
657            self.state_lock.release()
658            # If we actually have resources to deallocate, prepare the call.
659            if del_project or del_users:
660                self.do_release_project(del_project, del_users, del_types)
661            # And remove the access cert
662            cf = "%s/%s.pem" % (self.certdir, aid)
663            self.log.debug("Removing %s" % cf)
664            os.remove(cf)
665            return { 'allocID': req['allocID'] } 
666        else:
667            self.state_lock.release()
668            raise service_error(service_error.req, "No such allocation")
669
670    # These are subroutines for StartSegment
671    def generate_ns2(self, topo, expfn, softdir, connInfo):
672        """
673        Convert topo into an ns2 file, decorated with appropriate commands for
674        the particular testbed setup.  Convert all requests for software, etc
675        to point at the staged copies on this testbed and add the federation
676        startcommands.
677        """
678        class dragon_commands:
679            """
680            Functor to spit out approrpiate dragon commands for nodes listed in
681            the connectivity description.  The constructor makes a dict mapping
682            dragon nodes to their parameters and the __call__ checks each
683            element in turn for membership.
684            """
685            def __init__(self, map):
686                self.node_info = map
687
688            def __call__(self, e):
689                s = ""
690                if isinstance(e, topdl.Computer):
691                    if self.node_info.has_key(e.name):
692                        info = self.node_info[e.name]
693                        for ifname, vlan, type in info:
694                            for i in e.interface:
695                                if i.name == ifname:
696                                    addr = i.get_attribute('ip4_address')
697                                    subs = i.substrate[0]
698                                    break
699                            else:
700                                raise service_error(service_error.internal,
701                                        "No interface %s on element %s" % \
702                                                (ifname, e.name))
703                            # XXX: do netmask right
704                            if type =='link':
705                                s = ("tb-allow-external ${%s} " + \
706                                        "dragonportal ip %s vlan %s " + \
707                                        "netmask 255.255.255.0\n") % \
708                                        (topdl.to_tcl_name(e.name), addr, vlan)
709                            elif type =='lan':
710                                s = ("tb-allow-external ${%s} " + \
711                                        "dragonportal " + \
712                                        "ip %s vlan %s usurp %s\n") % \
713                                        (topdl.to_tcl_name(e.name), addr, 
714                                                vlan, subs)
715                            else:
716                                raise service_error(service_error_internal,
717                                        "Unknown DRAGON type %s" % type)
718                return s
719
720        class not_dragon:
721            """
722            Return true if a node is in the given map of dragon nodes.
723            """
724            def __init__(self, map):
725                self.nodes = set(map.keys())
726
727            def __call__(self, e):
728                return e.name not in self.nodes
729
730        # Main line of generate_ns2
731        t = topo.clone()
732
733        # Create the map of nodes that need direct connections (dragon
734        # connections) from the connInfo
735        dragon_map = { }
736        for i in [ i for i in connInfo if i['type'] == 'transit']:
737            for a in i.get('fedAttr', []):
738                if a['attribute'] == 'vlan_id':
739                    vlan = a['value']
740                    break
741            else:
742                raise service_error(service_error.internal, "No vlan tag")
743            members = i.get('member', [])
744            if len(members) > 1: type = 'lan'
745            else: type = 'link'
746
747            try:
748                for m in members:
749                    if m['element'] in dragon_map:
750                        dragon_map[m['element']].append(( m['interface'], 
751                            vlan, type))
752                    else:
753                        dragon_map[m['element']] = [( m['interface'], 
754                            vlan, type),]
755            except KeyError:
756                raise service_error(service_error.req,
757                        "Missing connectivity info")
758
759        # Weed out the things we aren't going to instantiate: Segments, portal
760        # substrates, and portal interfaces.  (The copy in the for loop allows
761        # us to delete from e.elements in side the for loop).  While we're
762        # touching all the elements, we also adjust paths from the original
763        # testbed to local testbed paths and put the federation commands into
764        # the start commands
765        for e in [e for e in t.elements]:
766            if isinstance(e, topdl.Segment):
767                t.elements.remove(e)
768            if isinstance(e, topdl.Computer):
769                self.add_kit(e, self.federation_software)
770                if e.get_attribute('portal') and self.portal_startcommand:
771                    # Add local portal support software
772                    self.add_kit(e, self.portal_software)
773                    # Portals never have a user-specified start command
774                    e.set_attribute('startup', self.portal_startcommand)
775                elif self.node_startcommand:
776                    if e.get_attribute('startup'):
777                        e.set_attribute('startup', "%s \\$USER '%s'" % \
778                                (self.node_startcommand, 
779                                    e.get_attribute('startup')))
780                    else:
781                        e.set_attribute('startup', self.node_startcommand)
782
783                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
784                # Remove portal interfaces that do not connect to DRAGON
785                e.interface = [i for i in e.interface \
786                        if not i.get_attribute('portal') or i.name in dinf ]
787            # Fix software paths
788            for s in getattr(e, 'software', []):
789                s.location = re.sub("^.*/", softdir, s.location)
790
791        t.substrates = [ s.clone() for s in t.substrates ]
792        t.incorporate_elements()
793
794        # Customize the ns2 output for local portal commands and images
795        filters = []
796
797        if self.dragon_endpoint:
798            add_filter = not_dragon(dragon_map)
799            filters.append(dragon_commands(dragon_map))
800        else:
801            add_filter = None
802
803        if self.portal_command:
804            filters.append(topdl.generate_portal_command_filter(
805                self.portal_command, add_filter=add_filter))
806
807        if self.portal_image:
808            filters.append(topdl.generate_portal_image_filter(
809                self.portal_image))
810
811        if self.portal_type:
812            filters.append(topdl.generate_portal_hardware_filter(
813                self.portal_type))
814
815        # Convert to ns and write it out
816        expfile = topdl.topology_to_ns2(t, filters)
817        try:
818            f = open(expfn, "w")
819            print >>f, expfile
820            f.close()
821        except EnvironmentError:
822            raise service_error(service_error.internal,
823                    "Cannot write experiment file %s: %s" % (expfn,e))
824
825    def export_store_info(self, cf, proj, ename, connInfo):
826        """
827        For the export requests in the connection info, install the peer names
828        at the experiment controller via SetValue calls.
829        """
830
831        for c in connInfo:
832            for p in [ p for p in c.get('parameter', []) \
833                    if p.get('type', '') == 'output']:
834
835                if p.get('name', '') == 'peer':
836                    k = p.get('key', None)
837                    surl = p.get('store', None)
838                    if surl and k and k.index('/') != -1:
839                        value = "%s.%s.%s%s" % \
840                                (k[k.index('/')+1:], ename, proj, self.domain)
841                        req = { 'name': k, 'value': value }
842                        self.log.debug("Setting %s to %s on %s" % \
843                                (k, value, surl))
844                        self.call_SetValue(surl, req, cf)
845                    else:
846                        self.log.error("Bad export request: %s" % p)
847                elif p.get('name', '') == 'ssh_port':
848                    k = p.get('key', None)
849                    surl = p.get('store', None)
850                    if surl and k:
851                        req = { 'name': k, 'value': self.ssh_port }
852                        self.log.debug("Setting %s to %s on %s" % \
853                                (k, self.ssh_port, surl))
854                        self.call_SetValue(surl, req, cf)
855                    else:
856                        self.log.error("Bad export request: %s" % p)
857                else:
858                    self.log.error("Unknown export parameter: %s" % \
859                            p.get('name'))
860                    continue
861
862    def add_seer_node(self, topo, name, startup):
863        """
864        Add a seer node to the given topology, with the startup command passed
865        in.  Used by configure seer_services.
866        """
867        c_node = topdl.Computer(
868                name=name, 
869                os= topdl.OperatingSystem(
870                    attribute=[
871                    { 'attribute': 'osid', 
872                        'value': self.local_seer_image },
873                    ]),
874                attribute=[
875                    { 'attribute': 'startup', 'value': startup },
876                    ]
877                )
878        self.add_kit(c_node, self.local_seer_software)
879        topo.elements.append(c_node)
880
881    def configure_seer_services(self, services, topo, softdir):
882        """
883        Make changes to the topology required for the seer requests being made.
884        Specifically, add any control or master nodes required and set up the
885        start commands on the nodes to interconnect them.
886        """
887        local_seer = False      # True if we need to add a control node
888        collect_seer = False    # True if there is a seer-master node
889        seer_master= False      # True if we need to add the seer-master
890        for s in services:
891            s_name = s.get('name', '')
892            s_vis = s.get('visibility','')
893
894            if s_name  == 'local_seer_control' and s_vis == 'export':
895                local_seer = True
896            elif s_name == 'seer_master':
897                if s_vis == 'import':
898                    collect_seer = True
899                elif s_vis == 'export':
900                    seer_master = True
901       
902        # We've got the whole picture now, so add nodes if needed and configure
903        # them to interconnect properly.
904        if local_seer or seer_master:
905            # Copy local seer control node software to the tempdir
906            for l, f in self.local_seer_software:
907                base = os.path.basename(f)
908                copy_file(f, "%s/%s" % (softdir, base))
909        # If we're collecting seers somewhere the controllers need to talk to
910        # the master.  In testbeds that export the service, that will be a
911        # local node that we'll add below.  Elsewhere it will be the control
912        # portal that will port forward to the exporting master.
913        if local_seer:
914            if collect_seer:
915                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
916            else:
917                startup = self.local_seer_start
918            self.add_seer_node(topo, 'control', startup)
919        # If this is the seer master, add that node, too.
920        if seer_master:
921            self.add_seer_node(topo, 'seer-master', 
922                    "%s -R -n -R seer-master -R -A -R sink" % \
923                            self.seer_master_start)
924
925    def retrieve_software(self, topo, certfile, softdir):
926        """
927        Collect the software that nodes in the topology need loaded and stage
928        it locally.  This implies retrieving it from the experiment_controller
929        and placing it into softdir.  Certfile is used to prove that this node
930        has access to that data (it's the allocation/segment fedid).  Finally
931        local portal and federation software is also copied to the same staging
932        directory for simplicity - all software needed for experiment creation
933        is in softdir.
934        """
935        sw = set()
936        for e in topo.elements:
937            for s in getattr(e, 'software', []):
938                sw.add(s.location)
939        for s in sw:
940            self.log.debug("Retrieving %s" % s)
941            try:
942                get_url(s, certfile, softdir)
943            except:
944                t, v, st = sys.exc_info()
945                raise service_error(service_error.internal,
946                        "Error retrieving %s: %s" % (s, v))
947
948        # Copy local federation and portal node software to the tempdir
949        for s in (self.federation_software, self.portal_software):
950            for l, f in s:
951                base = os.path.basename(f)
952                copy_file(f, "%s/%s" % (softdir, base))
953
954
955    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
956        """
957        Gather common configuration files, retrieve or create an experiment
958        name and project name, and return the ssh_key filenames.  Create an
959        allocation log bound to the state log variable as well.
960        """
961        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
962        ename = None
963        pubkey_base = None
964        secretkey_base = None
965        proj = None
966        user = None
967        alloc_log = None
968
969        for a in attrs:
970            if a['attribute'] in configs:
971                try:
972                    self.log.debug("Retrieving %s from %s" % \
973                            (a['attribute'], a['value']))
974                    get_url(a['value'], certfile, tmpdir)
975                except:
976                    t, v, st = sys.exc_info()
977                    raise service_error(service_error.internal,
978                            "Error retrieving %s: %s" % (a.get('value', ""), v))
979            if a['attribute'] == 'ssh_pubkey':
980                pubkey_base = a['value'].rpartition('/')[2]
981            if a['attribute'] == 'ssh_secretkey':
982                secretkey_base = a['value'].rpartition('/')[2]
983            if a['attribute'] == 'experiment_name':
984                ename = a['value']
985
986        if not ename:
987            ename = ""
988            for i in range(0,5):
989                ename += random.choice(string.ascii_letters)
990            self.log.warn("No experiment name: picked one randomly: %s" \
991                    % ename)
992
993        if not pubkey_base:
994            raise service_error(service_error.req, 
995                    "No public key attribute")
996
997        if not secretkey_base:
998            raise service_error(service_error.req, 
999                    "No secret key attribute")
1000
1001        self.state_lock.acquire()
1002        if aid in self.allocation:
1003            proj = self.allocation[aid].get('project', None)
1004            if not proj: 
1005                proj = self.allocation[aid].get('sproject', None)
1006            user = self.allocation[aid].get('user', None)
1007            self.allocation[aid]['experiment'] = ename
1008            self.allocation[aid]['log'] = [ ]
1009            # Create a logger that logs to the experiment's state object as
1010            # well as to the main log file.
1011            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1012            h = logging.StreamHandler(
1013                    list_log.list_log(self.allocation[aid]['log']))
1014            # XXX: there should be a global one of these rather than
1015            # repeating the code.
1016            h.setFormatter(logging.Formatter(
1017                "%(asctime)s %(name)s %(message)s",
1018                        '%d %b %y %H:%M:%S'))
1019            alloc_log.addHandler(h)
1020            self.write_state()
1021        self.state_lock.release()
1022
1023        if not proj:
1024            raise service_error(service_error.internal, 
1025                    "Can't find project for %s" %aid)
1026
1027        if not user:
1028            raise service_error(service_error.internal, 
1029                    "Can't find creation user for %s" %aid)
1030
1031        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1032
1033    def finalize_experiment(self, starter, topo, aid, alloc_id):
1034        """
1035        Store key bits of experiment state in the global repository, including
1036        the response that may need to be replayed, and return the response.
1037        """
1038        # Copy the assigned names into the return topology
1039        embedding = [ ]
1040        for n in starter.node:
1041            embedding.append({ 
1042                'toponame': n,
1043                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1044                })
1045        # Grab the log (this is some anal locking, but better safe than
1046        # sorry)
1047        self.state_lock.acquire()
1048        logv = "".join(self.allocation[aid]['log'])
1049        # It's possible that the StartSegment call gets retried (!).
1050        # if the 'started' key is in the allocation, we'll return it rather
1051        # than redo the setup.
1052        self.allocation[aid]['started'] = { 
1053                'allocID': alloc_id,
1054                'allocationLog': logv,
1055                'segmentdescription': { 
1056                    'topdldescription': topo.clone().to_dict()
1057                    },
1058                'embedding': embedding
1059                }
1060        retval = copy.copy(self.allocation[aid]['started'])
1061        self.write_state()
1062        self.state_lock.release()
1063        return retval
1064   
1065    # End of StartSegment support routines
1066
1067    def StartSegment(self, req, fid):
1068        err = None  # Any service_error generated after tmpdir is created
1069        rv = None   # Return value from segment creation
1070
1071        try:
1072            req = req['StartSegmentRequestBody']
1073            auth_attr = req['allocID']['fedid']
1074            topref = req['segmentdescription']['topdldescription']
1075        except KeyError:
1076            raise service_error(server_error.req, "Badly formed request")
1077
1078        connInfo = req.get('connection', [])
1079        services = req.get('service', [])
1080        aid = "%s" % auth_attr
1081        attrs = req.get('fedAttr', [])
1082        if not self.auth.check_attribute(fid, auth_attr):
1083            raise service_error(service_error.access, "Access denied")
1084        else:
1085            # See if this is a replay of an earlier succeeded StartSegment -
1086            # sometimes SSL kills 'em.  If so, replay the response rather than
1087            # redoing the allocation.
1088            self.state_lock.acquire()
1089            retval = self.allocation[aid].get('started', None)
1090            self.state_lock.release()
1091            if retval:
1092                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1093                        "replaying response")
1094                return retval
1095
1096        # A new request.  Do it.
1097
1098        if topref: topo = topdl.Topology(**topref)
1099        else:
1100            raise service_error(service_error.req, 
1101                    "Request missing segmentdescription'")
1102       
1103        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1104        try:
1105            tmpdir = tempfile.mkdtemp(prefix="access-")
1106            softdir = "%s/software" % tmpdir
1107            os.mkdir(softdir)
1108        except EnvironmentError:
1109            raise service_error(service_error.internal, "Cannot create tmp dir")
1110
1111        # Try block alllows us to clean up temporary files.
1112        try:
1113            self.retrieve_software(topo, certfile, softdir)
1114            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1115                    self.initialize_experiment_info(attrs, aid, 
1116                            certfile, tmpdir)
1117
1118            # Set up userconf and seer if needed
1119            self.configure_userconf(services, tmpdir)
1120            self.configure_seer_services(services, topo, softdir)
1121            # Get and send synch store variables
1122            self.export_store_info(certfile, proj, ename, connInfo)
1123            self.import_store_info(certfile, connInfo)
1124
1125            expfile = "%s/experiment.tcl" % tmpdir
1126
1127            self.generate_portal_configs(topo, pubkey_base, 
1128                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1129            self.generate_ns2(topo, expfile, 
1130                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1131
1132            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1133                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1134                    cert=self.xmlrpc_cert)
1135            rv = starter(self, ename, proj, user, expfile, tmpdir)
1136        except service_error, e:
1137            err = e
1138        except:
1139            t, v, st = sys.exc_info()
1140            err = service_error(service_error.internal, "%s: %s" % \
1141                    (v, traceback.extract_tb(st)))
1142
1143        # Walk up tmpdir, deleting as we go
1144        if self.cleanup: self.remove_dirs(tmpdir)
1145        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1146
1147        if rv:
1148            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1149        elif err:
1150            raise service_error(service_error.federant,
1151                    "Swapin failed: %s" % err)
1152        else:
1153            raise service_error(service_error.federant, "Swapin failed")
1154
1155    def TerminateSegment(self, req, fid):
1156        try:
1157            req = req['TerminateSegmentRequestBody']
1158        except KeyError:
1159            raise service_error(server_error.req, "Badly formed request")
1160
1161        auth_attr = req['allocID']['fedid']
1162        aid = "%s" % auth_attr
1163        attrs = req.get('fedAttr', [])
1164        if not self.auth.check_attribute(fid, auth_attr):
1165            raise service_error(service_error.access, "Access denied")
1166
1167        self.state_lock.acquire()
1168        if aid in self.allocation:
1169            proj = self.allocation[aid].get('project', None)
1170            if not proj: 
1171                proj = self.allocation[aid].get('sproject', None)
1172            user = self.allocation[aid].get('user', None)
1173            ename = self.allocation[aid].get('experiment', None)
1174        else:
1175            proj = None
1176            user = None
1177            ename = None
1178        self.state_lock.release()
1179
1180        if not proj:
1181            raise service_error(service_error.internal, 
1182                    "Can't find project for %s" % aid)
1183
1184        if not user:
1185            raise service_error(service_error.internal, 
1186                    "Can't find creation user for %s" % aid)
1187        if not ename:
1188            raise service_error(service_error.internal, 
1189                    "Can't find experiment name for %s" % aid)
1190        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1191                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1192        stopper(self, user, proj, ename)
1193        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.