source: fedd/federation/emulab_access.py @ 78f2668

axis_examplecompt_changesinfo-ops
Last change on this file since 78f2668 was 78f2668, checked in by Ted Faber <faber@…>, 13 years ago

Move some functions from access to legacy_access. Rename functions so abac is the default

  • Property mode set to 100644
File size: 38.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        # XXX: this should go?
108        #if config.has_option("access", "accessdb"):
109        #    self.read_access(config.get("access", "accessdb"))
110        tb = config.get('access', 'testbed')
111        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
112        else: self.testbed = [ ]
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            if accessdb:
126                self.read_access(accessdb, self.access_tuple)
127        else:
128            raise service_error(service_error.internal, 
129                    "Unknown auth_type: %s" % self.auth_type)
130
131        # read_state in the base_class
132        self.state_lock.acquire()
133        for a  in ('allocation', 'projects', 'keys', 'types'):
134            if a not in self.state:
135                self.state[a] = { }
136        self.allocation = self.state['allocation']
137        self.projects = self.state['projects']
138        self.keys = self.state['keys']
139        self.types = self.state['types']
140        if self.auth_type == "legacy": 
141            # Add the ownership attributes to the authorizer.  Note that the
142            # indices of the allocation dict are strings, but the attributes are
143            # fedids, so there is a conversion.
144            for k in self.allocation.keys():
145                for o in self.allocation[k].get('owners', []):
146                    self.auth.set_attribute(o, fedid(hexstr=k))
147                if self.allocation[k].has_key('userconfig'):
148                    sfid = self.allocation[k]['userconfig']
149                    fid = fedid(hexstr=sfid)
150                    self.auth.set_attribute(fid, "/%s" % sfid)
151        self.state_lock.release()
152        self.exports = {
153                'SMB': self.export_SMB,
154                'seer': self.export_seer,
155                'tmcd': self.export_tmcd,
156                'userconfig': self.export_userconfig,
157                'project_export': self.export_project_export,
158                'local_seer_control': self.export_local_seer,
159                'seer_master': self.export_seer_master,
160                'hide_hosts': self.export_hide_hosts,
161                }
162
163        if not self.local_seer_image or not self.local_seer_software or \
164                not self.local_seer_start:
165            if 'local_seer_control' in self.exports:
166                del self.exports['local_seer_control']
167
168        if not self.local_seer_image or not self.local_seer_software or \
169                not self.seer_master_start:
170            if 'seer_master' in self.exports:
171                del self.exports['seer_master']
172
173
174        self.soap_services = {\
175            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
176            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
177            'StartSegment': soap_handler("StartSegment", self.StartSegment),
178            'TerminateSegment': soap_handler("TerminateSegment",
179                self.TerminateSegment),
180            }
181        self.xmlrpc_services =  {\
182            'RequestAccess': xmlrpc_handler('RequestAccess',
183                self.RequestAccess),
184            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
185                self.ReleaseAccess),
186            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': xmlrpc_handler('TerminateSegment',
188                self.TerminateSegment),
189            }
190
191        self.call_SetValue = service_caller('SetValue')
192        self.call_GetValue = service_caller('GetValue', log=self.log)
193
194        if not config.has_option("allocate", "uri"):
195            self.allocate_project = \
196                allocate_project_local(config, auth)
197        else:
198            self.allocate_project = \
199                allocate_project_remote(config, auth)
200
201
202        # If the project allocator exports services, put them in this object's
203        # maps so that classes that instantiate this can call the services.
204        self.soap_services.update(self.allocate_project.soap_services)
205        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
206
207    @staticmethod
208    def legacy_access_tuple(str):
209        """
210        Convert a string of the form (id[:resources:resouces], id, id) into a
211        tuple of the form (project, user, user) where users may be names or
212        fedids.  The resources strings are obsolete and ignored.
213        """
214        def parse_name(n):
215            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
216            else: return n
217
218        str = str.strip()
219        if str.startswith('(') and str.endswith(')'):
220            str = str[1:-1]
221            names = [ s.strip() for s in str.split(",")]
222            if len(names) > 3:
223                raise self.parse_error("More than three fields in name")
224            first = names[0].split(":")
225            if first == 'fedid:':
226                del first[0]
227                first[0] = fedid(hexstr=first[0])
228            names[0] = first[0]
229
230            for i in range(1,2):
231                names[i] = parse_name(names[i])
232
233            return tuple(names)
234        else:
235            raise self.parse_error('Bad mapping (unbalanced parens)')
236
237    @staticmethod
238    def access_tuple(str):
239        """
240        Convert a string of the form (id, id) into an access_project.  This is
241        called by read_access to convert to local attributes.  It returns
242        a tuple of the form (project, user, user) where the two users are
243        always the same.
244        """
245
246        str = str.strip()
247        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
248            # The slice takes the parens off the string.
249            proj, user = str[1:-1].split(',')
250            return (proj.strip(), user.strip(), user.strip())
251        else:
252            raise self.parse_error(
253                    'Bad mapping (unbalanced parens or more than 1 comma)')
254
255
256    # RequestAccess support routines
257
258    def legacy_lookup_access(self, req, fid):
259        """
260        Look up the local access control information mapped to this fedid and
261        credentials.  In this case it is a (project, create_user, access_user)
262        triple, and a triple of three booleans indicating which, if any will
263        need to be dynamically created.  Finally a list of owners for that
264        allocation is returned.
265
266        lookup_access_base pulls the first triple out, and it is parsed by this
267        routine into the boolean map.  Owners is always the controlling fedid.
268        """
269        # Return values
270        rp = None
271        ru = None
272        # This maps a valid user to the Emulab projects and users to use
273        found, match = self.legacy_lookup_access_base(req, fid)
274        tb, project, user = match
275       
276        if found == None:
277            raise service_error(service_error.access,
278                    "Access denied - cannot map access")
279
280        # resolve <dynamic> and <same> in found
281        dyn_proj = False
282        dyn_create_user = False
283        dyn_service_user = False
284
285        if found[0] == "<same>":
286            if project != None:
287                rp = project
288            else : 
289                raise service_error(\
290                        service_error.server_config,
291                        "Project matched <same> when no project given")
292        elif found[0] == "<dynamic>":
293            rp = None
294            dyn_proj = True
295        else:
296            rp = found[0]
297
298        if found[1] == "<same>":
299            if user_match == "<any>":
300                if user != None: rcu = user[0]
301                else: raise service_error(\
302                        service_error.server_config,
303                        "Matched <same> on anonymous request")
304            else:
305                rcu = user_match
306        elif found[1] == "<dynamic>":
307            rcu = None
308            dyn_create_user = True
309        else:
310            rcu = found[1]
311       
312        if found[2] == "<same>":
313            if user_match == "<any>":
314                if user != None: rsu = user[0]
315                else: raise service_error(\
316                        service_error.server_config,
317                        "Matched <same> on anonymous request")
318            else:
319                rsu = user_match
320        elif found[2] == "<dynamic>":
321            rsu = None
322            dyn_service_user = True
323        else:
324            rsu = found[2]
325
326        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
327                [ fid ]
328
329    def lookup_access(self, req, fid): 
330        """
331        Check all the attributes that this controller knows how to map and see
332        if the requester is allowed to use any of them.  If so return one.
333        """
334        # Import request credentials into this (clone later??)
335        if self.auth.import_credentials(
336                data_list=req.get('abac_credential', [])):
337            self.auth.save()
338
339        # Check every attribute that we know how to map and take the first
340        # success.
341        for attr in (self.access.keys()):
342            if self.auth.check_attribute(fid, attr):
343                # XXX: needs to deal with dynamics
344                return copy.copy(self.access[attr]), (False, False, False), \
345                        [ fid ]
346            else:
347                self.log.debug("Access failed for %s %s" % (attr, fid))
348        else:
349            raise service_error(service_error.access, "Access denied")
350
351
352    def do_project_allocation(self, dyn, project, user):
353        """
354        Call the project allocation routines and return the info.
355        """
356        if dyn: 
357            # Compose the dynamic project request
358            # (only dynamic, dynamic currently allowed)
359            preq = { 'AllocateProjectRequestBody': \
360                        { 'project' : {\
361                            'user': [ \
362                            { \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                 'role': "serviceAccess",\
366                            }, \
367                            { \
368                                'access': [ { 
369                                    'sshPubkey': self.ssh_pubkey_file } ],
370                                 'role': "experimentCreation",\
371                            }, \
372                            ], \
373                            }\
374                        }\
375                    }
376            return self.allocate_project.dynamic_project(preq)
377        else:
378            preq = {'StaticProjectRequestBody' : \
379                    { 'project': \
380                        { 'name' : { 'localname' : project },\
381                          'user' : [ \
382                            {\
383                                'userID': { 'localname' : user }, \
384                                'access': [ { 
385                                    'sshPubkey': self.ssh_pubkey_file } ],
386                                'role': 'experimentCreation'\
387                            },\
388                            {\
389                                'userID': { 'localname' : user}, \
390                                'access': [ { 
391                                    'sshPubkey': self.ssh_pubkey_file } ],
392                                'role': 'serviceAccess'\
393                            },\
394                        ]}\
395                    }\
396            }
397            return self.allocate_project.static_project(preq)
398
399    def save_project_state(self, aid, ap, dyn, owners):
400        """
401        Parse out and save the information relevant to the project created for
402        this experiment.  That info is largely in ap and owners.  dyn indicates
403        that the project was created dynamically.  Return the user and project
404        names.
405        """
406        self.state_lock.acquire()
407        self.allocation[aid] = { }
408        try:
409            pname = ap['project']['name']['localname']
410        except KeyError:
411            pname = None
412
413        if dyn:
414            if not pname:
415                self.state_lock.release()
416                raise service_error(service_error.internal,
417                        "Misformed allocation response?")
418            if pname in self.projects: self.projects[pname] += 1
419            else: self.projects[pname] = 1
420            self.allocation[aid]['project'] = pname
421        else:
422            # sproject is a static project associated with this allocation.
423            self.allocation[aid]['sproject'] = pname
424
425        self.allocation[aid]['keys'] = [ ]
426
427        try:
428            for u in ap['project']['user']:
429                uname = u['userID']['localname']
430                if u['role'] == 'experimentCreation':
431                    self.allocation[aid]['user'] = uname
432                for k in [ k['sshPubkey'] for k in u['access'] \
433                        if k.has_key('sshPubkey') ]:
434                    kv = "%s:%s" % (uname, k)
435                    if self.keys.has_key(kv): self.keys[kv] += 1
436                    else: self.keys[kv] = 1
437                    self.allocation[aid]['keys'].append((uname, k))
438        except KeyError:
439            self.state_lock.release()
440            raise service_error(service_error.internal,
441                    "Misformed allocation response?")
442
443        self.allocation[aid]['owners'] = owners
444        self.write_state()
445        self.state_lock.release()
446        return (pname, uname)
447
448    # End of RequestAccess support routines
449
450    def RequestAccess(self, req, fid):
451        """
452        Handle the access request.  Proxy if not for us.
453
454        Parse out the fields and make the allocations or rejections if for us,
455        otherwise, assuming we're willing to proxy, proxy the request out.
456        """
457
458        def gateway_hardware(h):
459            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
460            else: return h
461
462        def get_export_project(svcs):
463            """
464            if the service requests includes one to export a project, return
465            that project.
466            """
467            rv = None
468            for s in svcs:
469                if s.get('name', '') == 'project_export' and \
470                        s.get('visibility', '') == 'export':
471                    if not rv: 
472                        for a in s.get('feddAttr', []):
473                            if a.get('attribute', '') == 'project' \
474                                    and 'value' in a:
475                                rv = a['value']
476                    else:
477                        raise service_error(service_error, access, 
478                                'Requesting multiple project exports is ' + \
479                                        'not supported');
480            return rv
481
482        # The dance to get into the request body
483        if req.has_key('RequestAccessRequestBody'):
484            req = req['RequestAccessRequestBody']
485        else:
486            raise service_error(service_error.req, "No request!?")
487
488        alog = open("./auth.log", 'w')
489        print >>alog, self.auth
490        print >> alog, "after"
491        if self.auth.import_credentials(
492                data_list=req.get('abac_credential', [])):
493            self.auth.save()
494        print >>alog, self.auth
495        alog.close()
496
497        if self.auth_type == "legacy":
498            found, dyn, owners = self.legacy_lookup_access(req, fid)
499        elif self.auth_type == 'abac':
500            found, dyn, owners = self.lookup_access(req, fid)
501        else:
502            raise service_error(service_error.internal, 
503                    'Unknown auth_type: %s' % self.auth_type)
504        ap = None
505
506        # if this includes a project export request and the exported
507        # project is not the access project, access denied.
508        if 'service' in req:
509            ep = get_export_project(req['service'])
510            if ep and ep != found[0]:
511                raise service_error(service_error.access,
512                        "Cannot export %s" % ep)
513
514        if self.ssh_pubkey_file:
515            ap = self.do_project_allocation(dyn[1], found[0], found[1])
516        else:
517            raise service_error(service_error.internal, 
518                    "SSH access parameters required")
519        # keep track of what's been added
520        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
521        aid = unicode(allocID)
522
523        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
524
525        services, svc_state = self.export_services(req.get('service',[]),
526                pname, uname)
527        self.state_lock.acquire()
528        # Store services state in global state
529        for k, v in svc_state.items():
530            self.allocation[aid][k] = v
531        self.write_state()
532        self.state_lock.release()
533        # Give the owners the right to change this allocation
534        for o in owners:
535            self.auth.set_attribute(o, allocID)
536            print "Set delete rights on %s for %s" % (o, allocID)
537        self.auth.save()
538        try:
539            f = open("%s/%s.pem" % (self.certdir, aid), "w")
540            print >>f, alloc_cert
541            f.close()
542        except EnvironmentError, e:
543            raise service_error(service_error.internal, 
544                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
545        resp = self.build_access_response({ 'fedid': allocID } ,
546                ap, services)
547        return resp
548
549    def do_release_project(self, del_project, del_users, del_types):
550        """
551        If a project and users has to be deleted, make the call.
552        """
553        msg = { 'project': { }}
554        if del_project:
555            msg['project']['name']= {'localname': del_project}
556        users = [ ]
557        for u in del_users.keys():
558            users.append({ 'userID': { 'localname': u },\
559                'access' :  \
560                        [ {'sshPubkey' : s } for s in del_users[u]]\
561            })
562        if users: 
563            msg['project']['user'] = users
564        if len(del_types) > 0:
565            msg['resources'] = { 'node': \
566                    [ {'hardware': [ h ] } for h in del_types ]\
567                }
568        if self.allocate_project.release_project:
569            msg = { 'ReleaseProjectRequestBody' : msg}
570            self.allocate_project.release_project(msg)
571
572    def ReleaseAccess(self, req, fid):
573        # The dance to get into the request body
574        if req.has_key('ReleaseAccessRequestBody'):
575            req = req['ReleaseAccessRequestBody']
576        else:
577            raise service_error(service_error.req, "No request!?")
578
579        try:
580            if req['allocID'].has_key('localname'):
581                auth_attr = aid = req['allocID']['localname']
582            elif req['allocID'].has_key('fedid'):
583                aid = unicode(req['allocID']['fedid'])
584                auth_attr = req['allocID']['fedid']
585            else:
586                raise service_error(service_error.req,
587                        "Only localnames and fedids are understood")
588        except KeyError:
589            raise service_error(service_error.req, "Badly formed request")
590
591        self.log.debug("[access] deallocation requested for %s by %s" % \
592                (aid, fid))
593        if not self.auth.check_attribute(fid, auth_attr):
594            self.log.debug("[access] deallocation denied for %s", aid)
595            raise service_error(service_error.access, "Access Denied")
596
597        # If we know this allocation, reduce the reference counts and
598        # remove the local allocations.  Otherwise report an error.  If
599        # there is an allocation to delete, del_users will be a dictonary
600        # of sets where the key is the user that owns the keys in the set.
601        # We use a set to avoid duplicates.  del_project is just the name
602        # of any dynamic project to delete.  We're somewhat lazy about
603        # deleting authorization attributes.  Having access to something
604        # that doesn't exist isn't harmful.
605        del_users = { }
606        del_project = None
607        del_types = set()
608
609        self.state_lock.acquire()
610        if aid in self.allocation:
611            self.log.debug("Found allocation for %s" %aid)
612            for k in self.allocation[aid]['keys']:
613                kk = "%s:%s" % k
614                self.keys[kk] -= 1
615                if self.keys[kk] == 0:
616                    if not del_users.has_key(k[0]):
617                        del_users[k[0]] = set()
618                    del_users[k[0]].add(k[1])
619                    del self.keys[kk]
620
621            if 'project' in self.allocation[aid]:
622                pname = self.allocation[aid]['project']
623                self.projects[pname] -= 1
624                if self.projects[pname] == 0:
625                    del_project = pname
626                    del self.projects[pname]
627
628            if 'types' in self.allocation[aid]:
629                for t in self.allocation[aid]['types']:
630                    self.types[t] -= 1
631                    if self.types[t] == 0:
632                        if not del_project: del_project = t[0]
633                        del_types.add(t[1])
634                        del self.types[t]
635
636            del self.allocation[aid]
637            self.write_state()
638            self.state_lock.release()
639            # If we actually have resources to deallocate, prepare the call.
640            if del_project or del_users:
641                self.do_release_project(del_project, del_users, del_types)
642            # And remove the access cert
643            cf = "%s/%s.pem" % (self.certdir, aid)
644            self.log.debug("Removing %s" % cf)
645            os.remove(cf)
646            return { 'allocID': req['allocID'] } 
647        else:
648            self.state_lock.release()
649            raise service_error(service_error.req, "No such allocation")
650
651    # These are subroutines for StartSegment
652    def generate_ns2(self, topo, expfn, softdir, connInfo):
653        """
654        Convert topo into an ns2 file, decorated with appropriate commands for
655        the particular testbed setup.  Convert all requests for software, etc
656        to point at the staged copies on this testbed and add the federation
657        startcommands.
658        """
659        class dragon_commands:
660            """
661            Functor to spit out approrpiate dragon commands for nodes listed in
662            the connectivity description.  The constructor makes a dict mapping
663            dragon nodes to their parameters and the __call__ checks each
664            element in turn for membership.
665            """
666            def __init__(self, map):
667                self.node_info = map
668
669            def __call__(self, e):
670                s = ""
671                if isinstance(e, topdl.Computer):
672                    if self.node_info.has_key(e.name):
673                        info = self.node_info[e.name]
674                        for ifname, vlan, type in info:
675                            for i in e.interface:
676                                if i.name == ifname:
677                                    addr = i.get_attribute('ip4_address')
678                                    subs = i.substrate[0]
679                                    break
680                            else:
681                                raise service_error(service_error.internal,
682                                        "No interface %s on element %s" % \
683                                                (ifname, e.name))
684                            # XXX: do netmask right
685                            if type =='link':
686                                s = ("tb-allow-external ${%s} " + \
687                                        "dragonportal ip %s vlan %s " + \
688                                        "netmask 255.255.255.0\n") % \
689                                        (topdl.to_tcl_name(e.name), addr, vlan)
690                            elif type =='lan':
691                                s = ("tb-allow-external ${%s} " + \
692                                        "dragonportal " + \
693                                        "ip %s vlan %s usurp %s\n") % \
694                                        (topdl.to_tcl_name(e.name), addr, 
695                                                vlan, subs)
696                            else:
697                                raise service_error(service_error_internal,
698                                        "Unknown DRAGON type %s" % type)
699                return s
700
701        class not_dragon:
702            """
703            Return true if a node is in the given map of dragon nodes.
704            """
705            def __init__(self, map):
706                self.nodes = set(map.keys())
707
708            def __call__(self, e):
709                return e.name not in self.nodes
710
711        # Main line of generate_ns2
712        t = topo.clone()
713
714        # Create the map of nodes that need direct connections (dragon
715        # connections) from the connInfo
716        dragon_map = { }
717        for i in [ i for i in connInfo if i['type'] == 'transit']:
718            for a in i.get('fedAttr', []):
719                if a['attribute'] == 'vlan_id':
720                    vlan = a['value']
721                    break
722            else:
723                raise service_error(service_error.internal, "No vlan tag")
724            members = i.get('member', [])
725            if len(members) > 1: type = 'lan'
726            else: type = 'link'
727
728            try:
729                for m in members:
730                    if m['element'] in dragon_map:
731                        dragon_map[m['element']].append(( m['interface'], 
732                            vlan, type))
733                    else:
734                        dragon_map[m['element']] = [( m['interface'], 
735                            vlan, type),]
736            except KeyError:
737                raise service_error(service_error.req,
738                        "Missing connectivity info")
739
740        # Weed out the things we aren't going to instantiate: Segments, portal
741        # substrates, and portal interfaces.  (The copy in the for loop allows
742        # us to delete from e.elements in side the for loop).  While we're
743        # touching all the elements, we also adjust paths from the original
744        # testbed to local testbed paths and put the federation commands into
745        # the start commands
746        for e in [e for e in t.elements]:
747            if isinstance(e, topdl.Segment):
748                t.elements.remove(e)
749            if isinstance(e, topdl.Computer):
750                self.add_kit(e, self.federation_software)
751                if e.get_attribute('portal') and self.portal_startcommand:
752                    # Add local portal support software
753                    self.add_kit(e, self.portal_software)
754                    # Portals never have a user-specified start command
755                    e.set_attribute('startup', self.portal_startcommand)
756                elif self.node_startcommand:
757                    if e.get_attribute('startup'):
758                        e.set_attribute('startup', "%s \\$USER '%s'" % \
759                                (self.node_startcommand, 
760                                    e.get_attribute('startup')))
761                    else:
762                        e.set_attribute('startup', self.node_startcommand)
763
764                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
765                # Remove portal interfaces that do not connect to DRAGON
766                e.interface = [i for i in e.interface \
767                        if not i.get_attribute('portal') or i.name in dinf ]
768            # Fix software paths
769            for s in getattr(e, 'software', []):
770                s.location = re.sub("^.*/", softdir, s.location)
771
772        t.substrates = [ s.clone() for s in t.substrates ]
773        t.incorporate_elements()
774
775        # Customize the ns2 output for local portal commands and images
776        filters = []
777
778        if self.dragon_endpoint:
779            add_filter = not_dragon(dragon_map)
780            filters.append(dragon_commands(dragon_map))
781        else:
782            add_filter = None
783
784        if self.portal_command:
785            filters.append(topdl.generate_portal_command_filter(
786                self.portal_command, add_filter=add_filter))
787
788        if self.portal_image:
789            filters.append(topdl.generate_portal_image_filter(
790                self.portal_image))
791
792        if self.portal_type:
793            filters.append(topdl.generate_portal_hardware_filter(
794                self.portal_type))
795
796        # Convert to ns and write it out
797        expfile = topdl.topology_to_ns2(t, filters)
798        try:
799            f = open(expfn, "w")
800            print >>f, expfile
801            f.close()
802        except EnvironmentError:
803            raise service_error(service_error.internal,
804                    "Cannot write experiment file %s: %s" % (expfn,e))
805
806    def export_store_info(self, cf, proj, ename, connInfo):
807        """
808        For the export requests in the connection info, install the peer names
809        at the experiment controller via SetValue calls.
810        """
811
812        for c in connInfo:
813            for p in [ p for p in c.get('parameter', []) \
814                    if p.get('type', '') == 'output']:
815
816                if p.get('name', '') == 'peer':
817                    k = p.get('key', None)
818                    surl = p.get('store', None)
819                    if surl and k and k.index('/') != -1:
820                        value = "%s.%s.%s%s" % \
821                                (k[k.index('/')+1:], ename, proj, self.domain)
822                        req = { 'name': k, 'value': value }
823                        self.log.debug("Setting %s to %s on %s" % \
824                                (k, value, surl))
825                        self.call_SetValue(surl, req, cf)
826                    else:
827                        self.log.error("Bad export request: %s" % p)
828                elif p.get('name', '') == 'ssh_port':
829                    k = p.get('key', None)
830                    surl = p.get('store', None)
831                    if surl and k:
832                        req = { 'name': k, 'value': self.ssh_port }
833                        self.log.debug("Setting %s to %s on %s" % \
834                                (k, self.ssh_port, surl))
835                        self.call_SetValue(surl, req, cf)
836                    else:
837                        self.log.error("Bad export request: %s" % p)
838                else:
839                    self.log.error("Unknown export parameter: %s" % \
840                            p.get('name'))
841                    continue
842
843    def add_seer_node(self, topo, name, startup):
844        """
845        Add a seer node to the given topology, with the startup command passed
846        in.  Used by configure seer_services.
847        """
848        c_node = topdl.Computer(
849                name=name, 
850                os= topdl.OperatingSystem(
851                    attribute=[
852                    { 'attribute': 'osid', 
853                        'value': self.local_seer_image },
854                    ]),
855                attribute=[
856                    { 'attribute': 'startup', 'value': startup },
857                    ]
858                )
859        self.add_kit(c_node, self.local_seer_software)
860        topo.elements.append(c_node)
861
862    def configure_seer_services(self, services, topo, softdir):
863        """
864        Make changes to the topology required for the seer requests being made.
865        Specifically, add any control or master nodes required and set up the
866        start commands on the nodes to interconnect them.
867        """
868        local_seer = False      # True if we need to add a control node
869        collect_seer = False    # True if there is a seer-master node
870        seer_master= False      # True if we need to add the seer-master
871        for s in services:
872            s_name = s.get('name', '')
873            s_vis = s.get('visibility','')
874
875            if s_name  == 'local_seer_control' and s_vis == 'export':
876                local_seer = True
877            elif s_name == 'seer_master':
878                if s_vis == 'import':
879                    collect_seer = True
880                elif s_vis == 'export':
881                    seer_master = True
882       
883        # We've got the whole picture now, so add nodes if needed and configure
884        # them to interconnect properly.
885        if local_seer or seer_master:
886            # Copy local seer control node software to the tempdir
887            for l, f in self.local_seer_software:
888                base = os.path.basename(f)
889                copy_file(f, "%s/%s" % (softdir, base))
890        # If we're collecting seers somewhere the controllers need to talk to
891        # the master.  In testbeds that export the service, that will be a
892        # local node that we'll add below.  Elsewhere it will be the control
893        # portal that will port forward to the exporting master.
894        if local_seer:
895            if collect_seer:
896                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
897            else:
898                startup = self.local_seer_start
899            self.add_seer_node(topo, 'control', startup)
900        # If this is the seer master, add that node, too.
901        if seer_master:
902            self.add_seer_node(topo, 'seer-master', 
903                    "%s -R -n -R seer-master -R -A -R sink" % \
904                            self.seer_master_start)
905
906    def retrieve_software(self, topo, certfile, softdir):
907        """
908        Collect the software that nodes in the topology need loaded and stage
909        it locally.  This implies retrieving it from the experiment_controller
910        and placing it into softdir.  Certfile is used to prove that this node
911        has access to that data (it's the allocation/segment fedid).  Finally
912        local portal and federation software is also copied to the same staging
913        directory for simplicity - all software needed for experiment creation
914        is in softdir.
915        """
916        sw = set()
917        for e in topo.elements:
918            for s in getattr(e, 'software', []):
919                sw.add(s.location)
920        for s in sw:
921            self.log.debug("Retrieving %s" % s)
922            try:
923                get_url(s, certfile, softdir)
924            except:
925                t, v, st = sys.exc_info()
926                raise service_error(service_error.internal,
927                        "Error retrieving %s: %s" % (s, v))
928
929        # Copy local federation and portal node software to the tempdir
930        for s in (self.federation_software, self.portal_software):
931            for l, f in s:
932                base = os.path.basename(f)
933                copy_file(f, "%s/%s" % (softdir, base))
934
935
936    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
937        """
938        Gather common configuration files, retrieve or create an experiment
939        name and project name, and return the ssh_key filenames.  Create an
940        allocation log bound to the state log variable as well.
941        """
942        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
943        ename = None
944        pubkey_base = None
945        secretkey_base = None
946        proj = None
947        user = None
948        alloc_log = None
949
950        for a in attrs:
951            if a['attribute'] in configs:
952                try:
953                    self.log.debug("Retrieving %s from %s" % \
954                            (a['attribute'], a['value']))
955                    get_url(a['value'], certfile, tmpdir)
956                except:
957                    t, v, st = sys.exc_info()
958                    raise service_error(service_error.internal,
959                            "Error retrieving %s: %s" % (a.get('value', ""), v))
960            if a['attribute'] == 'ssh_pubkey':
961                pubkey_base = a['value'].rpartition('/')[2]
962            if a['attribute'] == 'ssh_secretkey':
963                secretkey_base = a['value'].rpartition('/')[2]
964            if a['attribute'] == 'experiment_name':
965                ename = a['value']
966
967        if not ename:
968            ename = ""
969            for i in range(0,5):
970                ename += random.choice(string.ascii_letters)
971            self.log.warn("No experiment name: picked one randomly: %s" \
972                    % ename)
973
974        if not pubkey_base:
975            raise service_error(service_error.req, 
976                    "No public key attribute")
977
978        if not secretkey_base:
979            raise service_error(service_error.req, 
980                    "No secret key attribute")
981
982        self.state_lock.acquire()
983        if aid in self.allocation:
984            proj = self.allocation[aid].get('project', None)
985            if not proj: 
986                proj = self.allocation[aid].get('sproject', None)
987            user = self.allocation[aid].get('user', None)
988            self.allocation[aid]['experiment'] = ename
989            self.allocation[aid]['log'] = [ ]
990            # Create a logger that logs to the experiment's state object as
991            # well as to the main log file.
992            alloc_log = logging.getLogger('fedd.access.%s' % ename)
993            h = logging.StreamHandler(
994                    list_log.list_log(self.allocation[aid]['log']))
995            # XXX: there should be a global one of these rather than
996            # repeating the code.
997            h.setFormatter(logging.Formatter(
998                "%(asctime)s %(name)s %(message)s",
999                        '%d %b %y %H:%M:%S'))
1000            alloc_log.addHandler(h)
1001            self.write_state()
1002        self.state_lock.release()
1003
1004        if not proj:
1005            raise service_error(service_error.internal, 
1006                    "Can't find project for %s" %aid)
1007
1008        if not user:
1009            raise service_error(service_error.internal, 
1010                    "Can't find creation user for %s" %aid)
1011
1012        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1013
1014    def finalize_experiment(self, starter, topo, aid, alloc_id):
1015        """
1016        Store key bits of experiment state in the global repository, including
1017        the response that may need to be replayed, and return the response.
1018        """
1019        # Copy the assigned names into the return topology
1020        embedding = [ ]
1021        for n in starter.node:
1022            embedding.append({ 
1023                'toponame': n,
1024                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1025                })
1026        # Grab the log (this is some anal locking, but better safe than
1027        # sorry)
1028        self.state_lock.acquire()
1029        logv = "".join(self.allocation[aid]['log'])
1030        # It's possible that the StartSegment call gets retried (!).
1031        # if the 'started' key is in the allocation, we'll return it rather
1032        # than redo the setup.
1033        self.allocation[aid]['started'] = { 
1034                'allocID': alloc_id,
1035                'allocationLog': logv,
1036                'segmentdescription': { 
1037                    'topdldescription': topo.clone().to_dict()
1038                    },
1039                'embedding': embedding
1040                }
1041        retval = copy.copy(self.allocation[aid]['started'])
1042        self.write_state()
1043        self.state_lock.release()
1044        return retval
1045   
1046    # End of StartSegment support routines
1047
1048    def StartSegment(self, req, fid):
1049        err = None  # Any service_error generated after tmpdir is created
1050        rv = None   # Return value from segment creation
1051
1052        try:
1053            req = req['StartSegmentRequestBody']
1054            auth_attr = req['allocID']['fedid']
1055            topref = req['segmentdescription']['topdldescription']
1056        except KeyError:
1057            raise service_error(server_error.req, "Badly formed request")
1058
1059        connInfo = req.get('connection', [])
1060        services = req.get('service', [])
1061        aid = "%s" % auth_attr
1062        attrs = req.get('fedAttr', [])
1063        if not self.auth.check_attribute(fid, auth_attr):
1064            raise service_error(service_error.access, "Access denied")
1065        else:
1066            # See if this is a replay of an earlier succeeded StartSegment -
1067            # sometimes SSL kills 'em.  If so, replay the response rather than
1068            # redoing the allocation.
1069            self.state_lock.acquire()
1070            retval = self.allocation[aid].get('started', None)
1071            self.state_lock.release()
1072            if retval:
1073                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1074                        "replaying response")
1075                return retval
1076
1077        # A new request.  Do it.
1078
1079        if topref: topo = topdl.Topology(**topref)
1080        else:
1081            raise service_error(service_error.req, 
1082                    "Request missing segmentdescription'")
1083       
1084        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1085        try:
1086            tmpdir = tempfile.mkdtemp(prefix="access-")
1087            softdir = "%s/software" % tmpdir
1088            os.mkdir(softdir)
1089        except EnvironmentError:
1090            raise service_error(service_error.internal, "Cannot create tmp dir")
1091
1092        # Try block alllows us to clean up temporary files.
1093        try:
1094            self.retrieve_software(topo, certfile, softdir)
1095            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1096                    self.initialize_experiment_info(attrs, aid, 
1097                            certfile, tmpdir)
1098
1099            # Set up userconf and seer if needed
1100            self.configure_userconf(services, tmpdir)
1101            self.configure_seer_services(services, topo, softdir)
1102            # Get and send synch store variables
1103            self.export_store_info(certfile, proj, ename, connInfo)
1104            self.import_store_info(certfile, connInfo)
1105
1106            expfile = "%s/experiment.tcl" % tmpdir
1107
1108            self.generate_portal_configs(topo, pubkey_base, 
1109                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1110            self.generate_ns2(topo, expfile, 
1111                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1112
1113            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1114                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1115                    cert=self.xmlrpc_cert)
1116            rv = starter(self, ename, proj, user, expfile, tmpdir)
1117        except service_error, e:
1118            err = e
1119        except:
1120            t, v, st = sys.exc_info()
1121            err = service_error(service_error.internal, "%s: %s" % \
1122                    (v, traceback.extract_tb(st)))
1123
1124        # Walk up tmpdir, deleting as we go
1125        if self.cleanup: self.remove_dirs(tmpdir)
1126        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1127
1128        if rv:
1129            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1130        elif err:
1131            raise service_error(service_error.federant,
1132                    "Swapin failed: %s" % err)
1133        else:
1134            raise service_error(service_error.federant, "Swapin failed")
1135
1136    def TerminateSegment(self, req, fid):
1137        try:
1138            req = req['TerminateSegmentRequestBody']
1139        except KeyError:
1140            raise service_error(server_error.req, "Badly formed request")
1141
1142        auth_attr = req['allocID']['fedid']
1143        aid = "%s" % auth_attr
1144        attrs = req.get('fedAttr', [])
1145        if not self.auth.check_attribute(fid, auth_attr):
1146            raise service_error(service_error.access, "Access denied")
1147
1148        self.state_lock.acquire()
1149        if aid in self.allocation:
1150            proj = self.allocation[aid].get('project', None)
1151            if not proj: 
1152                proj = self.allocation[aid].get('sproject', None)
1153            user = self.allocation[aid].get('user', None)
1154            ename = self.allocation[aid].get('experiment', None)
1155        else:
1156            proj = None
1157            user = None
1158            ename = None
1159        self.state_lock.release()
1160
1161        if not proj:
1162            raise service_error(service_error.internal, 
1163                    "Can't find project for %s" % aid)
1164
1165        if not user:
1166            raise service_error(service_error.internal, 
1167                    "Can't find creation user for %s" % aid)
1168        if not ename:
1169            raise service_error(service_error.internal, 
1170                    "Can't find experiment name for %s" % aid)
1171        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1172                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1173        stopper(self, user, proj, ename)
1174        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.