source: fedd/federation/emulab_access.py @ 725c55d

axis_examplecompt_changesinfo-ops
Last change on this file since 725c55d was 725c55d, checked in by Ted Faber <faber@…>, 13 years ago

Checkpoint - successful swap in and out

  • Property mode set to 100644
File size: 38.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        # XXX: this should go?
108        #if config.has_option("access", "accessdb"):
109        #    self.read_access(config.get("access", "accessdb"))
110        tb = config.get('access', 'testbed')
111        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
112        else: self.testbed = [ ]
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            if accessdb:
122                self.read_access(accessdb, self.make_access_project)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            if accessdb:
126                self.read_abac_access(accessdb, self.make_abac_access_project)
127        else:
128            raise service_error(service_error.internal, 
129                    "Unknown auth_type: %s" % self.auth_type)
130
131        # read_state in the base_class
132        self.state_lock.acquire()
133        for a  in ('allocation', 'projects', 'keys', 'types'):
134            if a not in self.state:
135                self.state[a] = { }
136        self.allocation = self.state['allocation']
137        self.projects = self.state['projects']
138        self.keys = self.state['keys']
139        self.types = self.state['types']
140        if self.auth_type == "legacy": 
141            # Add the ownership attributes to the authorizer.  Note that the
142            # indices of the allocation dict are strings, but the attributes are
143            # fedids, so there is a conversion.
144            for k in self.allocation.keys():
145                for o in self.allocation[k].get('owners', []):
146                    self.auth.set_attribute(o, fedid(hexstr=k))
147                if self.allocation[k].has_key('userconfig'):
148                    sfid = self.allocation[k]['userconfig']
149                    fid = fedid(hexstr=sfid)
150                    self.auth.set_attribute(fid, "/%s" % sfid)
151        self.state_lock.release()
152        self.exports = {
153                'SMB': self.export_SMB,
154                'seer': self.export_seer,
155                'tmcd': self.export_tmcd,
156                'userconfig': self.export_userconfig,
157                'project_export': self.export_project_export,
158                'local_seer_control': self.export_local_seer,
159                'seer_master': self.export_seer_master,
160                'hide_hosts': self.export_hide_hosts,
161                }
162
163        if not self.local_seer_image or not self.local_seer_software or \
164                not self.local_seer_start:
165            if 'local_seer_control' in self.exports:
166                del self.exports['local_seer_control']
167
168        if not self.local_seer_image or not self.local_seer_software or \
169                not self.seer_master_start:
170            if 'seer_master' in self.exports:
171                del self.exports['seer_master']
172
173
174        self.soap_services = {\
175            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
176            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
177            'StartSegment': soap_handler("StartSegment", self.StartSegment),
178            'TerminateSegment': soap_handler("TerminateSegment",
179                self.TerminateSegment),
180            }
181        self.xmlrpc_services =  {\
182            'RequestAccess': xmlrpc_handler('RequestAccess',
183                self.RequestAccess),
184            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
185                self.ReleaseAccess),
186            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': xmlrpc_handler('TerminateSegment',
188                self.TerminateSegment),
189            }
190
191        self.call_SetValue = service_caller('SetValue')
192        self.call_GetValue = service_caller('GetValue', log=self.log)
193
194        if not config.has_option("allocate", "uri"):
195            self.allocate_project = \
196                allocate_project_local(config, auth)
197        else:
198            self.allocate_project = \
199                allocate_project_remote(config, auth)
200
201
202        # If the project allocator exports services, put them in this object's
203        # maps so that classes that instantiate this can call the services.
204        self.soap_services.update(self.allocate_project.soap_services)
205        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
206
207    @staticmethod
208    def make_access_project(str):
209        """
210        Convert a string of the form (id[:resources:resouces], id, id) into an
211        access_project.  This is called by read_access to convert to local
212        attributes.  It returns a tuple of the form (project, user, user) where
213        users may be names or fedids.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = access_project(first[0], first[1:])
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def make_abac_access_project(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_abac_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            # The slice takes the parens off the string.
250            proj, user = str[1:-1].split(',')
251            return (access_project(proj.strip(), []), 
252                    user.strip(), user.strip())
253        else:
254            raise self.parse_error(
255                    'Bad mapping (unbalanced parens or more than 1 comma)')
256
257
258    # RequestAccess support routines
259
260    def lookup_access(self, req, fid):
261        """
262        Look up the local access control information mapped to this fedid and
263        credentials.  In this case it is a (project, create_user, access_user)
264        triple, and a triple of three booleans indicating which, if any will
265        need to be dynamically created.  Finally a list of owners for that
266        allocation is returned.
267
268        lookup_access_base pulls the first triple out, and it is parsed by this
269        routine into the boolean map.  Owners is always the controlling fedid.
270        """
271        # Return values
272        rp = access_project(None, ())
273        ru = None
274        # This maps a valid user to the Emulab projects and users to use
275        found, match = self.lookup_access_base(req, fid)
276        tb, project, user = match
277       
278        if found == None:
279            raise service_error(service_error.access,
280                    "Access denied - cannot map access")
281
282        # resolve <dynamic> and <same> in found
283        dyn_proj = False
284        dyn_create_user = False
285        dyn_service_user = False
286
287        if found[0].name == "<same>":
288            if project != None:
289                rp.name = project
290            else : 
291                raise service_error(\
292                        service_error.server_config,
293                        "Project matched <same> when no project given")
294        elif found[0].name == "<dynamic>":
295            rp.name = None
296            dyn_proj = True
297        else:
298            rp.name = found[0].name
299        rp.node_types = found[0].node_types;
300
301        if found[1] == "<same>":
302            if user_match == "<any>":
303                if user != None: rcu = user[0]
304                else: raise service_error(\
305                        service_error.server_config,
306                        "Matched <same> on anonymous request")
307            else:
308                rcu = user_match
309        elif found[1] == "<dynamic>":
310            rcu = None
311            dyn_create_user = True
312        else:
313            rcu = found[1]
314       
315        if found[2] == "<same>":
316            if user_match == "<any>":
317                if user != None: rsu = user[0]
318                else: raise service_error(\
319                        service_error.server_config,
320                        "Matched <same> on anonymous request")
321            else:
322                rsu = user_match
323        elif found[2] == "<dynamic>":
324            rsu = None
325            dyn_service_user = True
326        else:
327            rsu = found[2]
328
329        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
330                [ fid ]
331
332    def lookup_abac_access(self, req, fid): 
333        # Import request credentials into this (clone later??)
334        if self.auth.import_credentials(data_list=req.get('abac_credential', [])):
335            self.auth.save()
336
337        # Check every attribute that we know how to map and take the first
338        # success.
339        print "%s" %self.auth
340        for attr in (self.access.keys()):
341            if self.auth.check_attribute(fid, attr):
342                # XXX: needs to deal with dynamics
343                return copy.copy(self.access[attr]), (False, False, False), \
344                        [ fid ]
345            else:
346                self.log.debug("Access failed for %s %s" % (attr, fid))
347        else:
348            raise service_error(service_error.access, "Access denied")
349
350
351    def do_project_allocation(self, dyn, project, user):
352        """
353        Call the project allocation routines and return the info.
354        """
355        if dyn: 
356            # Compose the dynamic project request
357            # (only dynamic, dynamic currently allowed)
358            preq = { 'AllocateProjectRequestBody': \
359                        { 'project' : {\
360                            'user': [ \
361                            { \
362                                'access': [ { 
363                                    'sshPubkey': self.ssh_pubkey_file } ],
364                                 'role': "serviceAccess",\
365                            }, \
366                            { \
367                                'access': [ { 
368                                    'sshPubkey': self.ssh_pubkey_file } ],
369                                 'role': "experimentCreation",\
370                            }, \
371                            ], \
372                            }\
373                        }\
374                    }
375            return self.allocate_project.dynamic_project(preq)
376        else:
377            preq = {'StaticProjectRequestBody' : \
378                    { 'project': \
379                        { 'name' : { 'localname' : project },\
380                          'user' : [ \
381                            {\
382                                'userID': { 'localname' : user }, \
383                                'access': [ { 
384                                    'sshPubkey': self.ssh_pubkey_file } ],
385                                'role': 'experimentCreation'\
386                            },\
387                            {\
388                                'userID': { 'localname' : user}, \
389                                'access': [ { 
390                                    'sshPubkey': self.ssh_pubkey_file } ],
391                                'role': 'serviceAccess'\
392                            },\
393                        ]}\
394                    }\
395            }
396            return self.allocate_project.static_project(preq)
397
398    def save_project_state(self, aid, ap, dyn, owners):
399        """
400        Parse out and save the information relevant to the project created for
401        this experiment.  That info is largely in ap and owners.  dyn indicates
402        that the project was created dynamically.  Return the user and project
403        names.
404        """
405        self.state_lock.acquire()
406        self.allocation[aid] = { }
407        try:
408            pname = ap['project']['name']['localname']
409        except KeyError:
410            pname = None
411
412        if dyn:
413            if not pname:
414                self.state_lock.release()
415                raise service_error(service_error.internal,
416                        "Misformed allocation response?")
417            if pname in self.projects: self.projects[pname] += 1
418            else: self.projects[pname] = 1
419            self.allocation[aid]['project'] = pname
420        else:
421            # sproject is a static project associated with this allocation.
422            self.allocation[aid]['sproject'] = pname
423
424        self.allocation[aid]['keys'] = [ ]
425
426        try:
427            for u in ap['project']['user']:
428                uname = u['userID']['localname']
429                if u['role'] == 'experimentCreation':
430                    self.allocation[aid]['user'] = uname
431                for k in [ k['sshPubkey'] for k in u['access'] \
432                        if k.has_key('sshPubkey') ]:
433                    kv = "%s:%s" % (uname, k)
434                    if self.keys.has_key(kv): self.keys[kv] += 1
435                    else: self.keys[kv] = 1
436                    self.allocation[aid]['keys'].append((uname, k))
437        except KeyError:
438            self.state_lock.release()
439            raise service_error(service_error.internal,
440                    "Misformed allocation response?")
441
442        self.allocation[aid]['owners'] = owners
443        self.write_state()
444        self.state_lock.release()
445        return (pname, uname)
446
447    # End of RequestAccess support routines
448
449    def RequestAccess(self, req, fid):
450        """
451        Handle the access request.  Proxy if not for us.
452
453        Parse out the fields and make the allocations or rejections if for us,
454        otherwise, assuming we're willing to proxy, proxy the request out.
455        """
456
457        def gateway_hardware(h):
458            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
459            else: return h
460
461        def get_export_project(svcs):
462            """
463            if the service requests includes one to export a project, return
464            that project.
465            """
466            rv = None
467            for s in svcs:
468                if s.get('name', '') == 'project_export' and \
469                        s.get('visibility', '') == 'export':
470                    if not rv: 
471                        for a in s.get('feddAttr', []):
472                            if a.get('attribute', '') == 'project' \
473                                    and 'value' in a:
474                                rv = a['value']
475                    else:
476                        raise service_error(service_error, access, 
477                                'Requesting multiple project exports is ' + \
478                                        'not supported');
479            return rv
480
481        # The dance to get into the request body
482        if req.has_key('RequestAccessRequestBody'):
483            req = req['RequestAccessRequestBody']
484        else:
485            raise service_error(service_error.req, "No request!?")
486
487        alog = open("./auth.log", 'w')
488        print >>alog, self.auth
489        print >> alog, "after"
490        if self.auth.import_credentials(
491                data_list=req.get('abac_credential', [])):
492            self.auth.save()
493        print >>alog, self.auth
494        alog.close()
495
496        if self.auth_type == "legacy":
497            found, dyn, owners = self.lookup_access(req, fid)
498        elif self.auth_type == 'abac':
499            found, dyn, owners = self.lookup_abac_access(req, fid)
500        else:
501            raise service_error(service_error.internal, 
502                    'Unknown auth_type: %s' % self.auth_type)
503        ap = None
504
505        print "%s %s %s" % (found, dyn, owners)
506
507        # if this includes a project export request and the exported
508        # project is not the access project, access denied.
509        if 'service' in req:
510            ep = get_export_project(req['service'])
511            if ep and ep != found[0].name:
512                raise service_error(service_error.access,
513                        "Cannot export %s" % ep)
514
515        if self.ssh_pubkey_file:
516            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
517        else:
518            raise service_error(service_error.internal, 
519                    "SSH access parameters required")
520        # keep track of what's been added
521        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
522        aid = unicode(allocID)
523
524        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
525
526        services, svc_state = self.export_services(req.get('service',[]),
527                pname, uname)
528        self.state_lock.acquire()
529        # Store services state in global state
530        for k, v in svc_state.items():
531            self.allocation[aid][k] = v
532        self.write_state()
533        self.state_lock.release()
534        # Give the owners the right to change this allocation
535        for o in owners:
536            self.auth.set_attribute(o, allocID)
537            print "Set delete rights on %s for %s" % (o, allocID)
538        self.auth.save()
539        try:
540            f = open("%s/%s.pem" % (self.certdir, aid), "w")
541            print >>f, alloc_cert
542            f.close()
543        except EnvironmentError, e:
544            raise service_error(service_error.internal, 
545                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
546        resp = self.build_access_response({ 'fedid': allocID } ,
547                ap, services)
548        return resp
549
550    def do_release_project(self, del_project, del_users, del_types):
551        """
552        If a project and users has to be deleted, make the call.
553        """
554        msg = { 'project': { }}
555        if del_project:
556            msg['project']['name']= {'localname': del_project}
557        users = [ ]
558        for u in del_users.keys():
559            users.append({ 'userID': { 'localname': u },\
560                'access' :  \
561                        [ {'sshPubkey' : s } for s in del_users[u]]\
562            })
563        if users: 
564            msg['project']['user'] = users
565        if len(del_types) > 0:
566            msg['resources'] = { 'node': \
567                    [ {'hardware': [ h ] } for h in del_types ]\
568                }
569        if self.allocate_project.release_project:
570            msg = { 'ReleaseProjectRequestBody' : msg}
571            self.allocate_project.release_project(msg)
572
573    def ReleaseAccess(self, req, fid):
574        # The dance to get into the request body
575        if req.has_key('ReleaseAccessRequestBody'):
576            req = req['ReleaseAccessRequestBody']
577        else:
578            raise service_error(service_error.req, "No request!?")
579
580        try:
581            if req['allocID'].has_key('localname'):
582                auth_attr = aid = req['allocID']['localname']
583            elif req['allocID'].has_key('fedid'):
584                aid = unicode(req['allocID']['fedid'])
585                auth_attr = req['allocID']['fedid']
586            else:
587                raise service_error(service_error.req,
588                        "Only localnames and fedids are understood")
589        except KeyError:
590            raise service_error(service_error.req, "Badly formed request")
591
592        self.log.debug("[access] deallocation requested for %s by %s" % \
593                (aid, fid))
594        if not self.auth.check_attribute(fid, auth_attr):
595            self.log.debug("[access] deallocation denied for %s", aid)
596            raise service_error(service_error.access, "Access Denied")
597
598        # If we know this allocation, reduce the reference counts and
599        # remove the local allocations.  Otherwise report an error.  If
600        # there is an allocation to delete, del_users will be a dictonary
601        # of sets where the key is the user that owns the keys in the set.
602        # We use a set to avoid duplicates.  del_project is just the name
603        # of any dynamic project to delete.  We're somewhat lazy about
604        # deleting authorization attributes.  Having access to something
605        # that doesn't exist isn't harmful.
606        del_users = { }
607        del_project = None
608        del_types = set()
609
610        self.state_lock.acquire()
611        if aid in self.allocation:
612            self.log.debug("Found allocation for %s" %aid)
613            for k in self.allocation[aid]['keys']:
614                kk = "%s:%s" % k
615                self.keys[kk] -= 1
616                if self.keys[kk] == 0:
617                    if not del_users.has_key(k[0]):
618                        del_users[k[0]] = set()
619                    del_users[k[0]].add(k[1])
620                    del self.keys[kk]
621
622            if 'project' in self.allocation[aid]:
623                pname = self.allocation[aid]['project']
624                self.projects[pname] -= 1
625                if self.projects[pname] == 0:
626                    del_project = pname
627                    del self.projects[pname]
628
629            if 'types' in self.allocation[aid]:
630                for t in self.allocation[aid]['types']:
631                    self.types[t] -= 1
632                    if self.types[t] == 0:
633                        if not del_project: del_project = t[0]
634                        del_types.add(t[1])
635                        del self.types[t]
636
637            del self.allocation[aid]
638            self.write_state()
639            self.state_lock.release()
640            # If we actually have resources to deallocate, prepare the call.
641            if del_project or del_users:
642                self.do_release_project(del_project, del_users, del_types)
643            # And remove the access cert
644            cf = "%s/%s.pem" % (self.certdir, aid)
645            self.log.debug("Removing %s" % cf)
646            os.remove(cf)
647            return { 'allocID': req['allocID'] } 
648        else:
649            self.state_lock.release()
650            raise service_error(service_error.req, "No such allocation")
651
652    # These are subroutines for StartSegment
653    def generate_ns2(self, topo, expfn, softdir, connInfo):
654        """
655        Convert topo into an ns2 file, decorated with appropriate commands for
656        the particular testbed setup.  Convert all requests for software, etc
657        to point at the staged copies on this testbed and add the federation
658        startcommands.
659        """
660        class dragon_commands:
661            """
662            Functor to spit out approrpiate dragon commands for nodes listed in
663            the connectivity description.  The constructor makes a dict mapping
664            dragon nodes to their parameters and the __call__ checks each
665            element in turn for membership.
666            """
667            def __init__(self, map):
668                self.node_info = map
669
670            def __call__(self, e):
671                s = ""
672                if isinstance(e, topdl.Computer):
673                    if self.node_info.has_key(e.name):
674                        info = self.node_info[e.name]
675                        for ifname, vlan, type in info:
676                            for i in e.interface:
677                                if i.name == ifname:
678                                    addr = i.get_attribute('ip4_address')
679                                    subs = i.substrate[0]
680                                    break
681                            else:
682                                raise service_error(service_error.internal,
683                                        "No interface %s on element %s" % \
684                                                (ifname, e.name))
685                            # XXX: do netmask right
686                            if type =='link':
687                                s = ("tb-allow-external ${%s} " + \
688                                        "dragonportal ip %s vlan %s " + \
689                                        "netmask 255.255.255.0\n") % \
690                                        (topdl.to_tcl_name(e.name), addr, vlan)
691                            elif type =='lan':
692                                s = ("tb-allow-external ${%s} " + \
693                                        "dragonportal " + \
694                                        "ip %s vlan %s usurp %s\n") % \
695                                        (topdl.to_tcl_name(e.name), addr, 
696                                                vlan, subs)
697                            else:
698                                raise service_error(service_error_internal,
699                                        "Unknown DRAGON type %s" % type)
700                return s
701
702        class not_dragon:
703            """
704            Return true if a node is in the given map of dragon nodes.
705            """
706            def __init__(self, map):
707                self.nodes = set(map.keys())
708
709            def __call__(self, e):
710                return e.name not in self.nodes
711
712        # Main line of generate_ns2
713        t = topo.clone()
714
715        # Create the map of nodes that need direct connections (dragon
716        # connections) from the connInfo
717        dragon_map = { }
718        for i in [ i for i in connInfo if i['type'] == 'transit']:
719            for a in i.get('fedAttr', []):
720                if a['attribute'] == 'vlan_id':
721                    vlan = a['value']
722                    break
723            else:
724                raise service_error(service_error.internal, "No vlan tag")
725            members = i.get('member', [])
726            if len(members) > 1: type = 'lan'
727            else: type = 'link'
728
729            try:
730                for m in members:
731                    if m['element'] in dragon_map:
732                        dragon_map[m['element']].append(( m['interface'], 
733                            vlan, type))
734                    else:
735                        dragon_map[m['element']] = [( m['interface'], 
736                            vlan, type),]
737            except KeyError:
738                raise service_error(service_error.req,
739                        "Missing connectivity info")
740
741        # Weed out the things we aren't going to instantiate: Segments, portal
742        # substrates, and portal interfaces.  (The copy in the for loop allows
743        # us to delete from e.elements in side the for loop).  While we're
744        # touching all the elements, we also adjust paths from the original
745        # testbed to local testbed paths and put the federation commands into
746        # the start commands
747        for e in [e for e in t.elements]:
748            if isinstance(e, topdl.Segment):
749                t.elements.remove(e)
750            if isinstance(e, topdl.Computer):
751                self.add_kit(e, self.federation_software)
752                if e.get_attribute('portal') and self.portal_startcommand:
753                    # Add local portal support software
754                    self.add_kit(e, self.portal_software)
755                    # Portals never have a user-specified start command
756                    e.set_attribute('startup', self.portal_startcommand)
757                elif self.node_startcommand:
758                    if e.get_attribute('startup'):
759                        e.set_attribute('startup', "%s \\$USER '%s'" % \
760                                (self.node_startcommand, 
761                                    e.get_attribute('startup')))
762                    else:
763                        e.set_attribute('startup', self.node_startcommand)
764
765                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
766                # Remove portal interfaces that do not connect to DRAGON
767                e.interface = [i for i in e.interface \
768                        if not i.get_attribute('portal') or i.name in dinf ]
769            # Fix software paths
770            for s in getattr(e, 'software', []):
771                s.location = re.sub("^.*/", softdir, s.location)
772
773        t.substrates = [ s.clone() for s in t.substrates ]
774        t.incorporate_elements()
775
776        # Customize the ns2 output for local portal commands and images
777        filters = []
778
779        if self.dragon_endpoint:
780            add_filter = not_dragon(dragon_map)
781            filters.append(dragon_commands(dragon_map))
782        else:
783            add_filter = None
784
785        if self.portal_command:
786            filters.append(topdl.generate_portal_command_filter(
787                self.portal_command, add_filter=add_filter))
788
789        if self.portal_image:
790            filters.append(topdl.generate_portal_image_filter(
791                self.portal_image))
792
793        if self.portal_type:
794            filters.append(topdl.generate_portal_hardware_filter(
795                self.portal_type))
796
797        # Convert to ns and write it out
798        expfile = topdl.topology_to_ns2(t, filters)
799        try:
800            f = open(expfn, "w")
801            print >>f, expfile
802            f.close()
803        except EnvironmentError:
804            raise service_error(service_error.internal,
805                    "Cannot write experiment file %s: %s" % (expfn,e))
806
807    def export_store_info(self, cf, proj, ename, connInfo):
808        """
809        For the export requests in the connection info, install the peer names
810        at the experiment controller via SetValue calls.
811        """
812
813        for c in connInfo:
814            for p in [ p for p in c.get('parameter', []) \
815                    if p.get('type', '') == 'output']:
816
817                if p.get('name', '') == 'peer':
818                    k = p.get('key', None)
819                    surl = p.get('store', None)
820                    if surl and k and k.index('/') != -1:
821                        value = "%s.%s.%s%s" % \
822                                (k[k.index('/')+1:], ename, proj, self.domain)
823                        req = { 'name': k, 'value': value }
824                        self.log.debug("Setting %s to %s on %s" % \
825                                (k, value, surl))
826                        self.call_SetValue(surl, req, cf)
827                    else:
828                        self.log.error("Bad export request: %s" % p)
829                elif p.get('name', '') == 'ssh_port':
830                    k = p.get('key', None)
831                    surl = p.get('store', None)
832                    if surl and k:
833                        req = { 'name': k, 'value': self.ssh_port }
834                        self.log.debug("Setting %s to %s on %s" % \
835                                (k, self.ssh_port, surl))
836                        self.call_SetValue(surl, req, cf)
837                    else:
838                        self.log.error("Bad export request: %s" % p)
839                else:
840                    self.log.error("Unknown export parameter: %s" % \
841                            p.get('name'))
842                    continue
843
844    def add_seer_node(self, topo, name, startup):
845        """
846        Add a seer node to the given topology, with the startup command passed
847        in.  Used by configure seer_services.
848        """
849        c_node = topdl.Computer(
850                name=name, 
851                os= topdl.OperatingSystem(
852                    attribute=[
853                    { 'attribute': 'osid', 
854                        'value': self.local_seer_image },
855                    ]),
856                attribute=[
857                    { 'attribute': 'startup', 'value': startup },
858                    ]
859                )
860        self.add_kit(c_node, self.local_seer_software)
861        topo.elements.append(c_node)
862
863    def configure_seer_services(self, services, topo, softdir):
864        """
865        Make changes to the topology required for the seer requests being made.
866        Specifically, add any control or master nodes required and set up the
867        start commands on the nodes to interconnect them.
868        """
869        local_seer = False      # True if we need to add a control node
870        collect_seer = False    # True if there is a seer-master node
871        seer_master= False      # True if we need to add the seer-master
872        for s in services:
873            s_name = s.get('name', '')
874            s_vis = s.get('visibility','')
875
876            if s_name  == 'local_seer_control' and s_vis == 'export':
877                local_seer = True
878            elif s_name == 'seer_master':
879                if s_vis == 'import':
880                    collect_seer = True
881                elif s_vis == 'export':
882                    seer_master = True
883       
884        # We've got the whole picture now, so add nodes if needed and configure
885        # them to interconnect properly.
886        if local_seer or seer_master:
887            # Copy local seer control node software to the tempdir
888            for l, f in self.local_seer_software:
889                base = os.path.basename(f)
890                copy_file(f, "%s/%s" % (softdir, base))
891        # If we're collecting seers somewhere the controllers need to talk to
892        # the master.  In testbeds that export the service, that will be a
893        # local node that we'll add below.  Elsewhere it will be the control
894        # portal that will port forward to the exporting master.
895        if local_seer:
896            if collect_seer:
897                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
898            else:
899                startup = self.local_seer_start
900            self.add_seer_node(topo, 'control', startup)
901        # If this is the seer master, add that node, too.
902        if seer_master:
903            self.add_seer_node(topo, 'seer-master', 
904                    "%s -R -n -R seer-master -R -A -R sink" % \
905                            self.seer_master_start)
906
907    def retrieve_software(self, topo, certfile, softdir):
908        """
909        Collect the software that nodes in the topology need loaded and stage
910        it locally.  This implies retrieving it from the experiment_controller
911        and placing it into softdir.  Certfile is used to prove that this node
912        has access to that data (it's the allocation/segment fedid).  Finally
913        local portal and federation software is also copied to the same staging
914        directory for simplicity - all software needed for experiment creation
915        is in softdir.
916        """
917        sw = set()
918        for e in topo.elements:
919            for s in getattr(e, 'software', []):
920                sw.add(s.location)
921        for s in sw:
922            self.log.debug("Retrieving %s" % s)
923            try:
924                get_url(s, certfile, softdir)
925            except:
926                t, v, st = sys.exc_info()
927                raise service_error(service_error.internal,
928                        "Error retrieving %s: %s" % (s, v))
929
930        # Copy local federation and portal node software to the tempdir
931        for s in (self.federation_software, self.portal_software):
932            for l, f in s:
933                base = os.path.basename(f)
934                copy_file(f, "%s/%s" % (softdir, base))
935
936
937    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
938        """
939        Gather common configuration files, retrieve or create an experiment
940        name and project name, and return the ssh_key filenames.  Create an
941        allocation log bound to the state log variable as well.
942        """
943        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
944        ename = None
945        pubkey_base = None
946        secretkey_base = None
947        proj = None
948        user = None
949        alloc_log = None
950
951        for a in attrs:
952            if a['attribute'] in configs:
953                try:
954                    self.log.debug("Retrieving %s from %s" % \
955                            (a['attribute'], a['value']))
956                    get_url(a['value'], certfile, tmpdir)
957                except:
958                    t, v, st = sys.exc_info()
959                    raise service_error(service_error.internal,
960                            "Error retrieving %s: %s" % (a.get('value', ""), v))
961            if a['attribute'] == 'ssh_pubkey':
962                pubkey_base = a['value'].rpartition('/')[2]
963            if a['attribute'] == 'ssh_secretkey':
964                secretkey_base = a['value'].rpartition('/')[2]
965            if a['attribute'] == 'experiment_name':
966                ename = a['value']
967
968        if not ename:
969            ename = ""
970            for i in range(0,5):
971                ename += random.choice(string.ascii_letters)
972            self.log.warn("No experiment name: picked one randomly: %s" \
973                    % ename)
974
975        if not pubkey_base:
976            raise service_error(service_error.req, 
977                    "No public key attribute")
978
979        if not secretkey_base:
980            raise service_error(service_error.req, 
981                    "No secret key attribute")
982
983        self.state_lock.acquire()
984        if aid in self.allocation:
985            proj = self.allocation[aid].get('project', None)
986            if not proj: 
987                proj = self.allocation[aid].get('sproject', None)
988            user = self.allocation[aid].get('user', None)
989            self.allocation[aid]['experiment'] = ename
990            self.allocation[aid]['log'] = [ ]
991            # Create a logger that logs to the experiment's state object as
992            # well as to the main log file.
993            alloc_log = logging.getLogger('fedd.access.%s' % ename)
994            h = logging.StreamHandler(
995                    list_log.list_log(self.allocation[aid]['log']))
996            # XXX: there should be a global one of these rather than
997            # repeating the code.
998            h.setFormatter(logging.Formatter(
999                "%(asctime)s %(name)s %(message)s",
1000                        '%d %b %y %H:%M:%S'))
1001            alloc_log.addHandler(h)
1002            self.write_state()
1003        self.state_lock.release()
1004
1005        if not proj:
1006            raise service_error(service_error.internal, 
1007                    "Can't find project for %s" %aid)
1008
1009        if not user:
1010            raise service_error(service_error.internal, 
1011                    "Can't find creation user for %s" %aid)
1012
1013        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1014
1015    def finalize_experiment(self, starter, topo, aid, alloc_id):
1016        """
1017        Store key bits of experiment state in the global repository, including
1018        the response that may need to be replayed, and return the response.
1019        """
1020        # Copy the assigned names into the return topology
1021        embedding = [ ]
1022        for n in starter.node:
1023            embedding.append({ 
1024                'toponame': n,
1025                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1026                })
1027        # Grab the log (this is some anal locking, but better safe than
1028        # sorry)
1029        self.state_lock.acquire()
1030        logv = "".join(self.allocation[aid]['log'])
1031        # It's possible that the StartSegment call gets retried (!).
1032        # if the 'started' key is in the allocation, we'll return it rather
1033        # than redo the setup.
1034        self.allocation[aid]['started'] = { 
1035                'allocID': alloc_id,
1036                'allocationLog': logv,
1037                'segmentdescription': { 
1038                    'topdldescription': topo.clone().to_dict()
1039                    },
1040                'embedding': embedding
1041                }
1042        retval = copy.copy(self.allocation[aid]['started'])
1043        self.write_state()
1044        self.state_lock.release()
1045        return retval
1046   
1047    # End of StartSegment support routines
1048
1049    def StartSegment(self, req, fid):
1050        err = None  # Any service_error generated after tmpdir is created
1051        rv = None   # Return value from segment creation
1052
1053        try:
1054            req = req['StartSegmentRequestBody']
1055            auth_attr = req['allocID']['fedid']
1056            topref = req['segmentdescription']['topdldescription']
1057        except KeyError:
1058            raise service_error(server_error.req, "Badly formed request")
1059
1060        connInfo = req.get('connection', [])
1061        services = req.get('service', [])
1062        aid = "%s" % auth_attr
1063        attrs = req.get('fedAttr', [])
1064        if not self.auth.check_attribute(fid, auth_attr):
1065            raise service_error(service_error.access, "Access denied")
1066        else:
1067            # See if this is a replay of an earlier succeeded StartSegment -
1068            # sometimes SSL kills 'em.  If so, replay the response rather than
1069            # redoing the allocation.
1070            self.state_lock.acquire()
1071            retval = self.allocation[aid].get('started', None)
1072            self.state_lock.release()
1073            if retval:
1074                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1075                        "replaying response")
1076                return retval
1077
1078        # A new request.  Do it.
1079
1080        if topref: topo = topdl.Topology(**topref)
1081        else:
1082            raise service_error(service_error.req, 
1083                    "Request missing segmentdescription'")
1084       
1085        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1086        try:
1087            tmpdir = tempfile.mkdtemp(prefix="access-")
1088            softdir = "%s/software" % tmpdir
1089            os.mkdir(softdir)
1090        except EnvironmentError:
1091            raise service_error(service_error.internal, "Cannot create tmp dir")
1092
1093        # Try block alllows us to clean up temporary files.
1094        try:
1095            self.retrieve_software(topo, certfile, softdir)
1096            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1097                    self.initialize_experiment_info(attrs, aid, 
1098                            certfile, tmpdir)
1099
1100            # Set up userconf and seer if needed
1101            self.configure_userconf(services, tmpdir)
1102            self.configure_seer_services(services, topo, softdir)
1103            # Get and send synch store variables
1104            self.export_store_info(certfile, proj, ename, connInfo)
1105            self.import_store_info(certfile, connInfo)
1106
1107            expfile = "%s/experiment.tcl" % tmpdir
1108
1109            self.generate_portal_configs(topo, pubkey_base, 
1110                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1111            self.generate_ns2(topo, expfile, 
1112                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1113
1114            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1115                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1116                    cert=self.xmlrpc_cert)
1117            rv = starter(self, ename, proj, user, expfile, tmpdir)
1118        except service_error, e:
1119            err = e
1120        except:
1121            t, v, st = sys.exc_info()
1122            err = service_error(service_error.internal, "%s: %s" % \
1123                    (v, traceback.extract_tb(st)))
1124
1125        # Walk up tmpdir, deleting as we go
1126        if self.cleanup: self.remove_dirs(tmpdir)
1127        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1128
1129        if rv:
1130            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1131        elif err:
1132            raise service_error(service_error.federant,
1133                    "Swapin failed: %s" % err)
1134        else:
1135            raise service_error(service_error.federant, "Swapin failed")
1136
1137    def TerminateSegment(self, req, fid):
1138        try:
1139            req = req['TerminateSegmentRequestBody']
1140        except KeyError:
1141            raise service_error(server_error.req, "Badly formed request")
1142
1143        auth_attr = req['allocID']['fedid']
1144        aid = "%s" % auth_attr
1145        attrs = req.get('fedAttr', [])
1146        if not self.auth.check_attribute(fid, auth_attr):
1147            raise service_error(service_error.access, "Access denied")
1148
1149        self.state_lock.acquire()
1150        if aid in self.allocation:
1151            proj = self.allocation[aid].get('project', None)
1152            if not proj: 
1153                proj = self.allocation[aid].get('sproject', None)
1154            user = self.allocation[aid].get('user', None)
1155            ename = self.allocation[aid].get('experiment', None)
1156        else:
1157            proj = None
1158            user = None
1159            ename = None
1160        self.state_lock.release()
1161
1162        if not proj:
1163            raise service_error(service_error.internal, 
1164                    "Can't find project for %s" % aid)
1165
1166        if not user:
1167            raise service_error(service_error.internal, 
1168                    "Can't find creation user for %s" % aid)
1169        if not ename:
1170            raise service_error(service_error.internal, 
1171                    "Can't find experiment name for %s" % aid)
1172        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1173                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1174        stopper(self, user, proj, ename)
1175        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.