source: fedd/federation/emulab_access.py @ 027b87b

axis_examplecompt_changesinfo-ops
Last change on this file since 027b87b was 027b87b, checked in by Ted Faber <faber@…>, 13 years ago

This little class added a useless complexity. While I'm in here I removed it.

  • Property mode set to 100644
File size: 38.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from fedid import fedid, generate_fedid
22from authorizer import authorizer, abac_authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30import topdl
31import list_log
32import proxy_emulab_segment
33import local_emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    def __init__(self, config=None, auth=None):
52        """
53        Initializer.  Pulls parameters out of the ConfigParser's access section.
54        """
55
56        access_base.__init__(self, config, auth)
57
58        self.allow_proxy = config.getboolean("access", "allow_proxy")
59
60        self.domain = config.get("access", "domain")
61        self.userconfdir = config.get("access","userconfdir")
62        self.userconfcmd = config.get("access","userconfcmd")
63        self.userconfurl = config.get("access","userconfurl")
64        self.federation_software = config.get("access", "federation_software")
65        self.portal_software = config.get("access", "portal_software")
66        self.local_seer_software = config.get("access", "local_seer_software")
67        self.local_seer_image = config.get("access", "local_seer_image")
68        self.local_seer_start = config.get("access", "local_seer_start")
69        self.seer_master_start = config.get("access", "seer_master_start")
70        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
71        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
72        self.ssh_port = config.get("access","ssh_port") or "22"
73        self.boss = config.get("access", "boss")
74        self.ops = config.get("access", "ops")
75        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
76        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
77
78        self.dragon_endpoint = config.get("access", "dragon")
79        self.dragon_vlans = config.get("access", "dragon_vlans")
80        self.deter_internal = config.get("access", "deter_internal")
81
82        self.tunnel_config = config.getboolean("access", "tunnel_config")
83        self.portal_command = config.get("access", "portal_command")
84        self.portal_image = config.get("access", "portal_image")
85        self.portal_type = config.get("access", "portal_type") or "pc"
86        self.portal_startcommand = config.get("access", "portal_startcommand")
87        self.node_startcommand = config.get("access", "node_startcommand")
88
89        self.federation_software = self.software_list(self.federation_software)
90        self.portal_software = self.software_list(self.portal_software)
91        self.local_seer_software = self.software_list(self.local_seer_software)
92
93        self.access_type = self.access_type.lower()
94        if self.access_type == 'remote_emulab':
95            self.start_segment = proxy_emulab_segment.start_segment
96            self.stop_segment = proxy_emulab_segment.stop_segment
97        elif self.access_type == 'local_emulab':
98            self.start_segment = local_emulab_segment.start_segment
99            self.stop_segment = local_emulab_segment.stop_segment
100        else:
101            self.start_segment = None
102            self.stop_segment = None
103
104        self.restricted = [ ]
105        self.access = { }
106        # XXX: this should go?
107        #if config.has_option("access", "accessdb"):
108        #    self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        # authorization information
114        self.auth_type = config.get('access', 'auth_type') \
115                or 'legacy'
116        self.auth_dir = config.get('access', 'auth_dir')
117        accessdb = config.get("access", "accessdb")
118        # initialize the authorization system
119        if self.auth_type == 'legacy':
120            if accessdb:
121                self.read_access(accessdb, self.make_access_project)
122        elif self.auth_type == 'abac':
123            self.auth = abac_authorizer(load=self.auth_dir)
124            if accessdb:
125                self.read_abac_access(accessdb, self.make_abac_access_project)
126        else:
127            raise service_error(service_error.internal, 
128                    "Unknown auth_type: %s" % self.auth_type)
129
130        # read_state in the base_class
131        self.state_lock.acquire()
132        for a  in ('allocation', 'projects', 'keys', 'types'):
133            if a not in self.state:
134                self.state[a] = { }
135        self.allocation = self.state['allocation']
136        self.projects = self.state['projects']
137        self.keys = self.state['keys']
138        self.types = self.state['types']
139        if self.auth_type == "legacy": 
140            # Add the ownership attributes to the authorizer.  Note that the
141            # indices of the allocation dict are strings, but the attributes are
142            # fedids, so there is a conversion.
143            for k in self.allocation.keys():
144                for o in self.allocation[k].get('owners', []):
145                    self.auth.set_attribute(o, fedid(hexstr=k))
146                if self.allocation[k].has_key('userconfig'):
147                    sfid = self.allocation[k]['userconfig']
148                    fid = fedid(hexstr=sfid)
149                    self.auth.set_attribute(fid, "/%s" % sfid)
150        self.state_lock.release()
151        self.exports = {
152                'SMB': self.export_SMB,
153                'seer': self.export_seer,
154                'tmcd': self.export_tmcd,
155                'userconfig': self.export_userconfig,
156                'project_export': self.export_project_export,
157                'local_seer_control': self.export_local_seer,
158                'seer_master': self.export_seer_master,
159                'hide_hosts': self.export_hide_hosts,
160                }
161
162        if not self.local_seer_image or not self.local_seer_software or \
163                not self.local_seer_start:
164            if 'local_seer_control' in self.exports:
165                del self.exports['local_seer_control']
166
167        if not self.local_seer_image or not self.local_seer_software or \
168                not self.seer_master_start:
169            if 'seer_master' in self.exports:
170                del self.exports['seer_master']
171
172
173        self.soap_services = {\
174            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
175            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
176            'StartSegment': soap_handler("StartSegment", self.StartSegment),
177            'TerminateSegment': soap_handler("TerminateSegment",
178                self.TerminateSegment),
179            }
180        self.xmlrpc_services =  {\
181            'RequestAccess': xmlrpc_handler('RequestAccess',
182                self.RequestAccess),
183            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
184                self.ReleaseAccess),
185            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
186            'TerminateSegment': xmlrpc_handler('TerminateSegment',
187                self.TerminateSegment),
188            }
189
190        self.call_SetValue = service_caller('SetValue')
191        self.call_GetValue = service_caller('GetValue', log=self.log)
192
193        if not config.has_option("allocate", "uri"):
194            self.allocate_project = \
195                allocate_project_local(config, auth)
196        else:
197            self.allocate_project = \
198                allocate_project_remote(config, auth)
199
200
201        # If the project allocator exports services, put them in this object's
202        # maps so that classes that instantiate this can call the services.
203        self.soap_services.update(self.allocate_project.soap_services)
204        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
205
206    @staticmethod
207    def make_access_project(str):
208        """
209        Convert a string of the form (id[:resources:resouces], id, id) into a
210        tuple of the form (project, user, user) where users may be names or
211        fedids.  The resources strings are obsolete and ignored.
212        """
213        def parse_name(n):
214            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
215            else: return n
216
217        str = str.strip()
218        if str.startswith('(') and str.endswith(')'):
219            str = str[1:-1]
220            names = [ s.strip() for s in str.split(",")]
221            if len(names) > 3:
222                raise self.parse_error("More than three fields in name")
223            first = names[0].split(":")
224            if first == 'fedid:':
225                del first[0]
226                first[0] = fedid(hexstr=first[0])
227            names[0] = first[0]
228
229            for i in range(1,2):
230                names[i] = parse_name(names[i])
231
232            return tuple(names)
233        else:
234            raise self.parse_error('Bad mapping (unbalanced parens)')
235
236    @staticmethod
237    def make_abac_access_project(str):
238        """
239        Convert a string of the form (id, id) into an access_project.  This is
240        called by read_abac_access to convert to local attributes.  It returns
241        a tuple of the form (project, user, user) where the two users are
242        always the same.
243        """
244
245        str = str.strip()
246        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
247            # The slice takes the parens off the string.
248            proj, user = str[1:-1].split(',')
249            return (proj.strip(), user.strip(), user.strip())
250        else:
251            raise self.parse_error(
252                    'Bad mapping (unbalanced parens or more than 1 comma)')
253
254
255    # RequestAccess support routines
256
257    def lookup_access(self, req, fid):
258        """
259        Look up the local access control information mapped to this fedid and
260        credentials.  In this case it is a (project, create_user, access_user)
261        triple, and a triple of three booleans indicating which, if any will
262        need to be dynamically created.  Finally a list of owners for that
263        allocation is returned.
264
265        lookup_access_base pulls the first triple out, and it is parsed by this
266        routine into the boolean map.  Owners is always the controlling fedid.
267        """
268        # Return values
269        rp = None
270        ru = None
271        # This maps a valid user to the Emulab projects and users to use
272        found, match = self.lookup_access_base(req, fid)
273        tb, project, user = match
274       
275        if found == None:
276            raise service_error(service_error.access,
277                    "Access denied - cannot map access")
278
279        # resolve <dynamic> and <same> in found
280        dyn_proj = False
281        dyn_create_user = False
282        dyn_service_user = False
283
284        if found[0] == "<same>":
285            if project != None:
286                rp = project
287            else : 
288                raise service_error(\
289                        service_error.server_config,
290                        "Project matched <same> when no project given")
291        elif found[0] == "<dynamic>":
292            rp = None
293            dyn_proj = True
294        else:
295            rp = found[0]
296
297        if found[1] == "<same>":
298            if user_match == "<any>":
299                if user != None: rcu = user[0]
300                else: raise service_error(\
301                        service_error.server_config,
302                        "Matched <same> on anonymous request")
303            else:
304                rcu = user_match
305        elif found[1] == "<dynamic>":
306            rcu = None
307            dyn_create_user = True
308        else:
309            rcu = found[1]
310       
311        if found[2] == "<same>":
312            if user_match == "<any>":
313                if user != None: rsu = user[0]
314                else: raise service_error(\
315                        service_error.server_config,
316                        "Matched <same> on anonymous request")
317            else:
318                rsu = user_match
319        elif found[2] == "<dynamic>":
320            rsu = None
321            dyn_service_user = True
322        else:
323            rsu = found[2]
324
325        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
326                [ fid ]
327
328    def lookup_abac_access(self, req, fid): 
329        # Import request credentials into this (clone later??)
330        if self.auth.import_credentials(data_list=req.get('abac_credential', [])):
331            self.auth.save()
332
333        # Check every attribute that we know how to map and take the first
334        # success.
335        print "%s" %self.auth
336        for attr in (self.access.keys()):
337            if self.auth.check_attribute(fid, attr):
338                # XXX: needs to deal with dynamics
339                return copy.copy(self.access[attr]), (False, False, False), \
340                        [ fid ]
341            else:
342                self.log.debug("Access failed for %s %s" % (attr, fid))
343        else:
344            raise service_error(service_error.access, "Access denied")
345
346
347    def do_project_allocation(self, dyn, project, user):
348        """
349        Call the project allocation routines and return the info.
350        """
351        if dyn: 
352            # Compose the dynamic project request
353            # (only dynamic, dynamic currently allowed)
354            preq = { 'AllocateProjectRequestBody': \
355                        { 'project' : {\
356                            'user': [ \
357                            { \
358                                'access': [ { 
359                                    'sshPubkey': self.ssh_pubkey_file } ],
360                                 'role': "serviceAccess",\
361                            }, \
362                            { \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                 'role': "experimentCreation",\
366                            }, \
367                            ], \
368                            }\
369                        }\
370                    }
371            return self.allocate_project.dynamic_project(preq)
372        else:
373            preq = {'StaticProjectRequestBody' : \
374                    { 'project': \
375                        { 'name' : { 'localname' : project },\
376                          'user' : [ \
377                            {\
378                                'userID': { 'localname' : user }, \
379                                'access': [ { 
380                                    'sshPubkey': self.ssh_pubkey_file } ],
381                                'role': 'experimentCreation'\
382                            },\
383                            {\
384                                'userID': { 'localname' : user}, \
385                                'access': [ { 
386                                    'sshPubkey': self.ssh_pubkey_file } ],
387                                'role': 'serviceAccess'\
388                            },\
389                        ]}\
390                    }\
391            }
392            return self.allocate_project.static_project(preq)
393
394    def save_project_state(self, aid, ap, dyn, owners):
395        """
396        Parse out and save the information relevant to the project created for
397        this experiment.  That info is largely in ap and owners.  dyn indicates
398        that the project was created dynamically.  Return the user and project
399        names.
400        """
401        self.state_lock.acquire()
402        self.allocation[aid] = { }
403        try:
404            pname = ap['project']['name']['localname']
405        except KeyError:
406            pname = None
407
408        if dyn:
409            if not pname:
410                self.state_lock.release()
411                raise service_error(service_error.internal,
412                        "Misformed allocation response?")
413            if pname in self.projects: self.projects[pname] += 1
414            else: self.projects[pname] = 1
415            self.allocation[aid]['project'] = pname
416        else:
417            # sproject is a static project associated with this allocation.
418            self.allocation[aid]['sproject'] = pname
419
420        self.allocation[aid]['keys'] = [ ]
421
422        try:
423            for u in ap['project']['user']:
424                uname = u['userID']['localname']
425                if u['role'] == 'experimentCreation':
426                    self.allocation[aid]['user'] = uname
427                for k in [ k['sshPubkey'] for k in u['access'] \
428                        if k.has_key('sshPubkey') ]:
429                    kv = "%s:%s" % (uname, k)
430                    if self.keys.has_key(kv): self.keys[kv] += 1
431                    else: self.keys[kv] = 1
432                    self.allocation[aid]['keys'].append((uname, k))
433        except KeyError:
434            self.state_lock.release()
435            raise service_error(service_error.internal,
436                    "Misformed allocation response?")
437
438        self.allocation[aid]['owners'] = owners
439        self.write_state()
440        self.state_lock.release()
441        return (pname, uname)
442
443    # End of RequestAccess support routines
444
445    def RequestAccess(self, req, fid):
446        """
447        Handle the access request.  Proxy if not for us.
448
449        Parse out the fields and make the allocations or rejections if for us,
450        otherwise, assuming we're willing to proxy, proxy the request out.
451        """
452
453        def gateway_hardware(h):
454            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
455            else: return h
456
457        def get_export_project(svcs):
458            """
459            if the service requests includes one to export a project, return
460            that project.
461            """
462            rv = None
463            for s in svcs:
464                if s.get('name', '') == 'project_export' and \
465                        s.get('visibility', '') == 'export':
466                    if not rv: 
467                        for a in s.get('feddAttr', []):
468                            if a.get('attribute', '') == 'project' \
469                                    and 'value' in a:
470                                rv = a['value']
471                    else:
472                        raise service_error(service_error, access, 
473                                'Requesting multiple project exports is ' + \
474                                        'not supported');
475            return rv
476
477        # The dance to get into the request body
478        if req.has_key('RequestAccessRequestBody'):
479            req = req['RequestAccessRequestBody']
480        else:
481            raise service_error(service_error.req, "No request!?")
482
483        alog = open("./auth.log", 'w')
484        print >>alog, self.auth
485        print >> alog, "after"
486        if self.auth.import_credentials(
487                data_list=req.get('abac_credential', [])):
488            self.auth.save()
489        print >>alog, self.auth
490        alog.close()
491
492        if self.auth_type == "legacy":
493            found, dyn, owners = self.lookup_access(req, fid)
494        elif self.auth_type == 'abac':
495            found, dyn, owners = self.lookup_abac_access(req, fid)
496        else:
497            raise service_error(service_error.internal, 
498                    'Unknown auth_type: %s' % self.auth_type)
499        ap = None
500
501        # if this includes a project export request and the exported
502        # project is not the access project, access denied.
503        if 'service' in req:
504            ep = get_export_project(req['service'])
505            if ep and ep != found[0]:
506                raise service_error(service_error.access,
507                        "Cannot export %s" % ep)
508
509        if self.ssh_pubkey_file:
510            ap = self.do_project_allocation(dyn[1], found[0], found[1])
511        else:
512            raise service_error(service_error.internal, 
513                    "SSH access parameters required")
514        # keep track of what's been added
515        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
516        aid = unicode(allocID)
517
518        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
519
520        services, svc_state = self.export_services(req.get('service',[]),
521                pname, uname)
522        self.state_lock.acquire()
523        # Store services state in global state
524        for k, v in svc_state.items():
525            self.allocation[aid][k] = v
526        self.write_state()
527        self.state_lock.release()
528        # Give the owners the right to change this allocation
529        for o in owners:
530            self.auth.set_attribute(o, allocID)
531            print "Set delete rights on %s for %s" % (o, allocID)
532        self.auth.save()
533        try:
534            f = open("%s/%s.pem" % (self.certdir, aid), "w")
535            print >>f, alloc_cert
536            f.close()
537        except EnvironmentError, e:
538            raise service_error(service_error.internal, 
539                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
540        resp = self.build_access_response({ 'fedid': allocID } ,
541                ap, services)
542        return resp
543
544    def do_release_project(self, del_project, del_users, del_types):
545        """
546        If a project and users has to be deleted, make the call.
547        """
548        msg = { 'project': { }}
549        if del_project:
550            msg['project']['name']= {'localname': del_project}
551        users = [ ]
552        for u in del_users.keys():
553            users.append({ 'userID': { 'localname': u },\
554                'access' :  \
555                        [ {'sshPubkey' : s } for s in del_users[u]]\
556            })
557        if users: 
558            msg['project']['user'] = users
559        if len(del_types) > 0:
560            msg['resources'] = { 'node': \
561                    [ {'hardware': [ h ] } for h in del_types ]\
562                }
563        if self.allocate_project.release_project:
564            msg = { 'ReleaseProjectRequestBody' : msg}
565            self.allocate_project.release_project(msg)
566
567    def ReleaseAccess(self, req, fid):
568        # The dance to get into the request body
569        if req.has_key('ReleaseAccessRequestBody'):
570            req = req['ReleaseAccessRequestBody']
571        else:
572            raise service_error(service_error.req, "No request!?")
573
574        try:
575            if req['allocID'].has_key('localname'):
576                auth_attr = aid = req['allocID']['localname']
577            elif req['allocID'].has_key('fedid'):
578                aid = unicode(req['allocID']['fedid'])
579                auth_attr = req['allocID']['fedid']
580            else:
581                raise service_error(service_error.req,
582                        "Only localnames and fedids are understood")
583        except KeyError:
584            raise service_error(service_error.req, "Badly formed request")
585
586        self.log.debug("[access] deallocation requested for %s by %s" % \
587                (aid, fid))
588        if not self.auth.check_attribute(fid, auth_attr):
589            self.log.debug("[access] deallocation denied for %s", aid)
590            raise service_error(service_error.access, "Access Denied")
591
592        # If we know this allocation, reduce the reference counts and
593        # remove the local allocations.  Otherwise report an error.  If
594        # there is an allocation to delete, del_users will be a dictonary
595        # of sets where the key is the user that owns the keys in the set.
596        # We use a set to avoid duplicates.  del_project is just the name
597        # of any dynamic project to delete.  We're somewhat lazy about
598        # deleting authorization attributes.  Having access to something
599        # that doesn't exist isn't harmful.
600        del_users = { }
601        del_project = None
602        del_types = set()
603
604        self.state_lock.acquire()
605        if aid in self.allocation:
606            self.log.debug("Found allocation for %s" %aid)
607            for k in self.allocation[aid]['keys']:
608                kk = "%s:%s" % k
609                self.keys[kk] -= 1
610                if self.keys[kk] == 0:
611                    if not del_users.has_key(k[0]):
612                        del_users[k[0]] = set()
613                    del_users[k[0]].add(k[1])
614                    del self.keys[kk]
615
616            if 'project' in self.allocation[aid]:
617                pname = self.allocation[aid]['project']
618                self.projects[pname] -= 1
619                if self.projects[pname] == 0:
620                    del_project = pname
621                    del self.projects[pname]
622
623            if 'types' in self.allocation[aid]:
624                for t in self.allocation[aid]['types']:
625                    self.types[t] -= 1
626                    if self.types[t] == 0:
627                        if not del_project: del_project = t[0]
628                        del_types.add(t[1])
629                        del self.types[t]
630
631            del self.allocation[aid]
632            self.write_state()
633            self.state_lock.release()
634            # If we actually have resources to deallocate, prepare the call.
635            if del_project or del_users:
636                self.do_release_project(del_project, del_users, del_types)
637            # And remove the access cert
638            cf = "%s/%s.pem" % (self.certdir, aid)
639            self.log.debug("Removing %s" % cf)
640            os.remove(cf)
641            return { 'allocID': req['allocID'] } 
642        else:
643            self.state_lock.release()
644            raise service_error(service_error.req, "No such allocation")
645
646    # These are subroutines for StartSegment
647    def generate_ns2(self, topo, expfn, softdir, connInfo):
648        """
649        Convert topo into an ns2 file, decorated with appropriate commands for
650        the particular testbed setup.  Convert all requests for software, etc
651        to point at the staged copies on this testbed and add the federation
652        startcommands.
653        """
654        class dragon_commands:
655            """
656            Functor to spit out approrpiate dragon commands for nodes listed in
657            the connectivity description.  The constructor makes a dict mapping
658            dragon nodes to their parameters and the __call__ checks each
659            element in turn for membership.
660            """
661            def __init__(self, map):
662                self.node_info = map
663
664            def __call__(self, e):
665                s = ""
666                if isinstance(e, topdl.Computer):
667                    if self.node_info.has_key(e.name):
668                        info = self.node_info[e.name]
669                        for ifname, vlan, type in info:
670                            for i in e.interface:
671                                if i.name == ifname:
672                                    addr = i.get_attribute('ip4_address')
673                                    subs = i.substrate[0]
674                                    break
675                            else:
676                                raise service_error(service_error.internal,
677                                        "No interface %s on element %s" % \
678                                                (ifname, e.name))
679                            # XXX: do netmask right
680                            if type =='link':
681                                s = ("tb-allow-external ${%s} " + \
682                                        "dragonportal ip %s vlan %s " + \
683                                        "netmask 255.255.255.0\n") % \
684                                        (topdl.to_tcl_name(e.name), addr, vlan)
685                            elif type =='lan':
686                                s = ("tb-allow-external ${%s} " + \
687                                        "dragonportal " + \
688                                        "ip %s vlan %s usurp %s\n") % \
689                                        (topdl.to_tcl_name(e.name), addr, 
690                                                vlan, subs)
691                            else:
692                                raise service_error(service_error_internal,
693                                        "Unknown DRAGON type %s" % type)
694                return s
695
696        class not_dragon:
697            """
698            Return true if a node is in the given map of dragon nodes.
699            """
700            def __init__(self, map):
701                self.nodes = set(map.keys())
702
703            def __call__(self, e):
704                return e.name not in self.nodes
705
706        # Main line of generate_ns2
707        t = topo.clone()
708
709        # Create the map of nodes that need direct connections (dragon
710        # connections) from the connInfo
711        dragon_map = { }
712        for i in [ i for i in connInfo if i['type'] == 'transit']:
713            for a in i.get('fedAttr', []):
714                if a['attribute'] == 'vlan_id':
715                    vlan = a['value']
716                    break
717            else:
718                raise service_error(service_error.internal, "No vlan tag")
719            members = i.get('member', [])
720            if len(members) > 1: type = 'lan'
721            else: type = 'link'
722
723            try:
724                for m in members:
725                    if m['element'] in dragon_map:
726                        dragon_map[m['element']].append(( m['interface'], 
727                            vlan, type))
728                    else:
729                        dragon_map[m['element']] = [( m['interface'], 
730                            vlan, type),]
731            except KeyError:
732                raise service_error(service_error.req,
733                        "Missing connectivity info")
734
735        # Weed out the things we aren't going to instantiate: Segments, portal
736        # substrates, and portal interfaces.  (The copy in the for loop allows
737        # us to delete from e.elements in side the for loop).  While we're
738        # touching all the elements, we also adjust paths from the original
739        # testbed to local testbed paths and put the federation commands into
740        # the start commands
741        for e in [e for e in t.elements]:
742            if isinstance(e, topdl.Segment):
743                t.elements.remove(e)
744            if isinstance(e, topdl.Computer):
745                self.add_kit(e, self.federation_software)
746                if e.get_attribute('portal') and self.portal_startcommand:
747                    # Add local portal support software
748                    self.add_kit(e, self.portal_software)
749                    # Portals never have a user-specified start command
750                    e.set_attribute('startup', self.portal_startcommand)
751                elif self.node_startcommand:
752                    if e.get_attribute('startup'):
753                        e.set_attribute('startup', "%s \\$USER '%s'" % \
754                                (self.node_startcommand, 
755                                    e.get_attribute('startup')))
756                    else:
757                        e.set_attribute('startup', self.node_startcommand)
758
759                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
760                # Remove portal interfaces that do not connect to DRAGON
761                e.interface = [i for i in e.interface \
762                        if not i.get_attribute('portal') or i.name in dinf ]
763            # Fix software paths
764            for s in getattr(e, 'software', []):
765                s.location = re.sub("^.*/", softdir, s.location)
766
767        t.substrates = [ s.clone() for s in t.substrates ]
768        t.incorporate_elements()
769
770        # Customize the ns2 output for local portal commands and images
771        filters = []
772
773        if self.dragon_endpoint:
774            add_filter = not_dragon(dragon_map)
775            filters.append(dragon_commands(dragon_map))
776        else:
777            add_filter = None
778
779        if self.portal_command:
780            filters.append(topdl.generate_portal_command_filter(
781                self.portal_command, add_filter=add_filter))
782
783        if self.portal_image:
784            filters.append(topdl.generate_portal_image_filter(
785                self.portal_image))
786
787        if self.portal_type:
788            filters.append(topdl.generate_portal_hardware_filter(
789                self.portal_type))
790
791        # Convert to ns and write it out
792        expfile = topdl.topology_to_ns2(t, filters)
793        try:
794            f = open(expfn, "w")
795            print >>f, expfile
796            f.close()
797        except EnvironmentError:
798            raise service_error(service_error.internal,
799                    "Cannot write experiment file %s: %s" % (expfn,e))
800
801    def export_store_info(self, cf, proj, ename, connInfo):
802        """
803        For the export requests in the connection info, install the peer names
804        at the experiment controller via SetValue calls.
805        """
806
807        for c in connInfo:
808            for p in [ p for p in c.get('parameter', []) \
809                    if p.get('type', '') == 'output']:
810
811                if p.get('name', '') == 'peer':
812                    k = p.get('key', None)
813                    surl = p.get('store', None)
814                    if surl and k and k.index('/') != -1:
815                        value = "%s.%s.%s%s" % \
816                                (k[k.index('/')+1:], ename, proj, self.domain)
817                        req = { 'name': k, 'value': value }
818                        self.log.debug("Setting %s to %s on %s" % \
819                                (k, value, surl))
820                        self.call_SetValue(surl, req, cf)
821                    else:
822                        self.log.error("Bad export request: %s" % p)
823                elif p.get('name', '') == 'ssh_port':
824                    k = p.get('key', None)
825                    surl = p.get('store', None)
826                    if surl and k:
827                        req = { 'name': k, 'value': self.ssh_port }
828                        self.log.debug("Setting %s to %s on %s" % \
829                                (k, self.ssh_port, surl))
830                        self.call_SetValue(surl, req, cf)
831                    else:
832                        self.log.error("Bad export request: %s" % p)
833                else:
834                    self.log.error("Unknown export parameter: %s" % \
835                            p.get('name'))
836                    continue
837
838    def add_seer_node(self, topo, name, startup):
839        """
840        Add a seer node to the given topology, with the startup command passed
841        in.  Used by configure seer_services.
842        """
843        c_node = topdl.Computer(
844                name=name, 
845                os= topdl.OperatingSystem(
846                    attribute=[
847                    { 'attribute': 'osid', 
848                        'value': self.local_seer_image },
849                    ]),
850                attribute=[
851                    { 'attribute': 'startup', 'value': startup },
852                    ]
853                )
854        self.add_kit(c_node, self.local_seer_software)
855        topo.elements.append(c_node)
856
857    def configure_seer_services(self, services, topo, softdir):
858        """
859        Make changes to the topology required for the seer requests being made.
860        Specifically, add any control or master nodes required and set up the
861        start commands on the nodes to interconnect them.
862        """
863        local_seer = False      # True if we need to add a control node
864        collect_seer = False    # True if there is a seer-master node
865        seer_master= False      # True if we need to add the seer-master
866        for s in services:
867            s_name = s.get('name', '')
868            s_vis = s.get('visibility','')
869
870            if s_name  == 'local_seer_control' and s_vis == 'export':
871                local_seer = True
872            elif s_name == 'seer_master':
873                if s_vis == 'import':
874                    collect_seer = True
875                elif s_vis == 'export':
876                    seer_master = True
877       
878        # We've got the whole picture now, so add nodes if needed and configure
879        # them to interconnect properly.
880        if local_seer or seer_master:
881            # Copy local seer control node software to the tempdir
882            for l, f in self.local_seer_software:
883                base = os.path.basename(f)
884                copy_file(f, "%s/%s" % (softdir, base))
885        # If we're collecting seers somewhere the controllers need to talk to
886        # the master.  In testbeds that export the service, that will be a
887        # local node that we'll add below.  Elsewhere it will be the control
888        # portal that will port forward to the exporting master.
889        if local_seer:
890            if collect_seer:
891                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
892            else:
893                startup = self.local_seer_start
894            self.add_seer_node(topo, 'control', startup)
895        # If this is the seer master, add that node, too.
896        if seer_master:
897            self.add_seer_node(topo, 'seer-master', 
898                    "%s -R -n -R seer-master -R -A -R sink" % \
899                            self.seer_master_start)
900
901    def retrieve_software(self, topo, certfile, softdir):
902        """
903        Collect the software that nodes in the topology need loaded and stage
904        it locally.  This implies retrieving it from the experiment_controller
905        and placing it into softdir.  Certfile is used to prove that this node
906        has access to that data (it's the allocation/segment fedid).  Finally
907        local portal and federation software is also copied to the same staging
908        directory for simplicity - all software needed for experiment creation
909        is in softdir.
910        """
911        sw = set()
912        for e in topo.elements:
913            for s in getattr(e, 'software', []):
914                sw.add(s.location)
915        for s in sw:
916            self.log.debug("Retrieving %s" % s)
917            try:
918                get_url(s, certfile, softdir)
919            except:
920                t, v, st = sys.exc_info()
921                raise service_error(service_error.internal,
922                        "Error retrieving %s: %s" % (s, v))
923
924        # Copy local federation and portal node software to the tempdir
925        for s in (self.federation_software, self.portal_software):
926            for l, f in s:
927                base = os.path.basename(f)
928                copy_file(f, "%s/%s" % (softdir, base))
929
930
931    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
932        """
933        Gather common configuration files, retrieve or create an experiment
934        name and project name, and return the ssh_key filenames.  Create an
935        allocation log bound to the state log variable as well.
936        """
937        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
938        ename = None
939        pubkey_base = None
940        secretkey_base = None
941        proj = None
942        user = None
943        alloc_log = None
944
945        for a in attrs:
946            if a['attribute'] in configs:
947                try:
948                    self.log.debug("Retrieving %s from %s" % \
949                            (a['attribute'], a['value']))
950                    get_url(a['value'], certfile, tmpdir)
951                except:
952                    t, v, st = sys.exc_info()
953                    raise service_error(service_error.internal,
954                            "Error retrieving %s: %s" % (a.get('value', ""), v))
955            if a['attribute'] == 'ssh_pubkey':
956                pubkey_base = a['value'].rpartition('/')[2]
957            if a['attribute'] == 'ssh_secretkey':
958                secretkey_base = a['value'].rpartition('/')[2]
959            if a['attribute'] == 'experiment_name':
960                ename = a['value']
961
962        if not ename:
963            ename = ""
964            for i in range(0,5):
965                ename += random.choice(string.ascii_letters)
966            self.log.warn("No experiment name: picked one randomly: %s" \
967                    % ename)
968
969        if not pubkey_base:
970            raise service_error(service_error.req, 
971                    "No public key attribute")
972
973        if not secretkey_base:
974            raise service_error(service_error.req, 
975                    "No secret key attribute")
976
977        self.state_lock.acquire()
978        if aid in self.allocation:
979            proj = self.allocation[aid].get('project', None)
980            if not proj: 
981                proj = self.allocation[aid].get('sproject', None)
982            user = self.allocation[aid].get('user', None)
983            self.allocation[aid]['experiment'] = ename
984            self.allocation[aid]['log'] = [ ]
985            # Create a logger that logs to the experiment's state object as
986            # well as to the main log file.
987            alloc_log = logging.getLogger('fedd.access.%s' % ename)
988            h = logging.StreamHandler(
989                    list_log.list_log(self.allocation[aid]['log']))
990            # XXX: there should be a global one of these rather than
991            # repeating the code.
992            h.setFormatter(logging.Formatter(
993                "%(asctime)s %(name)s %(message)s",
994                        '%d %b %y %H:%M:%S'))
995            alloc_log.addHandler(h)
996            self.write_state()
997        self.state_lock.release()
998
999        if not proj:
1000            raise service_error(service_error.internal, 
1001                    "Can't find project for %s" %aid)
1002
1003        if not user:
1004            raise service_error(service_error.internal, 
1005                    "Can't find creation user for %s" %aid)
1006
1007        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1008
1009    def finalize_experiment(self, starter, topo, aid, alloc_id):
1010        """
1011        Store key bits of experiment state in the global repository, including
1012        the response that may need to be replayed, and return the response.
1013        """
1014        # Copy the assigned names into the return topology
1015        embedding = [ ]
1016        for n in starter.node:
1017            embedding.append({ 
1018                'toponame': n,
1019                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1020                })
1021        # Grab the log (this is some anal locking, but better safe than
1022        # sorry)
1023        self.state_lock.acquire()
1024        logv = "".join(self.allocation[aid]['log'])
1025        # It's possible that the StartSegment call gets retried (!).
1026        # if the 'started' key is in the allocation, we'll return it rather
1027        # than redo the setup.
1028        self.allocation[aid]['started'] = { 
1029                'allocID': alloc_id,
1030                'allocationLog': logv,
1031                'segmentdescription': { 
1032                    'topdldescription': topo.clone().to_dict()
1033                    },
1034                'embedding': embedding
1035                }
1036        retval = copy.copy(self.allocation[aid]['started'])
1037        self.write_state()
1038        self.state_lock.release()
1039        return retval
1040   
1041    # End of StartSegment support routines
1042
1043    def StartSegment(self, req, fid):
1044        err = None  # Any service_error generated after tmpdir is created
1045        rv = None   # Return value from segment creation
1046
1047        try:
1048            req = req['StartSegmentRequestBody']
1049            auth_attr = req['allocID']['fedid']
1050            topref = req['segmentdescription']['topdldescription']
1051        except KeyError:
1052            raise service_error(server_error.req, "Badly formed request")
1053
1054        connInfo = req.get('connection', [])
1055        services = req.get('service', [])
1056        aid = "%s" % auth_attr
1057        attrs = req.get('fedAttr', [])
1058        if not self.auth.check_attribute(fid, auth_attr):
1059            raise service_error(service_error.access, "Access denied")
1060        else:
1061            # See if this is a replay of an earlier succeeded StartSegment -
1062            # sometimes SSL kills 'em.  If so, replay the response rather than
1063            # redoing the allocation.
1064            self.state_lock.acquire()
1065            retval = self.allocation[aid].get('started', None)
1066            self.state_lock.release()
1067            if retval:
1068                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1069                        "replaying response")
1070                return retval
1071
1072        # A new request.  Do it.
1073
1074        if topref: topo = topdl.Topology(**topref)
1075        else:
1076            raise service_error(service_error.req, 
1077                    "Request missing segmentdescription'")
1078       
1079        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1080        try:
1081            tmpdir = tempfile.mkdtemp(prefix="access-")
1082            softdir = "%s/software" % tmpdir
1083            os.mkdir(softdir)
1084        except EnvironmentError:
1085            raise service_error(service_error.internal, "Cannot create tmp dir")
1086
1087        # Try block alllows us to clean up temporary files.
1088        try:
1089            self.retrieve_software(topo, certfile, softdir)
1090            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1091                    self.initialize_experiment_info(attrs, aid, 
1092                            certfile, tmpdir)
1093
1094            # Set up userconf and seer if needed
1095            self.configure_userconf(services, tmpdir)
1096            self.configure_seer_services(services, topo, softdir)
1097            # Get and send synch store variables
1098            self.export_store_info(certfile, proj, ename, connInfo)
1099            self.import_store_info(certfile, connInfo)
1100
1101            expfile = "%s/experiment.tcl" % tmpdir
1102
1103            self.generate_portal_configs(topo, pubkey_base, 
1104                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1105            self.generate_ns2(topo, expfile, 
1106                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1107
1108            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1109                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1110                    cert=self.xmlrpc_cert)
1111            rv = starter(self, ename, proj, user, expfile, tmpdir)
1112        except service_error, e:
1113            err = e
1114        except:
1115            t, v, st = sys.exc_info()
1116            err = service_error(service_error.internal, "%s: %s" % \
1117                    (v, traceback.extract_tb(st)))
1118
1119        # Walk up tmpdir, deleting as we go
1120        if self.cleanup: self.remove_dirs(tmpdir)
1121        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1122
1123        if rv:
1124            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1125        elif err:
1126            raise service_error(service_error.federant,
1127                    "Swapin failed: %s" % err)
1128        else:
1129            raise service_error(service_error.federant, "Swapin failed")
1130
1131    def TerminateSegment(self, req, fid):
1132        try:
1133            req = req['TerminateSegmentRequestBody']
1134        except KeyError:
1135            raise service_error(server_error.req, "Badly formed request")
1136
1137        auth_attr = req['allocID']['fedid']
1138        aid = "%s" % auth_attr
1139        attrs = req.get('fedAttr', [])
1140        if not self.auth.check_attribute(fid, auth_attr):
1141            raise service_error(service_error.access, "Access denied")
1142
1143        self.state_lock.acquire()
1144        if aid in self.allocation:
1145            proj = self.allocation[aid].get('project', None)
1146            if not proj: 
1147                proj = self.allocation[aid].get('sproject', None)
1148            user = self.allocation[aid].get('user', None)
1149            ename = self.allocation[aid].get('experiment', None)
1150        else:
1151            proj = None
1152            user = None
1153            ename = None
1154        self.state_lock.release()
1155
1156        if not proj:
1157            raise service_error(service_error.internal, 
1158                    "Can't find project for %s" % aid)
1159
1160        if not user:
1161            raise service_error(service_error.internal, 
1162                    "Can't find creation user for %s" % aid)
1163        if not ename:
1164            raise service_error(service_error.internal, 
1165                    "Can't find experiment name for %s" % aid)
1166        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1167                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1168        stopper(self, user, proj, ename)
1169        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.