source: fedd/federation/emulab_access.py @ 822d31b

axis_examplecompt_changesinfo-ops
Last change on this file since 822d31b was 822d31b, checked in by Ted Faber <faber@…>, 13 years ago

fix legacy mode

  • Property mode set to 100644
File size: 38.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        # XXX: this should go?
108        #if config.has_option("access", "accessdb"):
109        #    self.read_access(config.get("access", "accessdb"))
110        tb = config.get('access', 'testbed')
111        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
112        else: self.testbed = [ ]
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            if accessdb:
122                self.legacy_read_access(accessdb, self.legacy_access_tuple)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            if accessdb:
126                self.read_access(accessdb, self.access_tuple)
127        else:
128            raise service_error(service_error.internal, 
129                    "Unknown auth_type: %s" % self.auth_type)
130
131        # read_state in the base_class
132        self.state_lock.acquire()
133        for a  in ('allocation', 'projects', 'keys', 'types'):
134            if a not in self.state:
135                self.state[a] = { }
136        self.allocation = self.state['allocation']
137        self.projects = self.state['projects']
138        self.keys = self.state['keys']
139        self.types = self.state['types']
140        if self.auth_type == "legacy": 
141            # Add the ownership attributes to the authorizer.  Note that the
142            # indices of the allocation dict are strings, but the attributes are
143            # fedids, so there is a conversion.
144            for k in self.allocation.keys():
145                for o in self.allocation[k].get('owners', []):
146                    self.auth.set_attribute(o, fedid(hexstr=k))
147                if self.allocation[k].has_key('userconfig'):
148                    sfid = self.allocation[k]['userconfig']
149                    fid = fedid(hexstr=sfid)
150                    self.auth.set_attribute(fid, "/%s" % sfid)
151        self.state_lock.release()
152        self.exports = {
153                'SMB': self.export_SMB,
154                'seer': self.export_seer,
155                'tmcd': self.export_tmcd,
156                'userconfig': self.export_userconfig,
157                'project_export': self.export_project_export,
158                'local_seer_control': self.export_local_seer,
159                'seer_master': self.export_seer_master,
160                'hide_hosts': self.export_hide_hosts,
161                }
162
163        if not self.local_seer_image or not self.local_seer_software or \
164                not self.local_seer_start:
165            if 'local_seer_control' in self.exports:
166                del self.exports['local_seer_control']
167
168        if not self.local_seer_image or not self.local_seer_software or \
169                not self.seer_master_start:
170            if 'seer_master' in self.exports:
171                del self.exports['seer_master']
172
173
174        self.soap_services = {\
175            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
176            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
177            'StartSegment': soap_handler("StartSegment", self.StartSegment),
178            'TerminateSegment': soap_handler("TerminateSegment",
179                self.TerminateSegment),
180            }
181        self.xmlrpc_services =  {\
182            'RequestAccess': xmlrpc_handler('RequestAccess',
183                self.RequestAccess),
184            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
185                self.ReleaseAccess),
186            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': xmlrpc_handler('TerminateSegment',
188                self.TerminateSegment),
189            }
190
191        self.call_SetValue = service_caller('SetValue')
192        self.call_GetValue = service_caller('GetValue', log=self.log)
193
194        if not config.has_option("allocate", "uri"):
195            self.allocate_project = \
196                allocate_project_local(config, auth)
197        else:
198            self.allocate_project = \
199                allocate_project_remote(config, auth)
200
201
202        # If the project allocator exports services, put them in this object's
203        # maps so that classes that instantiate this can call the services.
204        self.soap_services.update(self.allocate_project.soap_services)
205        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
206
207    @staticmethod
208    def legacy_access_tuple(str):
209        """
210        Convert a string of the form (id[:resources:resouces], id, id) into a
211        tuple of the form (project, user, user) where users may be names or
212        fedids.  The resources strings are obsolete and ignored.
213        """
214        def parse_name(n):
215            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
216            else: return n
217
218        str = str.strip()
219        if str.startswith('(') and str.endswith(')'):
220            str = str[1:-1]
221            names = [ s.strip() for s in str.split(",")]
222            if len(names) > 3:
223                raise self.parse_error("More than three fields in name")
224            first = names[0].split(":")
225            if first == 'fedid:':
226                del first[0]
227                first[0] = fedid(hexstr=first[0])
228            names[0] = first[0]
229
230            for i in range(1,2):
231                names[i] = parse_name(names[i])
232
233            return tuple(names)
234        else:
235            raise self.parse_error('Bad mapping (unbalanced parens)')
236
237    @staticmethod
238    def access_tuple(str):
239        """
240        Convert a string of the form (id, id) into an access_project.  This is
241        called by read_access to convert to local attributes.  It returns
242        a tuple of the form (project, user, user) where the two users are
243        always the same.
244        """
245
246        str = str.strip()
247        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
248            # The slice takes the parens off the string.
249            proj, user = str[1:-1].split(',')
250            return (proj.strip(), user.strip(), user.strip())
251        else:
252            raise self.parse_error(
253                    'Bad mapping (unbalanced parens or more than 1 comma)')
254
255
256    # RequestAccess support routines
257
258    def legacy_lookup_access(self, req, fid):
259        """
260        Look up the local access control information mapped to this fedid and
261        credentials.  In this case it is a (project, create_user, access_user)
262        triple, and a triple of three booleans indicating which, if any will
263        need to be dynamically created.  Finally a list of owners for that
264        allocation is returned.
265
266        lookup_access_base pulls the first triple out, and it is parsed by this
267        routine into the boolean map.  Owners is always the controlling fedid.
268        """
269        # Return values
270        rp = None
271        ru = None
272        # This maps a valid user to the Emulab projects and users to use
273        found, match = self.legacy_lookup_access_base(req, fid)
274        tb, project, user = match
275       
276        if found == None:
277            raise service_error(service_error.access,
278                    "Access denied - cannot map access")
279
280        # resolve <dynamic> and <same> in found
281        dyn_proj = False
282        dyn_create_user = False
283        dyn_service_user = False
284
285        if found[0] == "<same>":
286            if project != None:
287                rp = project
288            else : 
289                raise service_error(\
290                        service_error.server_config,
291                        "Project matched <same> when no project given")
292        elif found[0] == "<dynamic>":
293            rp = None
294            dyn_proj = True
295        else:
296            rp = found[0]
297
298        if found[1] == "<same>":
299            if user_match == "<any>":
300                if user != None: rcu = user[0]
301                else: raise service_error(\
302                        service_error.server_config,
303                        "Matched <same> on anonymous request")
304            else:
305                rcu = user_match
306        elif found[1] == "<dynamic>":
307            rcu = None
308            dyn_create_user = True
309        else:
310            rcu = found[1]
311       
312        if found[2] == "<same>":
313            if user_match == "<any>":
314                if user != None: rsu = user[0]
315                else: raise service_error(\
316                        service_error.server_config,
317                        "Matched <same> on anonymous request")
318            else:
319                rsu = user_match
320        elif found[2] == "<dynamic>":
321            rsu = None
322            dyn_service_user = True
323        else:
324            rsu = found[2]
325
326        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
327                [ fid ]
328
329    def lookup_access(self, req, fid): 
330        """
331        Check all the attributes that this controller knows how to map and see
332        if the requester is allowed to use any of them.  If so return one.
333        """
334        # Import request credentials into this (clone later??)
335        if self.auth.import_credentials(
336                data_list=req.get('abac_credential', [])):
337            self.auth.save()
338
339        # Check every attribute that we know how to map and take the first
340        # success.
341        for attr in (self.access.keys()):
342            if self.auth.check_attribute(fid, attr):
343                # XXX: needs to deal with dynamics
344                return copy.copy(self.access[attr]), (False, False, False), \
345                        [ fid ]
346            else:
347                self.log.debug("Access failed for %s %s" % (attr, fid))
348        else:
349            raise service_error(service_error.access, "Access denied")
350
351
352    def do_project_allocation(self, dyn, project, user):
353        """
354        Call the project allocation routines and return the info.
355        """
356        if dyn: 
357            # Compose the dynamic project request
358            # (only dynamic, dynamic currently allowed)
359            preq = { 'AllocateProjectRequestBody': \
360                        { 'project' : {\
361                            'user': [ \
362                            { \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                 'role': "serviceAccess",\
366                            }, \
367                            { \
368                                'access': [ { 
369                                    'sshPubkey': self.ssh_pubkey_file } ],
370                                 'role': "experimentCreation",\
371                            }, \
372                            ], \
373                            }\
374                        }\
375                    }
376            return self.allocate_project.dynamic_project(preq)
377        else:
378            preq = {'StaticProjectRequestBody' : \
379                    { 'project': \
380                        { 'name' : { 'localname' : project },\
381                          'user' : [ \
382                            {\
383                                'userID': { 'localname' : user }, \
384                                'access': [ { 
385                                    'sshPubkey': self.ssh_pubkey_file } ],
386                                'role': 'experimentCreation'\
387                            },\
388                            {\
389                                'userID': { 'localname' : user}, \
390                                'access': [ { 
391                                    'sshPubkey': self.ssh_pubkey_file } ],
392                                'role': 'serviceAccess'\
393                            },\
394                        ]}\
395                    }\
396            }
397            return self.allocate_project.static_project(preq)
398
399    def save_project_state(self, aid, ap, dyn, owners):
400        """
401        Parse out and save the information relevant to the project created for
402        this experiment.  That info is largely in ap and owners.  dyn indicates
403        that the project was created dynamically.  Return the user and project
404        names.
405        """
406        self.state_lock.acquire()
407        self.allocation[aid] = { }
408        try:
409            pname = ap['project']['name']['localname']
410        except KeyError:
411            pname = None
412
413        if dyn:
414            if not pname:
415                self.state_lock.release()
416                raise service_error(service_error.internal,
417                        "Misformed allocation response?")
418            if pname in self.projects: self.projects[pname] += 1
419            else: self.projects[pname] = 1
420            self.allocation[aid]['project'] = pname
421        else:
422            # sproject is a static project associated with this allocation.
423            self.allocation[aid]['sproject'] = pname
424
425        self.allocation[aid]['keys'] = [ ]
426
427        try:
428            for u in ap['project']['user']:
429                uname = u['userID']['localname']
430                if u['role'] == 'experimentCreation':
431                    self.allocation[aid]['user'] = uname
432                for k in [ k['sshPubkey'] for k in u['access'] \
433                        if k.has_key('sshPubkey') ]:
434                    kv = "%s:%s" % (uname, k)
435                    if self.keys.has_key(kv): self.keys[kv] += 1
436                    else: self.keys[kv] = 1
437                    self.allocation[aid]['keys'].append((uname, k))
438        except KeyError:
439            self.state_lock.release()
440            raise service_error(service_error.internal,
441                    "Misformed allocation response?")
442
443        self.allocation[aid]['owners'] = owners
444        self.write_state()
445        self.state_lock.release()
446        return (pname, uname)
447
448    # End of RequestAccess support routines
449
450    def RequestAccess(self, req, fid):
451        """
452        Handle the access request.  Proxy if not for us.
453
454        Parse out the fields and make the allocations or rejections if for us,
455        otherwise, assuming we're willing to proxy, proxy the request out.
456        """
457
458        def gateway_hardware(h):
459            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
460            else: return h
461
462        def get_export_project(svcs):
463            """
464            if the service requests includes one to export a project, return
465            that project.
466            """
467            rv = None
468            for s in svcs:
469                if s.get('name', '') == 'project_export' and \
470                        s.get('visibility', '') == 'export':
471                    if not rv: 
472                        for a in s.get('feddAttr', []):
473                            if a.get('attribute', '') == 'project' \
474                                    and 'value' in a:
475                                rv = a['value']
476                    else:
477                        raise service_error(service_error, access, 
478                                'Requesting multiple project exports is ' + \
479                                        'not supported');
480            return rv
481
482        # The dance to get into the request body
483        if req.has_key('RequestAccessRequestBody'):
484            req = req['RequestAccessRequestBody']
485        else:
486            raise service_error(service_error.req, "No request!?")
487
488        alog = open("./auth.log", 'w')
489        print >>alog, self.auth
490        print >> alog, "after"
491        if self.auth.import_credentials(
492                data_list=req.get('abac_credential', [])):
493            self.auth.save()
494        print >>alog, self.auth
495        alog.close()
496
497        if self.auth_type == "legacy":
498            found, dyn, owners = self.legacy_lookup_access(req, fid)
499        elif self.auth_type == 'abac':
500            found, dyn, owners = self.lookup_access(req, fid)
501        else:
502            raise service_error(service_error.internal, 
503                    'Unknown auth_type: %s' % self.auth_type)
504        ap = None
505
506        # if this includes a project export request and the exported
507        # project is not the access project, access denied.
508        if 'service' in req:
509            ep = get_export_project(req['service'])
510            if ep and ep != found[0]:
511                raise service_error(service_error.access,
512                        "Cannot export %s" % ep)
513
514        if self.ssh_pubkey_file:
515            ap = self.do_project_allocation(dyn[1], found[0], found[1])
516        else:
517            raise service_error(service_error.internal, 
518                    "SSH access parameters required")
519        # keep track of what's been added
520        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
521        aid = unicode(allocID)
522
523        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
524
525        services, svc_state = self.export_services(req.get('service',[]),
526                pname, uname)
527        self.state_lock.acquire()
528        # Store services state in global state
529        for k, v in svc_state.items():
530            self.allocation[aid][k] = v
531        self.write_state()
532        self.state_lock.release()
533        # Give the owners the right to change this allocation
534        for o in owners:
535            self.auth.set_attribute(o, allocID)
536        self.auth.save()
537        try:
538            f = open("%s/%s.pem" % (self.certdir, aid), "w")
539            print >>f, alloc_cert
540            f.close()
541        except EnvironmentError, e:
542            raise service_error(service_error.internal, 
543                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
544        resp = self.build_access_response({ 'fedid': allocID } ,
545                ap, services)
546        return resp
547
548    def do_release_project(self, del_project, del_users, del_types):
549        """
550        If a project and users has to be deleted, make the call.
551        """
552        msg = { 'project': { }}
553        if del_project:
554            msg['project']['name']= {'localname': del_project}
555        users = [ ]
556        for u in del_users.keys():
557            users.append({ 'userID': { 'localname': u },\
558                'access' :  \
559                        [ {'sshPubkey' : s } for s in del_users[u]]\
560            })
561        if users: 
562            msg['project']['user'] = users
563        if len(del_types) > 0:
564            msg['resources'] = { 'node': \
565                    [ {'hardware': [ h ] } for h in del_types ]\
566                }
567        if self.allocate_project.release_project:
568            msg = { 'ReleaseProjectRequestBody' : msg}
569            self.allocate_project.release_project(msg)
570
571    def ReleaseAccess(self, req, fid):
572        # The dance to get into the request body
573        if req.has_key('ReleaseAccessRequestBody'):
574            req = req['ReleaseAccessRequestBody']
575        else:
576            raise service_error(service_error.req, "No request!?")
577
578        try:
579            if req['allocID'].has_key('localname'):
580                auth_attr = aid = req['allocID']['localname']
581            elif req['allocID'].has_key('fedid'):
582                aid = unicode(req['allocID']['fedid'])
583                auth_attr = req['allocID']['fedid']
584            else:
585                raise service_error(service_error.req,
586                        "Only localnames and fedids are understood")
587        except KeyError:
588            raise service_error(service_error.req, "Badly formed request")
589
590        self.log.debug("[access] deallocation requested for %s by %s" % \
591                (aid, fid))
592        if not self.auth.check_attribute(fid, auth_attr):
593            self.log.debug("[access] deallocation denied for %s", aid)
594            raise service_error(service_error.access, "Access Denied")
595
596        # If we know this allocation, reduce the reference counts and
597        # remove the local allocations.  Otherwise report an error.  If
598        # there is an allocation to delete, del_users will be a dictonary
599        # of sets where the key is the user that owns the keys in the set.
600        # We use a set to avoid duplicates.  del_project is just the name
601        # of any dynamic project to delete.  We're somewhat lazy about
602        # deleting authorization attributes.  Having access to something
603        # that doesn't exist isn't harmful.
604        del_users = { }
605        del_project = None
606        del_types = set()
607
608        self.state_lock.acquire()
609        if aid in self.allocation:
610            self.log.debug("Found allocation for %s" %aid)
611            for k in self.allocation[aid]['keys']:
612                kk = "%s:%s" % k
613                self.keys[kk] -= 1
614                if self.keys[kk] == 0:
615                    if not del_users.has_key(k[0]):
616                        del_users[k[0]] = set()
617                    del_users[k[0]].add(k[1])
618                    del self.keys[kk]
619
620            if 'project' in self.allocation[aid]:
621                pname = self.allocation[aid]['project']
622                self.projects[pname] -= 1
623                if self.projects[pname] == 0:
624                    del_project = pname
625                    del self.projects[pname]
626
627            if 'types' in self.allocation[aid]:
628                for t in self.allocation[aid]['types']:
629                    self.types[t] -= 1
630                    if self.types[t] == 0:
631                        if not del_project: del_project = t[0]
632                        del_types.add(t[1])
633                        del self.types[t]
634
635            del self.allocation[aid]
636            self.write_state()
637            self.state_lock.release()
638            # If we actually have resources to deallocate, prepare the call.
639            if del_project or del_users:
640                self.do_release_project(del_project, del_users, del_types)
641            # And remove the access cert
642            cf = "%s/%s.pem" % (self.certdir, aid)
643            self.log.debug("Removing %s" % cf)
644            os.remove(cf)
645            return { 'allocID': req['allocID'] } 
646        else:
647            self.state_lock.release()
648            raise service_error(service_error.req, "No such allocation")
649
650    # These are subroutines for StartSegment
651    def generate_ns2(self, topo, expfn, softdir, connInfo):
652        """
653        Convert topo into an ns2 file, decorated with appropriate commands for
654        the particular testbed setup.  Convert all requests for software, etc
655        to point at the staged copies on this testbed and add the federation
656        startcommands.
657        """
658        class dragon_commands:
659            """
660            Functor to spit out approrpiate dragon commands for nodes listed in
661            the connectivity description.  The constructor makes a dict mapping
662            dragon nodes to their parameters and the __call__ checks each
663            element in turn for membership.
664            """
665            def __init__(self, map):
666                self.node_info = map
667
668            def __call__(self, e):
669                s = ""
670                if isinstance(e, topdl.Computer):
671                    if self.node_info.has_key(e.name):
672                        info = self.node_info[e.name]
673                        for ifname, vlan, type in info:
674                            for i in e.interface:
675                                if i.name == ifname:
676                                    addr = i.get_attribute('ip4_address')
677                                    subs = i.substrate[0]
678                                    break
679                            else:
680                                raise service_error(service_error.internal,
681                                        "No interface %s on element %s" % \
682                                                (ifname, e.name))
683                            # XXX: do netmask right
684                            if type =='link':
685                                s = ("tb-allow-external ${%s} " + \
686                                        "dragonportal ip %s vlan %s " + \
687                                        "netmask 255.255.255.0\n") % \
688                                        (topdl.to_tcl_name(e.name), addr, vlan)
689                            elif type =='lan':
690                                s = ("tb-allow-external ${%s} " + \
691                                        "dragonportal " + \
692                                        "ip %s vlan %s usurp %s\n") % \
693                                        (topdl.to_tcl_name(e.name), addr, 
694                                                vlan, subs)
695                            else:
696                                raise service_error(service_error_internal,
697                                        "Unknown DRAGON type %s" % type)
698                return s
699
700        class not_dragon:
701            """
702            Return true if a node is in the given map of dragon nodes.
703            """
704            def __init__(self, map):
705                self.nodes = set(map.keys())
706
707            def __call__(self, e):
708                return e.name not in self.nodes
709
710        # Main line of generate_ns2
711        t = topo.clone()
712
713        # Create the map of nodes that need direct connections (dragon
714        # connections) from the connInfo
715        dragon_map = { }
716        for i in [ i for i in connInfo if i['type'] == 'transit']:
717            for a in i.get('fedAttr', []):
718                if a['attribute'] == 'vlan_id':
719                    vlan = a['value']
720                    break
721            else:
722                raise service_error(service_error.internal, "No vlan tag")
723            members = i.get('member', [])
724            if len(members) > 1: type = 'lan'
725            else: type = 'link'
726
727            try:
728                for m in members:
729                    if m['element'] in dragon_map:
730                        dragon_map[m['element']].append(( m['interface'], 
731                            vlan, type))
732                    else:
733                        dragon_map[m['element']] = [( m['interface'], 
734                            vlan, type),]
735            except KeyError:
736                raise service_error(service_error.req,
737                        "Missing connectivity info")
738
739        # Weed out the things we aren't going to instantiate: Segments, portal
740        # substrates, and portal interfaces.  (The copy in the for loop allows
741        # us to delete from e.elements in side the for loop).  While we're
742        # touching all the elements, we also adjust paths from the original
743        # testbed to local testbed paths and put the federation commands into
744        # the start commands
745        for e in [e for e in t.elements]:
746            if isinstance(e, topdl.Segment):
747                t.elements.remove(e)
748            if isinstance(e, topdl.Computer):
749                self.add_kit(e, self.federation_software)
750                if e.get_attribute('portal') and self.portal_startcommand:
751                    # Add local portal support software
752                    self.add_kit(e, self.portal_software)
753                    # Portals never have a user-specified start command
754                    e.set_attribute('startup', self.portal_startcommand)
755                elif self.node_startcommand:
756                    if e.get_attribute('startup'):
757                        e.set_attribute('startup', "%s \\$USER '%s'" % \
758                                (self.node_startcommand, 
759                                    e.get_attribute('startup')))
760                    else:
761                        e.set_attribute('startup', self.node_startcommand)
762
763                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
764                # Remove portal interfaces that do not connect to DRAGON
765                e.interface = [i for i in e.interface \
766                        if not i.get_attribute('portal') or i.name in dinf ]
767            # Fix software paths
768            for s in getattr(e, 'software', []):
769                s.location = re.sub("^.*/", softdir, s.location)
770
771        t.substrates = [ s.clone() for s in t.substrates ]
772        t.incorporate_elements()
773
774        # Customize the ns2 output for local portal commands and images
775        filters = []
776
777        if self.dragon_endpoint:
778            add_filter = not_dragon(dragon_map)
779            filters.append(dragon_commands(dragon_map))
780        else:
781            add_filter = None
782
783        if self.portal_command:
784            filters.append(topdl.generate_portal_command_filter(
785                self.portal_command, add_filter=add_filter))
786
787        if self.portal_image:
788            filters.append(topdl.generate_portal_image_filter(
789                self.portal_image))
790
791        if self.portal_type:
792            filters.append(topdl.generate_portal_hardware_filter(
793                self.portal_type))
794
795        # Convert to ns and write it out
796        expfile = topdl.topology_to_ns2(t, filters)
797        try:
798            f = open(expfn, "w")
799            print >>f, expfile
800            f.close()
801        except EnvironmentError:
802            raise service_error(service_error.internal,
803                    "Cannot write experiment file %s: %s" % (expfn,e))
804
805    def export_store_info(self, cf, proj, ename, connInfo):
806        """
807        For the export requests in the connection info, install the peer names
808        at the experiment controller via SetValue calls.
809        """
810
811        for c in connInfo:
812            for p in [ p for p in c.get('parameter', []) \
813                    if p.get('type', '') == 'output']:
814
815                if p.get('name', '') == 'peer':
816                    k = p.get('key', None)
817                    surl = p.get('store', None)
818                    if surl and k and k.index('/') != -1:
819                        value = "%s.%s.%s%s" % \
820                                (k[k.index('/')+1:], ename, proj, self.domain)
821                        req = { 'name': k, 'value': value }
822                        self.log.debug("Setting %s to %s on %s" % \
823                                (k, value, surl))
824                        self.call_SetValue(surl, req, cf)
825                    else:
826                        self.log.error("Bad export request: %s" % p)
827                elif p.get('name', '') == 'ssh_port':
828                    k = p.get('key', None)
829                    surl = p.get('store', None)
830                    if surl and k:
831                        req = { 'name': k, 'value': self.ssh_port }
832                        self.log.debug("Setting %s to %s on %s" % \
833                                (k, self.ssh_port, surl))
834                        self.call_SetValue(surl, req, cf)
835                    else:
836                        self.log.error("Bad export request: %s" % p)
837                else:
838                    self.log.error("Unknown export parameter: %s" % \
839                            p.get('name'))
840                    continue
841
842    def add_seer_node(self, topo, name, startup):
843        """
844        Add a seer node to the given topology, with the startup command passed
845        in.  Used by configure seer_services.
846        """
847        c_node = topdl.Computer(
848                name=name, 
849                os= topdl.OperatingSystem(
850                    attribute=[
851                    { 'attribute': 'osid', 
852                        'value': self.local_seer_image },
853                    ]),
854                attribute=[
855                    { 'attribute': 'startup', 'value': startup },
856                    ]
857                )
858        self.add_kit(c_node, self.local_seer_software)
859        topo.elements.append(c_node)
860
861    def configure_seer_services(self, services, topo, softdir):
862        """
863        Make changes to the topology required for the seer requests being made.
864        Specifically, add any control or master nodes required and set up the
865        start commands on the nodes to interconnect them.
866        """
867        local_seer = False      # True if we need to add a control node
868        collect_seer = False    # True if there is a seer-master node
869        seer_master= False      # True if we need to add the seer-master
870        for s in services:
871            s_name = s.get('name', '')
872            s_vis = s.get('visibility','')
873
874            if s_name  == 'local_seer_control' and s_vis == 'export':
875                local_seer = True
876            elif s_name == 'seer_master':
877                if s_vis == 'import':
878                    collect_seer = True
879                elif s_vis == 'export':
880                    seer_master = True
881       
882        # We've got the whole picture now, so add nodes if needed and configure
883        # them to interconnect properly.
884        if local_seer or seer_master:
885            # Copy local seer control node software to the tempdir
886            for l, f in self.local_seer_software:
887                base = os.path.basename(f)
888                copy_file(f, "%s/%s" % (softdir, base))
889        # If we're collecting seers somewhere the controllers need to talk to
890        # the master.  In testbeds that export the service, that will be a
891        # local node that we'll add below.  Elsewhere it will be the control
892        # portal that will port forward to the exporting master.
893        if local_seer:
894            if collect_seer:
895                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
896            else:
897                startup = self.local_seer_start
898            self.add_seer_node(topo, 'control', startup)
899        # If this is the seer master, add that node, too.
900        if seer_master:
901            self.add_seer_node(topo, 'seer-master', 
902                    "%s -R -n -R seer-master -R -A -R sink" % \
903                            self.seer_master_start)
904
905    def retrieve_software(self, topo, certfile, softdir):
906        """
907        Collect the software that nodes in the topology need loaded and stage
908        it locally.  This implies retrieving it from the experiment_controller
909        and placing it into softdir.  Certfile is used to prove that this node
910        has access to that data (it's the allocation/segment fedid).  Finally
911        local portal and federation software is also copied to the same staging
912        directory for simplicity - all software needed for experiment creation
913        is in softdir.
914        """
915        sw = set()
916        for e in topo.elements:
917            for s in getattr(e, 'software', []):
918                sw.add(s.location)
919        for s in sw:
920            self.log.debug("Retrieving %s" % s)
921            try:
922                get_url(s, certfile, softdir)
923            except:
924                t, v, st = sys.exc_info()
925                raise service_error(service_error.internal,
926                        "Error retrieving %s: %s" % (s, v))
927
928        # Copy local federation and portal node software to the tempdir
929        for s in (self.federation_software, self.portal_software):
930            for l, f in s:
931                base = os.path.basename(f)
932                copy_file(f, "%s/%s" % (softdir, base))
933
934
935    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
936        """
937        Gather common configuration files, retrieve or create an experiment
938        name and project name, and return the ssh_key filenames.  Create an
939        allocation log bound to the state log variable as well.
940        """
941        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
942        ename = None
943        pubkey_base = None
944        secretkey_base = None
945        proj = None
946        user = None
947        alloc_log = None
948
949        for a in attrs:
950            if a['attribute'] in configs:
951                try:
952                    self.log.debug("Retrieving %s from %s" % \
953                            (a['attribute'], a['value']))
954                    get_url(a['value'], certfile, tmpdir)
955                except:
956                    t, v, st = sys.exc_info()
957                    raise service_error(service_error.internal,
958                            "Error retrieving %s: %s" % (a.get('value', ""), v))
959            if a['attribute'] == 'ssh_pubkey':
960                pubkey_base = a['value'].rpartition('/')[2]
961            if a['attribute'] == 'ssh_secretkey':
962                secretkey_base = a['value'].rpartition('/')[2]
963            if a['attribute'] == 'experiment_name':
964                ename = a['value']
965
966        if not ename:
967            ename = ""
968            for i in range(0,5):
969                ename += random.choice(string.ascii_letters)
970            self.log.warn("No experiment name: picked one randomly: %s" \
971                    % ename)
972
973        if not pubkey_base:
974            raise service_error(service_error.req, 
975                    "No public key attribute")
976
977        if not secretkey_base:
978            raise service_error(service_error.req, 
979                    "No secret key attribute")
980
981        self.state_lock.acquire()
982        if aid in self.allocation:
983            proj = self.allocation[aid].get('project', None)
984            if not proj: 
985                proj = self.allocation[aid].get('sproject', None)
986            user = self.allocation[aid].get('user', None)
987            self.allocation[aid]['experiment'] = ename
988            self.allocation[aid]['log'] = [ ]
989            # Create a logger that logs to the experiment's state object as
990            # well as to the main log file.
991            alloc_log = logging.getLogger('fedd.access.%s' % ename)
992            h = logging.StreamHandler(
993                    list_log.list_log(self.allocation[aid]['log']))
994            # XXX: there should be a global one of these rather than
995            # repeating the code.
996            h.setFormatter(logging.Formatter(
997                "%(asctime)s %(name)s %(message)s",
998                        '%d %b %y %H:%M:%S'))
999            alloc_log.addHandler(h)
1000            self.write_state()
1001        self.state_lock.release()
1002
1003        if not proj:
1004            raise service_error(service_error.internal, 
1005                    "Can't find project for %s" %aid)
1006
1007        if not user:
1008            raise service_error(service_error.internal, 
1009                    "Can't find creation user for %s" %aid)
1010
1011        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1012
1013    def finalize_experiment(self, starter, topo, aid, alloc_id):
1014        """
1015        Store key bits of experiment state in the global repository, including
1016        the response that may need to be replayed, and return the response.
1017        """
1018        # Copy the assigned names into the return topology
1019        embedding = [ ]
1020        for n in starter.node:
1021            embedding.append({ 
1022                'toponame': n,
1023                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1024                })
1025        # Grab the log (this is some anal locking, but better safe than
1026        # sorry)
1027        self.state_lock.acquire()
1028        logv = "".join(self.allocation[aid]['log'])
1029        # It's possible that the StartSegment call gets retried (!).
1030        # if the 'started' key is in the allocation, we'll return it rather
1031        # than redo the setup.
1032        self.allocation[aid]['started'] = { 
1033                'allocID': alloc_id,
1034                'allocationLog': logv,
1035                'segmentdescription': { 
1036                    'topdldescription': topo.clone().to_dict()
1037                    },
1038                'embedding': embedding
1039                }
1040        retval = copy.copy(self.allocation[aid]['started'])
1041        self.write_state()
1042        self.state_lock.release()
1043        return retval
1044   
1045    # End of StartSegment support routines
1046
1047    def StartSegment(self, req, fid):
1048        err = None  # Any service_error generated after tmpdir is created
1049        rv = None   # Return value from segment creation
1050
1051        try:
1052            req = req['StartSegmentRequestBody']
1053            auth_attr = req['allocID']['fedid']
1054            topref = req['segmentdescription']['topdldescription']
1055        except KeyError:
1056            raise service_error(server_error.req, "Badly formed request")
1057
1058        connInfo = req.get('connection', [])
1059        services = req.get('service', [])
1060        aid = "%s" % auth_attr
1061        attrs = req.get('fedAttr', [])
1062        if not self.auth.check_attribute(fid, auth_attr):
1063            raise service_error(service_error.access, "Access denied")
1064        else:
1065            # See if this is a replay of an earlier succeeded StartSegment -
1066            # sometimes SSL kills 'em.  If so, replay the response rather than
1067            # redoing the allocation.
1068            self.state_lock.acquire()
1069            retval = self.allocation[aid].get('started', None)
1070            self.state_lock.release()
1071            if retval:
1072                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1073                        "replaying response")
1074                return retval
1075
1076        # A new request.  Do it.
1077
1078        if topref: topo = topdl.Topology(**topref)
1079        else:
1080            raise service_error(service_error.req, 
1081                    "Request missing segmentdescription'")
1082       
1083        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1084        try:
1085            tmpdir = tempfile.mkdtemp(prefix="access-")
1086            softdir = "%s/software" % tmpdir
1087            os.mkdir(softdir)
1088        except EnvironmentError:
1089            raise service_error(service_error.internal, "Cannot create tmp dir")
1090
1091        # Try block alllows us to clean up temporary files.
1092        try:
1093            self.retrieve_software(topo, certfile, softdir)
1094            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1095                    self.initialize_experiment_info(attrs, aid, 
1096                            certfile, tmpdir)
1097
1098            # Set up userconf and seer if needed
1099            self.configure_userconf(services, tmpdir)
1100            self.configure_seer_services(services, topo, softdir)
1101            # Get and send synch store variables
1102            self.export_store_info(certfile, proj, ename, connInfo)
1103            self.import_store_info(certfile, connInfo)
1104
1105            expfile = "%s/experiment.tcl" % tmpdir
1106
1107            self.generate_portal_configs(topo, pubkey_base, 
1108                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1109            self.generate_ns2(topo, expfile, 
1110                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1111
1112            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1113                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1114                    cert=self.xmlrpc_cert)
1115            rv = starter(self, ename, proj, user, expfile, tmpdir)
1116        except service_error, e:
1117            err = e
1118        except:
1119            t, v, st = sys.exc_info()
1120            err = service_error(service_error.internal, "%s: %s" % \
1121                    (v, traceback.extract_tb(st)))
1122
1123        # Walk up tmpdir, deleting as we go
1124        if self.cleanup: self.remove_dirs(tmpdir)
1125        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1126
1127        if rv:
1128            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1129        elif err:
1130            raise service_error(service_error.federant,
1131                    "Swapin failed: %s" % err)
1132        else:
1133            raise service_error(service_error.federant, "Swapin failed")
1134
1135    def TerminateSegment(self, req, fid):
1136        try:
1137            req = req['TerminateSegmentRequestBody']
1138        except KeyError:
1139            raise service_error(server_error.req, "Badly formed request")
1140
1141        auth_attr = req['allocID']['fedid']
1142        aid = "%s" % auth_attr
1143        attrs = req.get('fedAttr', [])
1144        if not self.auth.check_attribute(fid, auth_attr):
1145            raise service_error(service_error.access, "Access denied")
1146
1147        self.state_lock.acquire()
1148        if aid in self.allocation:
1149            proj = self.allocation[aid].get('project', None)
1150            if not proj: 
1151                proj = self.allocation[aid].get('sproject', None)
1152            user = self.allocation[aid].get('user', None)
1153            ename = self.allocation[aid].get('experiment', None)
1154        else:
1155            proj = None
1156            user = None
1157            ename = None
1158        self.state_lock.release()
1159
1160        if not proj:
1161            raise service_error(service_error.internal, 
1162                    "Can't find project for %s" % aid)
1163
1164        if not user:
1165            raise service_error(service_error.internal, 
1166                    "Can't find creation user for %s" % aid)
1167        if not ename:
1168            raise service_error(service_error.internal, 
1169                    "Can't find experiment name for %s" % aid)
1170        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1171                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1172        stopper(self, user, proj, ename)
1173        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.