source: fedd/federation/emulab_access.py @ 6e63513

axis_examplecompt_changesinfo-ops
Last change on this file since 6e63513 was 6e63513, checked in by Ted Faber <faber@…>, 13 years ago

Checkpoint

  • Property mode set to 100644
File size: 38.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        # XXX: this should go?
108        #if config.has_option("access", "accessdb"):
109        #    self.read_access(config.get("access", "accessdb"))
110        tb = config.get('access', 'testbed')
111        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
112        else: self.testbed = [ ]
113
114        # authorization information
115        self.auth_type = config.get('access', 'auth_type') \
116                or 'legacy'
117        self.auth_dir = config.get('access', 'auth_dir')
118        accessdb = config.get("access", "accessdb")
119        # initialize the authorization system
120        if self.auth_type == 'legacy':
121            if accessdb:
122                self.read_access(accessdb, self.make_access_project)
123        elif self.auth_type == 'abac':
124            self.auth = abac_authorizer(load=self.auth_dir)
125            if accessdb:
126                self.read_abac_access(accessdb, self.make_abac_access_project)
127        else:
128            raise service_error(service_error.internal, 
129                    "Unknown auth_type: %s" % self.auth_type)
130
131        # read_state in the base_class
132        self.state_lock.acquire()
133        for a  in ('allocation', 'projects', 'keys', 'types'):
134            if a not in self.state:
135                self.state[a] = { }
136        self.allocation = self.state['allocation']
137        self.projects = self.state['projects']
138        self.keys = self.state['keys']
139        self.types = self.state['types']
140        if self.auth_type == "legacy": 
141            # Add the ownership attributes to the authorizer.  Note that the
142            # indices of the allocation dict are strings, but the attributes are
143            # fedids, so there is a conversion.
144            for k in self.allocation.keys():
145                for o in self.allocation[k].get('owners', []):
146                    self.auth.set_attribute(o, fedid(hexstr=k))
147                if self.allocation[k].has_key('userconfig'):
148                    sfid = self.allocation[k]['userconfig']
149                    fid = fedid(hexstr=sfid)
150                    self.auth.set_attribute(fid, "/%s" % sfid)
151        self.state_lock.release()
152        self.exports = {
153                'SMB': self.export_SMB,
154                'seer': self.export_seer,
155                'tmcd': self.export_tmcd,
156                'userconfig': self.export_userconfig,
157                'project_export': self.export_project_export,
158                'local_seer_control': self.export_local_seer,
159                'seer_master': self.export_seer_master,
160                'hide_hosts': self.export_hide_hosts,
161                }
162
163        if not self.local_seer_image or not self.local_seer_software or \
164                not self.local_seer_start:
165            if 'local_seer_control' in self.exports:
166                del self.exports['local_seer_control']
167
168        if not self.local_seer_image or not self.local_seer_software or \
169                not self.seer_master_start:
170            if 'seer_master' in self.exports:
171                del self.exports['seer_master']
172
173
174        self.soap_services = {\
175            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
176            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
177            'StartSegment': soap_handler("StartSegment", self.StartSegment),
178            'TerminateSegment': soap_handler("TerminateSegment",
179                self.TerminateSegment),
180            }
181        self.xmlrpc_services =  {\
182            'RequestAccess': xmlrpc_handler('RequestAccess',
183                self.RequestAccess),
184            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
185                self.ReleaseAccess),
186            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': xmlrpc_handler('TerminateSegment',
188                self.TerminateSegment),
189            }
190
191        self.call_SetValue = service_caller('SetValue')
192        self.call_GetValue = service_caller('GetValue', log=self.log)
193
194        if not config.has_option("allocate", "uri"):
195            self.allocate_project = \
196                allocate_project_local(config, auth)
197        else:
198            self.allocate_project = \
199                allocate_project_remote(config, auth)
200
201
202        # If the project allocator exports services, put them in this object's
203        # maps so that classes that instantiate this can call the services.
204        self.soap_services.update(self.allocate_project.soap_services)
205        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
206
207    @staticmethod
208    def make_access_project(str):
209        """
210        Convert a string of the form (id[:resources:resouces], id, id) into an
211        access_project.  This is called by read_access to convert to local
212        attributes.  It returns a tuple of the form (project, user, user) where
213        users may be names or fedids.
214        """
215        def parse_name(n):
216            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
217            else: return n
218
219        str = str.strip()
220        if str.startswith('(') and str.endswith(')'):
221            str = str[1:-1]
222            names = [ s.strip() for s in str.split(",")]
223            if len(names) > 3:
224                raise self.parse_error("More than three fields in name")
225            first = names[0].split(":")
226            if first == 'fedid:':
227                del first[0]
228                first[0] = fedid(hexstr=first[0])
229            names[0] = access_project(first[0], first[1:])
230
231            for i in range(1,2):
232                names[i] = parse_name(names[i])
233
234            return tuple(names)
235        else:
236            raise self.parse_error('Bad mapping (unbalanced parens)')
237
238    @staticmethod
239    def make_abac_access_project(str):
240        """
241        Convert a string of the form (id, id) into an access_project.  This is
242        called by read_abac_access to convert to local attributes.  It returns
243        a tuple of the form (project, user, user) where the two users are
244        always the same.
245        """
246
247        str = str.strip()
248        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
249            proj, user = str.split(',')
250            return ( proj.strip(), user.strip(), user.strip())
251        else:
252            raise self.parse_error(
253                    'Bad mapping (unbalanced parens or more than 1 comma)')
254
255
256    # RequestAccess support routines
257
258    def lookup_access(self, req, fid):
259        """
260        Look up the local access control information mapped to this fedid and
261        credentials.  In this case it is a (project, create_user, access_user)
262        triple, and a triple of three booleans indicating which, if any will
263        need to be dynamically created.  Finally a list of owners for that
264        allocation is returned.
265
266        lookup_access_base pulls the first triple out, and it is parsed by this
267        routine into the boolean map.  Owners is always the controlling fedid.
268        """
269        # Return values
270        rp = access_project(None, ())
271        ru = None
272        # This maps a valid user to the Emulab projects and users to use
273        found, match = self.lookup_access_base(req, fid)
274        tb, project, user = match
275       
276        if found == None:
277            raise service_error(service_error.access,
278                    "Access denied - cannot map access")
279
280        # resolve <dynamic> and <same> in found
281        dyn_proj = False
282        dyn_create_user = False
283        dyn_service_user = False
284
285        if found[0].name == "<same>":
286            if project != None:
287                rp.name = project
288            else : 
289                raise service_error(\
290                        service_error.server_config,
291                        "Project matched <same> when no project given")
292        elif found[0].name == "<dynamic>":
293            rp.name = None
294            dyn_proj = True
295        else:
296            rp.name = found[0].name
297        rp.node_types = found[0].node_types;
298
299        if found[1] == "<same>":
300            if user_match == "<any>":
301                if user != None: rcu = user[0]
302                else: raise service_error(\
303                        service_error.server_config,
304                        "Matched <same> on anonymous request")
305            else:
306                rcu = user_match
307        elif found[1] == "<dynamic>":
308            rcu = None
309            dyn_create_user = True
310        else:
311            rcu = found[1]
312       
313        if found[2] == "<same>":
314            if user_match == "<any>":
315                if user != None: rsu = user[0]
316                else: raise service_error(\
317                        service_error.server_config,
318                        "Matched <same> on anonymous request")
319            else:
320                rsu = user_match
321        elif found[2] == "<dynamic>":
322            rsu = None
323            dyn_service_user = True
324        else:
325            rsu = found[2]
326
327        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
328                [ fid ]
329
330    def lookup_abac_access(self, req, fid): 
331        # Import request credentials into this (clone later??)
332        if self.auth.import_credentials(data_list=req.get('abac_credential', [])):
333            self.auth.save()
334
335        # Check every attribute that we know how to map and take the first
336        # success.
337        for attr in (self.access.keys()):
338            if self.auth.check_attribute(fid, attr):
339                # XXX: needs to deal with dynamics
340                return copy.copy(self.access[attr]), (False, False, False), \
341                        [ fid ]
342            else:
343                self.log.debug("Access failed for %s %s" % (attr, fid))
344        else:
345            raise service_error(service_error.access, "Access denied")
346
347
348    def do_project_allocation(self, dyn, project, user):
349        """
350        Call the project allocation routines and return the info.
351        """
352        if dyn: 
353            # Compose the dynamic project request
354            # (only dynamic, dynamic currently allowed)
355            preq = { 'AllocateProjectRequestBody': \
356                        { 'project' : {\
357                            'user': [ \
358                            { \
359                                'access': [ { 
360                                    'sshPubkey': self.ssh_pubkey_file } ],
361                                 'role': "serviceAccess",\
362                            }, \
363                            { \
364                                'access': [ { 
365                                    'sshPubkey': self.ssh_pubkey_file } ],
366                                 'role': "experimentCreation",\
367                            }, \
368                            ], \
369                            }\
370                        }\
371                    }
372            return self.allocate_project.dynamic_project(preq)
373        else:
374            preq = {'StaticProjectRequestBody' : \
375                    { 'project': \
376                        { 'name' : { 'localname' : project },\
377                          'user' : [ \
378                            {\
379                                'userID': { 'localname' : user }, \
380                                'access': [ { 
381                                    'sshPubkey': self.ssh_pubkey_file } ],
382                                'role': 'experimentCreation'\
383                            },\
384                            {\
385                                'userID': { 'localname' : user}, \
386                                'access': [ { 
387                                    'sshPubkey': self.ssh_pubkey_file } ],
388                                'role': 'serviceAccess'\
389                            },\
390                        ]}\
391                    }\
392            }
393            return self.allocate_project.static_project(preq)
394
395    def save_project_state(self, aid, ap, dyn, owners):
396        """
397        Parse out and save the information relevant to the project created for
398        this experiment.  That info is largely in ap and owners.  dyn indicates
399        that the project was created dynamically.  Return the user and project
400        names.
401        """
402        self.state_lock.acquire()
403        self.allocation[aid] = { }
404        try:
405            pname = ap['project']['name']['localname']
406        except KeyError:
407            pname = None
408
409        if dyn:
410            if not pname:
411                self.state_lock.release()
412                raise service_error(service_error.internal,
413                        "Misformed allocation response?")
414            if pname in self.projects: self.projects[pname] += 1
415            else: self.projects[pname] = 1
416            self.allocation[aid]['project'] = pname
417        else:
418            # sproject is a static project associated with this allocation.
419            self.allocation[aid]['sproject'] = pname
420
421        self.allocation[aid]['keys'] = [ ]
422
423        try:
424            for u in ap['project']['user']:
425                uname = u['userID']['localname']
426                if u['role'] == 'experimentCreation':
427                    self.allocation[aid]['user'] = uname
428                for k in [ k['sshPubkey'] for k in u['access'] \
429                        if k.has_key('sshPubkey') ]:
430                    kv = "%s:%s" % (uname, k)
431                    if self.keys.has_key(kv): self.keys[kv] += 1
432                    else: self.keys[kv] = 1
433                    self.allocation[aid]['keys'].append((uname, k))
434        except KeyError:
435            self.state_lock.release()
436            raise service_error(service_error.internal,
437                    "Misformed allocation response?")
438
439        self.allocation[aid]['owners'] = owners
440        self.write_state()
441        self.state_lock.release()
442        return (pname, uname)
443
444    # End of RequestAccess support routines
445
446    def RequestAccess(self, req, fid):
447        """
448        Handle the access request.  Proxy if not for us.
449
450        Parse out the fields and make the allocations or rejections if for us,
451        otherwise, assuming we're willing to proxy, proxy the request out.
452        """
453
454        def gateway_hardware(h):
455            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
456            else: return h
457
458        def get_export_project(svcs):
459            """
460            if the service requests includes one to export a project, return
461            that project.
462            """
463            rv = None
464            for s in svcs:
465                if s.get('name', '') == 'project_export' and \
466                        s.get('visibility', '') == 'export':
467                    if not rv: 
468                        for a in s.get('feddAttr', []):
469                            if a.get('attribute', '') == 'project' \
470                                    and 'value' in a:
471                                rv = a['value']
472                    else:
473                        raise service_error(service_error, access, 
474                                'Requesting multiple project exports is ' + \
475                                        'not supported');
476            return rv
477
478        # The dance to get into the request body
479        if req.has_key('RequestAccessRequestBody'):
480            req = req['RequestAccessRequestBody']
481        else:
482            raise service_error(service_error.req, "No request!?")
483
484
485        if self.auth_type == "legacy":
486            found, dyn, owners = self.lookup_access(req, fid)
487        elif self.auth_type == 'abac':
488            found, dyn, owners = self.lookup_abac_access(req, fid)
489        else:
490            raise service_error(service_error.internal, 
491                    'Unknown auth_type: %s' % self.auth_type)
492        ap = None
493
494        # if this includes a project export request and the exported
495        # project is not the access project, access denied.
496        if 'service' in req:
497            ep = get_export_project(req['service'])
498            if ep and ep != found[0].name:
499                raise service_error(service_error.access,
500                        "Cannot export %s" % ep)
501
502        if self.ssh_pubkey_file:
503            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
504        else:
505            raise service_error(service_error.internal, 
506                    "SSH access parameters required")
507        # keep track of what's been added
508        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
509        aid = unicode(allocID)
510
511        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
512
513        services, svc_state = self.export_services(req.get('service',[]),
514                pname, uname)
515        self.state_lock.acquire()
516        # Store services state in global state
517        for k, v in svc_state.items():
518            self.allocation[aid][k] = v
519        self.write_state()
520        self.state_lock.release()
521        # Give the owners the right to change this allocation
522        for o in owners:
523            self.auth.set_attribute(o, allocID)
524        self.auth.save()
525        try:
526            f = open("%s/%s.pem" % (self.certdir, aid), "w")
527            print >>f, alloc_cert
528            f.close()
529        except EnvironmentError, e:
530            raise service_error(service_error.internal, 
531                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
532        resp = self.build_access_response({ 'fedid': allocID } ,
533                ap, services)
534        return resp
535
536    def do_release_project(self, del_project, del_users, del_types):
537        """
538        If a project and users has to be deleted, make the call.
539        """
540        msg = { 'project': { }}
541        if del_project:
542            msg['project']['name']= {'localname': del_project}
543        users = [ ]
544        for u in del_users.keys():
545            users.append({ 'userID': { 'localname': u },\
546                'access' :  \
547                        [ {'sshPubkey' : s } for s in del_users[u]]\
548            })
549        if users: 
550            msg['project']['user'] = users
551        if len(del_types) > 0:
552            msg['resources'] = { 'node': \
553                    [ {'hardware': [ h ] } for h in del_types ]\
554                }
555        if self.allocate_project.release_project:
556            msg = { 'ReleaseProjectRequestBody' : msg}
557            self.allocate_project.release_project(msg)
558
559    def ReleaseAccess(self, req, fid):
560        # The dance to get into the request body
561        if req.has_key('ReleaseAccessRequestBody'):
562            req = req['ReleaseAccessRequestBody']
563        else:
564            raise service_error(service_error.req, "No request!?")
565
566        try:
567            if req['allocID'].has_key('localname'):
568                auth_attr = aid = req['allocID']['localname']
569            elif req['allocID'].has_key('fedid'):
570                aid = unicode(req['allocID']['fedid'])
571                auth_attr = req['allocID']['fedid']
572            else:
573                raise service_error(service_error.req,
574                        "Only localnames and fedids are understood")
575        except KeyError:
576            raise service_error(service_error.req, "Badly formed request")
577
578        self.log.debug("[access] deallocation requested for %s", aid)
579        if not self.auth.check_attribute(fid, auth_attr):
580            self.log.debug("[access] deallocation denied for %s", aid)
581            raise service_error(service_error.access, "Access Denied")
582
583        # If we know this allocation, reduce the reference counts and
584        # remove the local allocations.  Otherwise report an error.  If
585        # there is an allocation to delete, del_users will be a dictonary
586        # of sets where the key is the user that owns the keys in the set.
587        # We use a set to avoid duplicates.  del_project is just the name
588        # of any dynamic project to delete.  We're somewhat lazy about
589        # deleting authorization attributes.  Having access to something
590        # that doesn't exist isn't harmful.
591        del_users = { }
592        del_project = None
593        del_types = set()
594
595        self.state_lock.acquire()
596        if aid in self.allocation:
597            self.log.debug("Found allocation for %s" %aid)
598            for k in self.allocation[aid]['keys']:
599                kk = "%s:%s" % k
600                self.keys[kk] -= 1
601                if self.keys[kk] == 0:
602                    if not del_users.has_key(k[0]):
603                        del_users[k[0]] = set()
604                    del_users[k[0]].add(k[1])
605                    del self.keys[kk]
606
607            if 'project' in self.allocation[aid]:
608                pname = self.allocation[aid]['project']
609                self.projects[pname] -= 1
610                if self.projects[pname] == 0:
611                    del_project = pname
612                    del self.projects[pname]
613
614            if 'types' in self.allocation[aid]:
615                for t in self.allocation[aid]['types']:
616                    self.types[t] -= 1
617                    if self.types[t] == 0:
618                        if not del_project: del_project = t[0]
619                        del_types.add(t[1])
620                        del self.types[t]
621
622            del self.allocation[aid]
623            self.write_state()
624            self.state_lock.release()
625            # If we actually have resources to deallocate, prepare the call.
626            if del_project or del_users:
627                self.do_release_project(del_project, del_users, del_types)
628            # And remove the access cert
629            cf = "%s/%s.pem" % (self.certdir, aid)
630            self.log.debug("Removing %s" % cf)
631            os.remove(cf)
632            return { 'allocID': req['allocID'] } 
633        else:
634            self.state_lock.release()
635            raise service_error(service_error.req, "No such allocation")
636
637    # These are subroutines for StartSegment
638    def generate_ns2(self, topo, expfn, softdir, connInfo):
639        """
640        Convert topo into an ns2 file, decorated with appropriate commands for
641        the particular testbed setup.  Convert all requests for software, etc
642        to point at the staged copies on this testbed and add the federation
643        startcommands.
644        """
645        class dragon_commands:
646            """
647            Functor to spit out approrpiate dragon commands for nodes listed in
648            the connectivity description.  The constructor makes a dict mapping
649            dragon nodes to their parameters and the __call__ checks each
650            element in turn for membership.
651            """
652            def __init__(self, map):
653                self.node_info = map
654
655            def __call__(self, e):
656                s = ""
657                if isinstance(e, topdl.Computer):
658                    if self.node_info.has_key(e.name):
659                        info = self.node_info[e.name]
660                        for ifname, vlan, type in info:
661                            for i in e.interface:
662                                if i.name == ifname:
663                                    addr = i.get_attribute('ip4_address')
664                                    subs = i.substrate[0]
665                                    break
666                            else:
667                                raise service_error(service_error.internal,
668                                        "No interface %s on element %s" % \
669                                                (ifname, e.name))
670                            # XXX: do netmask right
671                            if type =='link':
672                                s = ("tb-allow-external ${%s} " + \
673                                        "dragonportal ip %s vlan %s " + \
674                                        "netmask 255.255.255.0\n") % \
675                                        (topdl.to_tcl_name(e.name), addr, vlan)
676                            elif type =='lan':
677                                s = ("tb-allow-external ${%s} " + \
678                                        "dragonportal " + \
679                                        "ip %s vlan %s usurp %s\n") % \
680                                        (topdl.to_tcl_name(e.name), addr, 
681                                                vlan, subs)
682                            else:
683                                raise service_error(service_error_internal,
684                                        "Unknown DRAGON type %s" % type)
685                return s
686
687        class not_dragon:
688            """
689            Return true if a node is in the given map of dragon nodes.
690            """
691            def __init__(self, map):
692                self.nodes = set(map.keys())
693
694            def __call__(self, e):
695                return e.name not in self.nodes
696
697        # Main line of generate_ns2
698        t = topo.clone()
699
700        # Create the map of nodes that need direct connections (dragon
701        # connections) from the connInfo
702        dragon_map = { }
703        for i in [ i for i in connInfo if i['type'] == 'transit']:
704            for a in i.get('fedAttr', []):
705                if a['attribute'] == 'vlan_id':
706                    vlan = a['value']
707                    break
708            else:
709                raise service_error(service_error.internal, "No vlan tag")
710            members = i.get('member', [])
711            if len(members) > 1: type = 'lan'
712            else: type = 'link'
713
714            try:
715                for m in members:
716                    if m['element'] in dragon_map:
717                        dragon_map[m['element']].append(( m['interface'], 
718                            vlan, type))
719                    else:
720                        dragon_map[m['element']] = [( m['interface'], 
721                            vlan, type),]
722            except KeyError:
723                raise service_error(service_error.req,
724                        "Missing connectivity info")
725
726        # Weed out the things we aren't going to instantiate: Segments, portal
727        # substrates, and portal interfaces.  (The copy in the for loop allows
728        # us to delete from e.elements in side the for loop).  While we're
729        # touching all the elements, we also adjust paths from the original
730        # testbed to local testbed paths and put the federation commands into
731        # the start commands
732        for e in [e for e in t.elements]:
733            if isinstance(e, topdl.Segment):
734                t.elements.remove(e)
735            if isinstance(e, topdl.Computer):
736                self.add_kit(e, self.federation_software)
737                if e.get_attribute('portal') and self.portal_startcommand:
738                    # Add local portal support software
739                    self.add_kit(e, self.portal_software)
740                    # Portals never have a user-specified start command
741                    e.set_attribute('startup', self.portal_startcommand)
742                elif self.node_startcommand:
743                    if e.get_attribute('startup'):
744                        e.set_attribute('startup', "%s \\$USER '%s'" % \
745                                (self.node_startcommand, 
746                                    e.get_attribute('startup')))
747                    else:
748                        e.set_attribute('startup', self.node_startcommand)
749
750                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
751                # Remove portal interfaces that do not connect to DRAGON
752                e.interface = [i for i in e.interface \
753                        if not i.get_attribute('portal') or i.name in dinf ]
754            # Fix software paths
755            for s in getattr(e, 'software', []):
756                s.location = re.sub("^.*/", softdir, s.location)
757
758        t.substrates = [ s.clone() for s in t.substrates ]
759        t.incorporate_elements()
760
761        # Customize the ns2 output for local portal commands and images
762        filters = []
763
764        if self.dragon_endpoint:
765            add_filter = not_dragon(dragon_map)
766            filters.append(dragon_commands(dragon_map))
767        else:
768            add_filter = None
769
770        if self.portal_command:
771            filters.append(topdl.generate_portal_command_filter(
772                self.portal_command, add_filter=add_filter))
773
774        if self.portal_image:
775            filters.append(topdl.generate_portal_image_filter(
776                self.portal_image))
777
778        if self.portal_type:
779            filters.append(topdl.generate_portal_hardware_filter(
780                self.portal_type))
781
782        # Convert to ns and write it out
783        expfile = topdl.topology_to_ns2(t, filters)
784        try:
785            f = open(expfn, "w")
786            print >>f, expfile
787            f.close()
788        except EnvironmentError:
789            raise service_error(service_error.internal,
790                    "Cannot write experiment file %s: %s" % (expfn,e))
791
792    def export_store_info(self, cf, proj, ename, connInfo):
793        """
794        For the export requests in the connection info, install the peer names
795        at the experiment controller via SetValue calls.
796        """
797
798        for c in connInfo:
799            for p in [ p for p in c.get('parameter', []) \
800                    if p.get('type', '') == 'output']:
801
802                if p.get('name', '') == 'peer':
803                    k = p.get('key', None)
804                    surl = p.get('store', None)
805                    if surl and k and k.index('/') != -1:
806                        value = "%s.%s.%s%s" % \
807                                (k[k.index('/')+1:], ename, proj, self.domain)
808                        req = { 'name': k, 'value': value }
809                        self.log.debug("Setting %s to %s on %s" % \
810                                (k, value, surl))
811                        self.call_SetValue(surl, req, cf)
812                    else:
813                        self.log.error("Bad export request: %s" % p)
814                elif p.get('name', '') == 'ssh_port':
815                    k = p.get('key', None)
816                    surl = p.get('store', None)
817                    if surl and k:
818                        req = { 'name': k, 'value': self.ssh_port }
819                        self.log.debug("Setting %s to %s on %s" % \
820                                (k, self.ssh_port, surl))
821                        self.call_SetValue(surl, req, cf)
822                    else:
823                        self.log.error("Bad export request: %s" % p)
824                else:
825                    self.log.error("Unknown export parameter: %s" % \
826                            p.get('name'))
827                    continue
828
829    def add_seer_node(self, topo, name, startup):
830        """
831        Add a seer node to the given topology, with the startup command passed
832        in.  Used by configure seer_services.
833        """
834        c_node = topdl.Computer(
835                name=name, 
836                os= topdl.OperatingSystem(
837                    attribute=[
838                    { 'attribute': 'osid', 
839                        'value': self.local_seer_image },
840                    ]),
841                attribute=[
842                    { 'attribute': 'startup', 'value': startup },
843                    ]
844                )
845        self.add_kit(c_node, self.local_seer_software)
846        topo.elements.append(c_node)
847
848    def configure_seer_services(self, services, topo, softdir):
849        """
850        Make changes to the topology required for the seer requests being made.
851        Specifically, add any control or master nodes required and set up the
852        start commands on the nodes to interconnect them.
853        """
854        local_seer = False      # True if we need to add a control node
855        collect_seer = False    # True if there is a seer-master node
856        seer_master= False      # True if we need to add the seer-master
857        for s in services:
858            s_name = s.get('name', '')
859            s_vis = s.get('visibility','')
860
861            if s_name  == 'local_seer_control' and s_vis == 'export':
862                local_seer = True
863            elif s_name == 'seer_master':
864                if s_vis == 'import':
865                    collect_seer = True
866                elif s_vis == 'export':
867                    seer_master = True
868       
869        # We've got the whole picture now, so add nodes if needed and configure
870        # them to interconnect properly.
871        if local_seer or seer_master:
872            # Copy local seer control node software to the tempdir
873            for l, f in self.local_seer_software:
874                base = os.path.basename(f)
875                copy_file(f, "%s/%s" % (softdir, base))
876        # If we're collecting seers somewhere the controllers need to talk to
877        # the master.  In testbeds that export the service, that will be a
878        # local node that we'll add below.  Elsewhere it will be the control
879        # portal that will port forward to the exporting master.
880        if local_seer:
881            if collect_seer:
882                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
883            else:
884                startup = self.local_seer_start
885            self.add_seer_node(topo, 'control', startup)
886        # If this is the seer master, add that node, too.
887        if seer_master:
888            self.add_seer_node(topo, 'seer-master', 
889                    "%s -R -n -R seer-master -R -A -R sink" % \
890                            self.seer_master_start)
891
892    def retrieve_software(self, topo, certfile, softdir):
893        """
894        Collect the software that nodes in the topology need loaded and stage
895        it locally.  This implies retrieving it from the experiment_controller
896        and placing it into softdir.  Certfile is used to prove that this node
897        has access to that data (it's the allocation/segment fedid).  Finally
898        local portal and federation software is also copied to the same staging
899        directory for simplicity - all software needed for experiment creation
900        is in softdir.
901        """
902        sw = set()
903        for e in topo.elements:
904            for s in getattr(e, 'software', []):
905                sw.add(s.location)
906        for s in sw:
907            self.log.debug("Retrieving %s" % s)
908            try:
909                get_url(s, certfile, softdir)
910            except:
911                t, v, st = sys.exc_info()
912                raise service_error(service_error.internal,
913                        "Error retrieving %s: %s" % (s, v))
914
915        # Copy local federation and portal node software to the tempdir
916        for s in (self.federation_software, self.portal_software):
917            for l, f in s:
918                base = os.path.basename(f)
919                copy_file(f, "%s/%s" % (softdir, base))
920
921
922    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
923        """
924        Gather common configuration files, retrieve or create an experiment
925        name and project name, and return the ssh_key filenames.  Create an
926        allocation log bound to the state log variable as well.
927        """
928        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
929        ename = None
930        pubkey_base = None
931        secretkey_base = None
932        proj = None
933        user = None
934        alloc_log = None
935
936        for a in attrs:
937            if a['attribute'] in configs:
938                try:
939                    self.log.debug("Retrieving %s from %s" % \
940                            (a['attribute'], a['value']))
941                    get_url(a['value'], certfile, tmpdir)
942                except:
943                    t, v, st = sys.exc_info()
944                    raise service_error(service_error.internal,
945                            "Error retrieving %s: %s" % (a.get('value', ""), v))
946            if a['attribute'] == 'ssh_pubkey':
947                pubkey_base = a['value'].rpartition('/')[2]
948            if a['attribute'] == 'ssh_secretkey':
949                secretkey_base = a['value'].rpartition('/')[2]
950            if a['attribute'] == 'experiment_name':
951                ename = a['value']
952
953        if not ename:
954            ename = ""
955            for i in range(0,5):
956                ename += random.choice(string.ascii_letters)
957            self.log.warn("No experiment name: picked one randomly: %s" \
958                    % ename)
959
960        if not pubkey_base:
961            raise service_error(service_error.req, 
962                    "No public key attribute")
963
964        if not secretkey_base:
965            raise service_error(service_error.req, 
966                    "No secret key attribute")
967
968        self.state_lock.acquire()
969        if aid in self.allocation:
970            proj = self.allocation[aid].get('project', None)
971            if not proj: 
972                proj = self.allocation[aid].get('sproject', None)
973            user = self.allocation[aid].get('user', None)
974            self.allocation[aid]['experiment'] = ename
975            self.allocation[aid]['log'] = [ ]
976            # Create a logger that logs to the experiment's state object as
977            # well as to the main log file.
978            alloc_log = logging.getLogger('fedd.access.%s' % ename)
979            h = logging.StreamHandler(
980                    list_log.list_log(self.allocation[aid]['log']))
981            # XXX: there should be a global one of these rather than
982            # repeating the code.
983            h.setFormatter(logging.Formatter(
984                "%(asctime)s %(name)s %(message)s",
985                        '%d %b %y %H:%M:%S'))
986            alloc_log.addHandler(h)
987            self.write_state()
988        self.state_lock.release()
989
990        if not proj:
991            raise service_error(service_error.internal, 
992                    "Can't find project for %s" %aid)
993
994        if not user:
995            raise service_error(service_error.internal, 
996                    "Can't find creation user for %s" %aid)
997
998        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
999
1000    def finalize_experiment(self, starter, topo, aid, alloc_id):
1001        """
1002        Store key bits of experiment state in the global repository, including
1003        the response that may need to be replayed, and return the response.
1004        """
1005        # Copy the assigned names into the return topology
1006        embedding = [ ]
1007        for n in starter.node:
1008            embedding.append({ 
1009                'toponame': n,
1010                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1011                })
1012        # Grab the log (this is some anal locking, but better safe than
1013        # sorry)
1014        self.state_lock.acquire()
1015        logv = "".join(self.allocation[aid]['log'])
1016        # It's possible that the StartSegment call gets retried (!).
1017        # if the 'started' key is in the allocation, we'll return it rather
1018        # than redo the setup.
1019        self.allocation[aid]['started'] = { 
1020                'allocID': alloc_id,
1021                'allocationLog': logv,
1022                'segmentdescription': { 
1023                    'topdldescription': topo.clone().to_dict()
1024                    },
1025                'embedding': embedding
1026                }
1027        retval = copy.copy(self.allocation[aid]['started'])
1028        self.write_state()
1029        self.state_lock.release()
1030        return retval
1031   
1032    # End of StartSegment support routines
1033
1034    def StartSegment(self, req, fid):
1035        err = None  # Any service_error generated after tmpdir is created
1036        rv = None   # Return value from segment creation
1037
1038        try:
1039            req = req['StartSegmentRequestBody']
1040            auth_attr = req['allocID']['fedid']
1041            topref = req['segmentdescription']['topdldescription']
1042        except KeyError:
1043            raise service_error(server_error.req, "Badly formed request")
1044
1045        connInfo = req.get('connection', [])
1046        services = req.get('service', [])
1047        aid = "%s" % auth_attr
1048        attrs = req.get('fedAttr', [])
1049        if not self.auth.check_attribute(fid, auth_attr):
1050            raise service_error(service_error.access, "Access denied")
1051        else:
1052            # See if this is a replay of an earlier succeeded StartSegment -
1053            # sometimes SSL kills 'em.  If so, replay the response rather than
1054            # redoing the allocation.
1055            self.state_lock.acquire()
1056            retval = self.allocation[aid].get('started', None)
1057            self.state_lock.release()
1058            if retval:
1059                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1060                        "replaying response")
1061                return retval
1062
1063        # A new request.  Do it.
1064
1065        if topref: topo = topdl.Topology(**topref)
1066        else:
1067            raise service_error(service_error.req, 
1068                    "Request missing segmentdescription'")
1069       
1070        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1071        try:
1072            tmpdir = tempfile.mkdtemp(prefix="access-")
1073            softdir = "%s/software" % tmpdir
1074            os.mkdir(softdir)
1075        except EnvironmentError:
1076            raise service_error(service_error.internal, "Cannot create tmp dir")
1077
1078        # Try block alllows us to clean up temporary files.
1079        try:
1080            self.retrieve_software(topo, certfile, softdir)
1081            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1082                    self.initialize_experiment_info(attrs, aid, 
1083                            certfile, tmpdir)
1084
1085            # Set up userconf and seer if needed
1086            self.configure_userconf(services, tmpdir)
1087            self.configure_seer_services(services, topo, softdir)
1088            # Get and send synch store variables
1089            self.export_store_info(certfile, proj, ename, connInfo)
1090            self.import_store_info(certfile, connInfo)
1091
1092            expfile = "%s/experiment.tcl" % tmpdir
1093
1094            self.generate_portal_configs(topo, pubkey_base, 
1095                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1096            self.generate_ns2(topo, expfile, 
1097                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1098
1099            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1100                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1101                    cert=self.xmlrpc_cert)
1102            rv = starter(self, ename, proj, user, expfile, tmpdir)
1103        except service_error, e:
1104            err = e
1105        except:
1106            t, v, st = sys.exc_info()
1107            err = service_error(service_error.internal, "%s: %s" % \
1108                    (v, traceback.extract_tb(st)))
1109
1110        # Walk up tmpdir, deleting as we go
1111        if self.cleanup: self.remove_dirs(tmpdir)
1112        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1113
1114        if rv:
1115            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1116        elif err:
1117            raise service_error(service_error.federant,
1118                    "Swapin failed: %s" % err)
1119        else:
1120            raise service_error(service_error.federant, "Swapin failed")
1121
1122    def TerminateSegment(self, req, fid):
1123        try:
1124            req = req['TerminateSegmentRequestBody']
1125        except KeyError:
1126            raise service_error(server_error.req, "Badly formed request")
1127
1128        auth_attr = req['allocID']['fedid']
1129        aid = "%s" % auth_attr
1130        attrs = req.get('fedAttr', [])
1131        if not self.auth.check_attribute(fid, auth_attr):
1132            raise service_error(service_error.access, "Access denied")
1133
1134        self.state_lock.acquire()
1135        if aid in self.allocation:
1136            proj = self.allocation[aid].get('project', None)
1137            if not proj: 
1138                proj = self.allocation[aid].get('sproject', None)
1139            user = self.allocation[aid].get('user', None)
1140            ename = self.allocation[aid].get('experiment', None)
1141        else:
1142            proj = None
1143            user = None
1144            ename = None
1145        self.state_lock.release()
1146
1147        if not proj:
1148            raise service_error(service_error.internal, 
1149                    "Can't find project for %s" % aid)
1150
1151        if not user:
1152            raise service_error(service_error.internal, 
1153                    "Can't find creation user for %s" % aid)
1154        if not ename:
1155            raise service_error(service_error.internal, 
1156                    "Can't find experiment name for %s" % aid)
1157        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1158                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1159        stopper(self, user, proj, ename)
1160        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.