source: fedd/federation/emulab_access.py @ c573278

axis_examplecompt_changesinfo-ops
Last change on this file since c573278 was c573278, checked in by Ted Faber <faber@…>, 13 years ago

Checkpoint. Still lots to do

  • Property mode set to 100644
File size: 38.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        # XXX: this should go?
108        #if config.has_option("access", "accessdb"):
109        #    self.read_access(config.get("access", "accessdb"))
110        tb = config.get('access', 'testbed')
111        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
112        else: self.testbed = [ ]
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            if accessdb:
122                self.read_access(accessdb, self.make_access_project)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            if accessdb:
126                self.read_abac_access(accessdb, self.make_abac_access_project)
127        else:
128            raise service_error(service_error.internal, 
129                    "Unknown auth_type: %s" % self.auth_type)
130
131        # read_state in the base_class
132        self.state_lock.acquire()
133        for a  in ('allocation', 'projects', 'keys', 'types'):
134            if a not in self.state:
135                self.state[a] = { }
136        self.allocation = self.state['allocation']
137        self.projects = self.state['projects']
138        self.keys = self.state['keys']
139        self.types = self.state['types']
140        if self.auth_type == "legacy": 
141            # Add the ownership attributes to the authorizer.  Note that the
142            # indices of the allocation dict are strings, but the attributes are
143            # fedids, so there is a conversion.
144            for k in self.allocation.keys():
145                for o in self.allocation[k].get('owners', []):
146                    self.auth.set_attribute(o, fedid(hexstr=k))
147                if self.allocation[k].has_key('userconfig'):
148                    sfid = self.allocation[k]['userconfig']
149                    fid = fedid(hexstr=sfid)
150                    self.auth.set_attribute(fid, "/%s" % sfid)
151        self.state_lock.release()
152        self.exports = {
153                'SMB': self.export_SMB,
154                'seer': self.export_seer,
155                'tmcd': self.export_tmcd,
156                'userconfig': self.export_userconfig,
157                'project_export': self.export_project_export,
158                'local_seer_control': self.export_local_seer,
159                'seer_master': self.export_seer_master,
160                'hide_hosts': self.export_hide_hosts,
161                }
162
163        if not self.local_seer_image or not self.local_seer_software or \
164                not self.local_seer_start:
165            if 'local_seer_control' in self.exports:
166                del self.exports['local_seer_control']
167
168        if not self.local_seer_image or not self.local_seer_software or \
169                not self.seer_master_start:
170            if 'seer_master' in self.exports:
171                del self.exports['seer_master']
172
173
174        self.soap_services = {\
175            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
176            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
177            'StartSegment': soap_handler("StartSegment", self.StartSegment),
178            'TerminateSegment': soap_handler("TerminateSegment",
179                self.TerminateSegment),
180            }
181        self.xmlrpc_services =  {\
182            'RequestAccess': xmlrpc_handler('RequestAccess',
183                self.RequestAccess),
184            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
185                self.ReleaseAccess),
186            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': xmlrpc_handler('TerminateSegment',
188                self.TerminateSegment),
189            }
190
191        self.call_SetValue = service_caller('SetValue')
192        self.call_GetValue = service_caller('GetValue', log=self.log)
193
194        if not config.has_option("allocate", "uri"):
195            self.allocate_project = \
196                allocate_project_local(config, auth)
197        else:
198            self.allocate_project = \
199                allocate_project_remote(config, auth)
200
201
202        # If the project allocator exports services, put them in this object's
203        # maps so that classes that instantiate this can call the services.
204        self.soap_services.update(self.allocate_project.soap_services)
205        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
206
207    @staticmethod
208    def make_access_project(str):
209        """
210        Convert a string of the form (id[:resources:resouces], id, id) into an
211        access_project.  This is called by read_access to convert to local
212        attributes.  It returns a tuple of the form (project, user, user) where
213        users may be names or fedids.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = access_project(first[0], first[1:])
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def make_abac_access_project(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_abac_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            proj, user = str.split(',')
250            return ( proj.strip(), user.strip(), user.strip())
251        else:
252            raise self.parse_error(
253                    'Bad mapping (unbalanced parens or more than 1 comma)')
254
255
256    # RequestAccess support routines
257
258    def lookup_access(self, req, fid):
259        """
260        Look up the local access control information mapped to this fedid and
261        credentials.  In this case it is a (project, create_user, access_user)
262        triple, and a triple of three booleans indicating which, if any will
263        need to be dynamically created.  Finally a list of owners for that
264        allocation is returned.
265
266        lookup_access_base pulls the first triple out, and it is parsed by this
267        routine into the boolean map.  Owners is always the controlling fedid.
268        """
269        # Return values
270        rp = access_project(None, ())
271        ru = None
272        # This maps a valid user to the Emulab projects and users to use
273        found, match = self.lookup_access_base(req, fid)
274        tb, project, user = match
275       
276        if found == None:
277            raise service_error(service_error.access,
278                    "Access denied - cannot map access")
279
280        # resolve <dynamic> and <same> in found
281        dyn_proj = False
282        dyn_create_user = False
283        dyn_service_user = False
284
285        if found[0].name == "<same>":
286            if project != None:
287                rp.name = project
288            else : 
289                raise service_error(\
290                        service_error.server_config,
291                        "Project matched <same> when no project given")
292        elif found[0].name == "<dynamic>":
293            rp.name = None
294            dyn_proj = True
295        else:
296            rp.name = found[0].name
297        rp.node_types = found[0].node_types;
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def lookup_abac_access(self, req, fid): 
331        # Import request credentials into this (clone later??)
332        if self.auth.import_credentials(data_list=req.get('abac_credential', [])):
333            self.auth.save()
334
335        # Check every attribute that we know how to map and take the first
336        # success.
337        print "%s" %self.auth
338        for attr in (self.access.keys()):
339            if self.auth.check_attribute(fid, attr):
340                # XXX: needs to deal with dynamics
341                return copy.copy(self.access[attr]), (False, False, False), \
342                        [ fid ]
343            else:
344                self.log.debug("Access failed for %s %s" % (attr, fid))
345        else:
346            raise service_error(service_error.access, "Access denied")
347
348
349    def do_project_allocation(self, dyn, project, user):
350        """
351        Call the project allocation routines and return the info.
352        """
353        if dyn: 
354            # Compose the dynamic project request
355            # (only dynamic, dynamic currently allowed)
356            preq = { 'AllocateProjectRequestBody': \
357                        { 'project' : {\
358                            'user': [ \
359                            { \
360                                'access': [ { 
361                                    'sshPubkey': self.ssh_pubkey_file } ],
362                                 'role': "serviceAccess",\
363                            }, \
364                            { \
365                                'access': [ { 
366                                    'sshPubkey': self.ssh_pubkey_file } ],
367                                 'role': "experimentCreation",\
368                            }, \
369                            ], \
370                            }\
371                        }\
372                    }
373            return self.allocate_project.dynamic_project(preq)
374        else:
375            preq = {'StaticProjectRequestBody' : \
376                    { 'project': \
377                        { 'name' : { 'localname' : project },\
378                          'user' : [ \
379                            {\
380                                'userID': { 'localname' : user }, \
381                                'access': [ { 
382                                    'sshPubkey': self.ssh_pubkey_file } ],
383                                'role': 'experimentCreation'\
384                            },\
385                            {\
386                                'userID': { 'localname' : user}, \
387                                'access': [ { 
388                                    'sshPubkey': self.ssh_pubkey_file } ],
389                                'role': 'serviceAccess'\
390                            },\
391                        ]}\
392                    }\
393            }
394            return self.allocate_project.static_project(preq)
395
396    def save_project_state(self, aid, ap, dyn, owners):
397        """
398        Parse out and save the information relevant to the project created for
399        this experiment.  That info is largely in ap and owners.  dyn indicates
400        that the project was created dynamically.  Return the user and project
401        names.
402        """
403        self.state_lock.acquire()
404        self.allocation[aid] = { }
405        try:
406            pname = ap['project']['name']['localname']
407        except KeyError:
408            pname = None
409
410        if dyn:
411            if not pname:
412                self.state_lock.release()
413                raise service_error(service_error.internal,
414                        "Misformed allocation response?")
415            if pname in self.projects: self.projects[pname] += 1
416            else: self.projects[pname] = 1
417            self.allocation[aid]['project'] = pname
418        else:
419            # sproject is a static project associated with this allocation.
420            self.allocation[aid]['sproject'] = pname
421
422        self.allocation[aid]['keys'] = [ ]
423
424        try:
425            for u in ap['project']['user']:
426                uname = u['userID']['localname']
427                if u['role'] == 'experimentCreation':
428                    self.allocation[aid]['user'] = uname
429                for k in [ k['sshPubkey'] for k in u['access'] \
430                        if k.has_key('sshPubkey') ]:
431                    kv = "%s:%s" % (uname, k)
432                    if self.keys.has_key(kv): self.keys[kv] += 1
433                    else: self.keys[kv] = 1
434                    self.allocation[aid]['keys'].append((uname, k))
435        except KeyError:
436            self.state_lock.release()
437            raise service_error(service_error.internal,
438                    "Misformed allocation response?")
439
440        self.allocation[aid]['owners'] = owners
441        self.write_state()
442        self.state_lock.release()
443        return (pname, uname)
444
445    # End of RequestAccess support routines
446
447    def RequestAccess(self, req, fid):
448        """
449        Handle the access request.  Proxy if not for us.
450
451        Parse out the fields and make the allocations or rejections if for us,
452        otherwise, assuming we're willing to proxy, proxy the request out.
453        """
454
455        def gateway_hardware(h):
456            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
457            else: return h
458
459        def get_export_project(svcs):
460            """
461            if the service requests includes one to export a project, return
462            that project.
463            """
464            rv = None
465            for s in svcs:
466                if s.get('name', '') == 'project_export' and \
467                        s.get('visibility', '') == 'export':
468                    if not rv: 
469                        for a in s.get('feddAttr', []):
470                            if a.get('attribute', '') == 'project' \
471                                    and 'value' in a:
472                                rv = a['value']
473                    else:
474                        raise service_error(service_error, access, 
475                                'Requesting multiple project exports is ' + \
476                                        'not supported');
477            return rv
478
479        # The dance to get into the request body
480        if req.has_key('RequestAccessRequestBody'):
481            req = req['RequestAccessRequestBody']
482        else:
483            raise service_error(service_error.req, "No request!?")
484
485        alog = open("./auth.log", 'w')
486        print >>alog, self.auth
487        print >> alog, "after"
488        if self.auth.import_credentials(
489                data_list=req.get('abac_credential', [])):
490            self.auth.save()
491        print >>alog, self.auth
492        alog.close()
493
494        if self.auth_type == "legacy":
495            found, dyn, owners = self.lookup_access(req, fid)
496        elif self.auth_type == 'abac':
497            found, dyn, owners = self.lookup_abac_access(req, fid)
498        else:
499            raise service_error(service_error.internal, 
500                    'Unknown auth_type: %s' % self.auth_type)
501        ap = None
502
503        # if this includes a project export request and the exported
504        # project is not the access project, access denied.
505        if 'service' in req:
506            ep = get_export_project(req['service'])
507            if ep and ep != found[0].name:
508                raise service_error(service_error.access,
509                        "Cannot export %s" % ep)
510
511        if self.ssh_pubkey_file:
512            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
513        else:
514            raise service_error(service_error.internal, 
515                    "SSH access parameters required")
516        # keep track of what's been added
517        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
518        aid = unicode(allocID)
519
520        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
521
522        services, svc_state = self.export_services(req.get('service',[]),
523                pname, uname)
524        self.state_lock.acquire()
525        # Store services state in global state
526        for k, v in svc_state.items():
527            self.allocation[aid][k] = v
528        self.write_state()
529        self.state_lock.release()
530        # Give the owners the right to change this allocation
531        for o in owners:
532            self.auth.set_attribute(o, allocID)
533        self.auth.save()
534        try:
535            f = open("%s/%s.pem" % (self.certdir, aid), "w")
536            print >>f, alloc_cert
537            f.close()
538        except EnvironmentError, e:
539            raise service_error(service_error.internal, 
540                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
541        resp = self.build_access_response({ 'fedid': allocID } ,
542                ap, services)
543        return resp
544
545    def do_release_project(self, del_project, del_users, del_types):
546        """
547        If a project and users has to be deleted, make the call.
548        """
549        msg = { 'project': { }}
550        if del_project:
551            msg['project']['name']= {'localname': del_project}
552        users = [ ]
553        for u in del_users.keys():
554            users.append({ 'userID': { 'localname': u },\
555                'access' :  \
556                        [ {'sshPubkey' : s } for s in del_users[u]]\
557            })
558        if users: 
559            msg['project']['user'] = users
560        if len(del_types) > 0:
561            msg['resources'] = { 'node': \
562                    [ {'hardware': [ h ] } for h in del_types ]\
563                }
564        if self.allocate_project.release_project:
565            msg = { 'ReleaseProjectRequestBody' : msg}
566            self.allocate_project.release_project(msg)
567
568    def ReleaseAccess(self, req, fid):
569        # The dance to get into the request body
570        if req.has_key('ReleaseAccessRequestBody'):
571            req = req['ReleaseAccessRequestBody']
572        else:
573            raise service_error(service_error.req, "No request!?")
574
575        try:
576            if req['allocID'].has_key('localname'):
577                auth_attr = aid = req['allocID']['localname']
578            elif req['allocID'].has_key('fedid'):
579                aid = unicode(req['allocID']['fedid'])
580                auth_attr = req['allocID']['fedid']
581            else:
582                raise service_error(service_error.req,
583                        "Only localnames and fedids are understood")
584        except KeyError:
585            raise service_error(service_error.req, "Badly formed request")
586
587        self.log.debug("[access] deallocation requested for %s", aid)
588        if not self.auth.check_attribute(fid, auth_attr):
589            self.log.debug("[access] deallocation denied for %s", aid)
590            raise service_error(service_error.access, "Access Denied")
591
592        # If we know this allocation, reduce the reference counts and
593        # remove the local allocations.  Otherwise report an error.  If
594        # there is an allocation to delete, del_users will be a dictonary
595        # of sets where the key is the user that owns the keys in the set.
596        # We use a set to avoid duplicates.  del_project is just the name
597        # of any dynamic project to delete.  We're somewhat lazy about
598        # deleting authorization attributes.  Having access to something
599        # that doesn't exist isn't harmful.
600        del_users = { }
601        del_project = None
602        del_types = set()
603
604        self.state_lock.acquire()
605        if aid in self.allocation:
606            self.log.debug("Found allocation for %s" %aid)
607            for k in self.allocation[aid]['keys']:
608                kk = "%s:%s" % k
609                self.keys[kk] -= 1
610                if self.keys[kk] == 0:
611                    if not del_users.has_key(k[0]):
612                        del_users[k[0]] = set()
613                    del_users[k[0]].add(k[1])
614                    del self.keys[kk]
615
616            if 'project' in self.allocation[aid]:
617                pname = self.allocation[aid]['project']
618                self.projects[pname] -= 1
619                if self.projects[pname] == 0:
620                    del_project = pname
621                    del self.projects[pname]
622
623            if 'types' in self.allocation[aid]:
624                for t in self.allocation[aid]['types']:
625                    self.types[t] -= 1
626                    if self.types[t] == 0:
627                        if not del_project: del_project = t[0]
628                        del_types.add(t[1])
629                        del self.types[t]
630
631            del self.allocation[aid]
632            self.write_state()
633            self.state_lock.release()
634            # If we actually have resources to deallocate, prepare the call.
635            if del_project or del_users:
636                self.do_release_project(del_project, del_users, del_types)
637            # And remove the access cert
638            cf = "%s/%s.pem" % (self.certdir, aid)
639            self.log.debug("Removing %s" % cf)
640            os.remove(cf)
641            return { 'allocID': req['allocID'] } 
642        else:
643            self.state_lock.release()
644            raise service_error(service_error.req, "No such allocation")
645
646    # These are subroutines for StartSegment
647    def generate_ns2(self, topo, expfn, softdir, connInfo):
648        """
649        Convert topo into an ns2 file, decorated with appropriate commands for
650        the particular testbed setup.  Convert all requests for software, etc
651        to point at the staged copies on this testbed and add the federation
652        startcommands.
653        """
654        class dragon_commands:
655            """
656            Functor to spit out approrpiate dragon commands for nodes listed in
657            the connectivity description.  The constructor makes a dict mapping
658            dragon nodes to their parameters and the __call__ checks each
659            element in turn for membership.
660            """
661            def __init__(self, map):
662                self.node_info = map
663
664            def __call__(self, e):
665                s = ""
666                if isinstance(e, topdl.Computer):
667                    if self.node_info.has_key(e.name):
668                        info = self.node_info[e.name]
669                        for ifname, vlan, type in info:
670                            for i in e.interface:
671                                if i.name == ifname:
672                                    addr = i.get_attribute('ip4_address')
673                                    subs = i.substrate[0]
674                                    break
675                            else:
676                                raise service_error(service_error.internal,
677                                        "No interface %s on element %s" % \
678                                                (ifname, e.name))
679                            # XXX: do netmask right
680                            if type =='link':
681                                s = ("tb-allow-external ${%s} " + \
682                                        "dragonportal ip %s vlan %s " + \
683                                        "netmask 255.255.255.0\n") % \
684                                        (topdl.to_tcl_name(e.name), addr, vlan)
685                            elif type =='lan':
686                                s = ("tb-allow-external ${%s} " + \
687                                        "dragonportal " + \
688                                        "ip %s vlan %s usurp %s\n") % \
689                                        (topdl.to_tcl_name(e.name), addr, 
690                                                vlan, subs)
691                            else:
692                                raise service_error(service_error_internal,
693                                        "Unknown DRAGON type %s" % type)
694                return s
695
696        class not_dragon:
697            """
698            Return true if a node is in the given map of dragon nodes.
699            """
700            def __init__(self, map):
701                self.nodes = set(map.keys())
702
703            def __call__(self, e):
704                return e.name not in self.nodes
705
706        # Main line of generate_ns2
707        t = topo.clone()
708
709        # Create the map of nodes that need direct connections (dragon
710        # connections) from the connInfo
711        dragon_map = { }
712        for i in [ i for i in connInfo if i['type'] == 'transit']:
713            for a in i.get('fedAttr', []):
714                if a['attribute'] == 'vlan_id':
715                    vlan = a['value']
716                    break
717            else:
718                raise service_error(service_error.internal, "No vlan tag")
719            members = i.get('member', [])
720            if len(members) > 1: type = 'lan'
721            else: type = 'link'
722
723            try:
724                for m in members:
725                    if m['element'] in dragon_map:
726                        dragon_map[m['element']].append(( m['interface'], 
727                            vlan, type))
728                    else:
729                        dragon_map[m['element']] = [( m['interface'], 
730                            vlan, type),]
731            except KeyError:
732                raise service_error(service_error.req,
733                        "Missing connectivity info")
734
735        # Weed out the things we aren't going to instantiate: Segments, portal
736        # substrates, and portal interfaces.  (The copy in the for loop allows
737        # us to delete from e.elements in side the for loop).  While we're
738        # touching all the elements, we also adjust paths from the original
739        # testbed to local testbed paths and put the federation commands into
740        # the start commands
741        for e in [e for e in t.elements]:
742            if isinstance(e, topdl.Segment):
743                t.elements.remove(e)
744            if isinstance(e, topdl.Computer):
745                self.add_kit(e, self.federation_software)
746                if e.get_attribute('portal') and self.portal_startcommand:
747                    # Add local portal support software
748                    self.add_kit(e, self.portal_software)
749                    # Portals never have a user-specified start command
750                    e.set_attribute('startup', self.portal_startcommand)
751                elif self.node_startcommand:
752                    if e.get_attribute('startup'):
753                        e.set_attribute('startup', "%s \\$USER '%s'" % \
754                                (self.node_startcommand, 
755                                    e.get_attribute('startup')))
756                    else:
757                        e.set_attribute('startup', self.node_startcommand)
758
759                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
760                # Remove portal interfaces that do not connect to DRAGON
761                e.interface = [i for i in e.interface \
762                        if not i.get_attribute('portal') or i.name in dinf ]
763            # Fix software paths
764            for s in getattr(e, 'software', []):
765                s.location = re.sub("^.*/", softdir, s.location)
766
767        t.substrates = [ s.clone() for s in t.substrates ]
768        t.incorporate_elements()
769
770        # Customize the ns2 output for local portal commands and images
771        filters = []
772
773        if self.dragon_endpoint:
774            add_filter = not_dragon(dragon_map)
775            filters.append(dragon_commands(dragon_map))
776        else:
777            add_filter = None
778
779        if self.portal_command:
780            filters.append(topdl.generate_portal_command_filter(
781                self.portal_command, add_filter=add_filter))
782
783        if self.portal_image:
784            filters.append(topdl.generate_portal_image_filter(
785                self.portal_image))
786
787        if self.portal_type:
788            filters.append(topdl.generate_portal_hardware_filter(
789                self.portal_type))
790
791        # Convert to ns and write it out
792        expfile = topdl.topology_to_ns2(t, filters)
793        try:
794            f = open(expfn, "w")
795            print >>f, expfile
796            f.close()
797        except EnvironmentError:
798            raise service_error(service_error.internal,
799                    "Cannot write experiment file %s: %s" % (expfn,e))
800
801    def export_store_info(self, cf, proj, ename, connInfo):
802        """
803        For the export requests in the connection info, install the peer names
804        at the experiment controller via SetValue calls.
805        """
806
807        for c in connInfo:
808            for p in [ p for p in c.get('parameter', []) \
809                    if p.get('type', '') == 'output']:
810
811                if p.get('name', '') == 'peer':
812                    k = p.get('key', None)
813                    surl = p.get('store', None)
814                    if surl and k and k.index('/') != -1:
815                        value = "%s.%s.%s%s" % \
816                                (k[k.index('/')+1:], ename, proj, self.domain)
817                        req = { 'name': k, 'value': value }
818                        self.log.debug("Setting %s to %s on %s" % \
819                                (k, value, surl))
820                        self.call_SetValue(surl, req, cf)
821                    else:
822                        self.log.error("Bad export request: %s" % p)
823                elif p.get('name', '') == 'ssh_port':
824                    k = p.get('key', None)
825                    surl = p.get('store', None)
826                    if surl and k:
827                        req = { 'name': k, 'value': self.ssh_port }
828                        self.log.debug("Setting %s to %s on %s" % \
829                                (k, self.ssh_port, surl))
830                        self.call_SetValue(surl, req, cf)
831                    else:
832                        self.log.error("Bad export request: %s" % p)
833                else:
834                    self.log.error("Unknown export parameter: %s" % \
835                            p.get('name'))
836                    continue
837
838    def add_seer_node(self, topo, name, startup):
839        """
840        Add a seer node to the given topology, with the startup command passed
841        in.  Used by configure seer_services.
842        """
843        c_node = topdl.Computer(
844                name=name, 
845                os= topdl.OperatingSystem(
846                    attribute=[
847                    { 'attribute': 'osid', 
848                        'value': self.local_seer_image },
849                    ]),
850                attribute=[
851                    { 'attribute': 'startup', 'value': startup },
852                    ]
853                )
854        self.add_kit(c_node, self.local_seer_software)
855        topo.elements.append(c_node)
856
857    def configure_seer_services(self, services, topo, softdir):
858        """
859        Make changes to the topology required for the seer requests being made.
860        Specifically, add any control or master nodes required and set up the
861        start commands on the nodes to interconnect them.
862        """
863        local_seer = False      # True if we need to add a control node
864        collect_seer = False    # True if there is a seer-master node
865        seer_master= False      # True if we need to add the seer-master
866        for s in services:
867            s_name = s.get('name', '')
868            s_vis = s.get('visibility','')
869
870            if s_name  == 'local_seer_control' and s_vis == 'export':
871                local_seer = True
872            elif s_name == 'seer_master':
873                if s_vis == 'import':
874                    collect_seer = True
875                elif s_vis == 'export':
876                    seer_master = True
877       
878        # We've got the whole picture now, so add nodes if needed and configure
879        # them to interconnect properly.
880        if local_seer or seer_master:
881            # Copy local seer control node software to the tempdir
882            for l, f in self.local_seer_software:
883                base = os.path.basename(f)
884                copy_file(f, "%s/%s" % (softdir, base))
885        # If we're collecting seers somewhere the controllers need to talk to
886        # the master.  In testbeds that export the service, that will be a
887        # local node that we'll add below.  Elsewhere it will be the control
888        # portal that will port forward to the exporting master.
889        if local_seer:
890            if collect_seer:
891                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
892            else:
893                startup = self.local_seer_start
894            self.add_seer_node(topo, 'control', startup)
895        # If this is the seer master, add that node, too.
896        if seer_master:
897            self.add_seer_node(topo, 'seer-master', 
898                    "%s -R -n -R seer-master -R -A -R sink" % \
899                            self.seer_master_start)
900
901    def retrieve_software(self, topo, certfile, softdir):
902        """
903        Collect the software that nodes in the topology need loaded and stage
904        it locally.  This implies retrieving it from the experiment_controller
905        and placing it into softdir.  Certfile is used to prove that this node
906        has access to that data (it's the allocation/segment fedid).  Finally
907        local portal and federation software is also copied to the same staging
908        directory for simplicity - all software needed for experiment creation
909        is in softdir.
910        """
911        sw = set()
912        for e in topo.elements:
913            for s in getattr(e, 'software', []):
914                sw.add(s.location)
915        for s in sw:
916            self.log.debug("Retrieving %s" % s)
917            try:
918                get_url(s, certfile, softdir)
919            except:
920                t, v, st = sys.exc_info()
921                raise service_error(service_error.internal,
922                        "Error retrieving %s: %s" % (s, v))
923
924        # Copy local federation and portal node software to the tempdir
925        for s in (self.federation_software, self.portal_software):
926            for l, f in s:
927                base = os.path.basename(f)
928                copy_file(f, "%s/%s" % (softdir, base))
929
930
931    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
932        """
933        Gather common configuration files, retrieve or create an experiment
934        name and project name, and return the ssh_key filenames.  Create an
935        allocation log bound to the state log variable as well.
936        """
937        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
938        ename = None
939        pubkey_base = None
940        secretkey_base = None
941        proj = None
942        user = None
943        alloc_log = None
944
945        for a in attrs:
946            if a['attribute'] in configs:
947                try:
948                    self.log.debug("Retrieving %s from %s" % \
949                            (a['attribute'], a['value']))
950                    get_url(a['value'], certfile, tmpdir)
951                except:
952                    t, v, st = sys.exc_info()
953                    raise service_error(service_error.internal,
954                            "Error retrieving %s: %s" % (a.get('value', ""), v))
955            if a['attribute'] == 'ssh_pubkey':
956                pubkey_base = a['value'].rpartition('/')[2]
957            if a['attribute'] == 'ssh_secretkey':
958                secretkey_base = a['value'].rpartition('/')[2]
959            if a['attribute'] == 'experiment_name':
960                ename = a['value']
961
962        if not ename:
963            ename = ""
964            for i in range(0,5):
965                ename += random.choice(string.ascii_letters)
966            self.log.warn("No experiment name: picked one randomly: %s" \
967                    % ename)
968
969        if not pubkey_base:
970            raise service_error(service_error.req, 
971                    "No public key attribute")
972
973        if not secretkey_base:
974            raise service_error(service_error.req, 
975                    "No secret key attribute")
976
977        self.state_lock.acquire()
978        if aid in self.allocation:
979            proj = self.allocation[aid].get('project', None)
980            if not proj: 
981                proj = self.allocation[aid].get('sproject', None)
982            user = self.allocation[aid].get('user', None)
983            self.allocation[aid]['experiment'] = ename
984            self.allocation[aid]['log'] = [ ]
985            # Create a logger that logs to the experiment's state object as
986            # well as to the main log file.
987            alloc_log = logging.getLogger('fedd.access.%s' % ename)
988            h = logging.StreamHandler(
989                    list_log.list_log(self.allocation[aid]['log']))
990            # XXX: there should be a global one of these rather than
991            # repeating the code.
992            h.setFormatter(logging.Formatter(
993                "%(asctime)s %(name)s %(message)s",
994                        '%d %b %y %H:%M:%S'))
995            alloc_log.addHandler(h)
996            self.write_state()
997        self.state_lock.release()
998
999        if not proj:
1000            raise service_error(service_error.internal, 
1001                    "Can't find project for %s" %aid)
1002
1003        if not user:
1004            raise service_error(service_error.internal, 
1005                    "Can't find creation user for %s" %aid)
1006
1007        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1008
1009    def finalize_experiment(self, starter, topo, aid, alloc_id):
1010        """
1011        Store key bits of experiment state in the global repository, including
1012        the response that may need to be replayed, and return the response.
1013        """
1014        # Copy the assigned names into the return topology
1015        embedding = [ ]
1016        for n in starter.node:
1017            embedding.append({ 
1018                'toponame': n,
1019                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1020                })
1021        # Grab the log (this is some anal locking, but better safe than
1022        # sorry)
1023        self.state_lock.acquire()
1024        logv = "".join(self.allocation[aid]['log'])
1025        # It's possible that the StartSegment call gets retried (!).
1026        # if the 'started' key is in the allocation, we'll return it rather
1027        # than redo the setup.
1028        self.allocation[aid]['started'] = { 
1029                'allocID': alloc_id,
1030                'allocationLog': logv,
1031                'segmentdescription': { 
1032                    'topdldescription': topo.clone().to_dict()
1033                    },
1034                'embedding': embedding
1035                }
1036        retval = copy.copy(self.allocation[aid]['started'])
1037        self.write_state()
1038        self.state_lock.release()
1039        return retval
1040   
1041    # End of StartSegment support routines
1042
1043    def StartSegment(self, req, fid):
1044        err = None  # Any service_error generated after tmpdir is created
1045        rv = None   # Return value from segment creation
1046
1047        try:
1048            req = req['StartSegmentRequestBody']
1049            auth_attr = req['allocID']['fedid']
1050            topref = req['segmentdescription']['topdldescription']
1051        except KeyError:
1052            raise service_error(server_error.req, "Badly formed request")
1053
1054        connInfo = req.get('connection', [])
1055        services = req.get('service', [])
1056        aid = "%s" % auth_attr
1057        attrs = req.get('fedAttr', [])
1058        if not self.auth.check_attribute(fid, auth_attr):
1059            raise service_error(service_error.access, "Access denied")
1060        else:
1061            # See if this is a replay of an earlier succeeded StartSegment -
1062            # sometimes SSL kills 'em.  If so, replay the response rather than
1063            # redoing the allocation.
1064            self.state_lock.acquire()
1065            retval = self.allocation[aid].get('started', None)
1066            self.state_lock.release()
1067            if retval:
1068                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1069                        "replaying response")
1070                return retval
1071
1072        # A new request.  Do it.
1073
1074        if topref: topo = topdl.Topology(**topref)
1075        else:
1076            raise service_error(service_error.req, 
1077                    "Request missing segmentdescription'")
1078       
1079        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1080        try:
1081            tmpdir = tempfile.mkdtemp(prefix="access-")
1082            softdir = "%s/software" % tmpdir
1083            os.mkdir(softdir)
1084        except EnvironmentError:
1085            raise service_error(service_error.internal, "Cannot create tmp dir")
1086
1087        # Try block alllows us to clean up temporary files.
1088        try:
1089            self.retrieve_software(topo, certfile, softdir)
1090            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1091                    self.initialize_experiment_info(attrs, aid, 
1092                            certfile, tmpdir)
1093
1094            # Set up userconf and seer if needed
1095            self.configure_userconf(services, tmpdir)
1096            self.configure_seer_services(services, topo, softdir)
1097            # Get and send synch store variables
1098            self.export_store_info(certfile, proj, ename, connInfo)
1099            self.import_store_info(certfile, connInfo)
1100
1101            expfile = "%s/experiment.tcl" % tmpdir
1102
1103            self.generate_portal_configs(topo, pubkey_base, 
1104                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1105            self.generate_ns2(topo, expfile, 
1106                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1107
1108            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1109                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1110                    cert=self.xmlrpc_cert)
1111            rv = starter(self, ename, proj, user, expfile, tmpdir)
1112        except service_error, e:
1113            err = e
1114        except:
1115            t, v, st = sys.exc_info()
1116            err = service_error(service_error.internal, "%s: %s" % \
1117                    (v, traceback.extract_tb(st)))
1118
1119        # Walk up tmpdir, deleting as we go
1120        if self.cleanup: self.remove_dirs(tmpdir)
1121        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1122
1123        if rv:
1124            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1125        elif err:
1126            raise service_error(service_error.federant,
1127                    "Swapin failed: %s" % err)
1128        else:
1129            raise service_error(service_error.federant, "Swapin failed")
1130
1131    def TerminateSegment(self, req, fid):
1132        try:
1133            req = req['TerminateSegmentRequestBody']
1134        except KeyError:
1135            raise service_error(server_error.req, "Badly formed request")
1136
1137        auth_attr = req['allocID']['fedid']
1138        aid = "%s" % auth_attr
1139        attrs = req.get('fedAttr', [])
1140        if not self.auth.check_attribute(fid, auth_attr):
1141            raise service_error(service_error.access, "Access denied")
1142
1143        self.state_lock.acquire()
1144        if aid in self.allocation:
1145            proj = self.allocation[aid].get('project', None)
1146            if not proj: 
1147                proj = self.allocation[aid].get('sproject', None)
1148            user = self.allocation[aid].get('user', None)
1149            ename = self.allocation[aid].get('experiment', None)
1150        else:
1151            proj = None
1152            user = None
1153            ename = None
1154        self.state_lock.release()
1155
1156        if not proj:
1157            raise service_error(service_error.internal, 
1158                    "Can't find project for %s" % aid)
1159
1160        if not user:
1161            raise service_error(service_error.internal, 
1162                    "Can't find creation user for %s" % aid)
1163        if not ename:
1164            raise service_error(service_error.internal, 
1165                    "Can't find experiment name for %s" % aid)
1166        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1167                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1168        stopper(self, user, proj, ename)
1169        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.