source: fedd/federation/emulab_access.py @ 2933343

compt_changesinfo-ops
Last change on this file since 2933343 was 2933343, checked in by Ted Faber <faber@…>, 13 years ago

"This should go?" is gone.

  • Property mode set to 100644
File size: 38.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    max_name_len = 19
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        access_base.__init__(self, config, auth)
61
62        self.max_name_len = access.max_name_len
63
64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75        self.seer_master_start = config.get("access", "seer_master_start")
76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
78        self.ssh_port = config.get("access","ssh_port") or "22"
79        self.boss = config.get("access", "boss")
80        self.ops = config.get("access", "ops")
81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103        elif self.access_type == 'local_emulab':
104            self.start_segment = local_emulab_segment.start_segment
105            self.stop_segment = local_emulab_segment.stop_segment
106        else:
107            self.start_segment = None
108            self.stop_segment = None
109
110        self.restricted = [ ]
111        tb = config.get('access', 'testbed')
112        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
113        else: self.testbed = [ ]
114
115        # authorization information
116        self.auth_type = config.get('access', 'auth_type') \
117                or 'legacy'
118        self.auth_dir = config.get('access', 'auth_dir')
119        accessdb = config.get("access", "accessdb")
120        # initialize the authorization system
121        if self.auth_type == 'legacy':
122            self.access = { }
123            if accessdb:
124                self.legacy_read_access(accessdb, self.legacy_access_tuple)
125        elif self.auth_type == 'abac':
126            self.auth = abac_authorizer(load=self.auth_dir)
127            self.access = [ ]
128            if accessdb:
129                self.read_access(accessdb, self.access_tuple)
130        else:
131            raise service_error(service_error.internal, 
132                    "Unknown auth_type: %s" % self.auth_type)
133
134        # read_state in the base_class
135        self.state_lock.acquire()
136        for a  in ('allocation', 'projects', 'keys', 'types'):
137            if a not in self.state:
138                self.state[a] = { }
139        self.allocation = self.state['allocation']
140        self.projects = self.state['projects']
141        self.keys = self.state['keys']
142        self.types = self.state['types']
143        if self.auth_type == "legacy": 
144            # Add the ownership attributes to the authorizer.  Note that the
145            # indices of the allocation dict are strings, but the attributes are
146            # fedids, so there is a conversion.
147            for k in self.allocation.keys():
148                for o in self.allocation[k].get('owners', []):
149                    self.auth.set_attribute(o, fedid(hexstr=k))
150                if self.allocation[k].has_key('userconfig'):
151                    sfid = self.allocation[k]['userconfig']
152                    fid = fedid(hexstr=sfid)
153                    self.auth.set_attribute(fid, "/%s" % sfid)
154        self.state_lock.release()
155        self.exports = {
156                'SMB': self.export_SMB,
157                'seer': self.export_seer,
158                'tmcd': self.export_tmcd,
159                'userconfig': self.export_userconfig,
160                'project_export': self.export_project_export,
161                'local_seer_control': self.export_local_seer,
162                'seer_master': self.export_seer_master,
163                'hide_hosts': self.export_hide_hosts,
164                }
165
166        if not self.local_seer_image or not self.local_seer_software or \
167                not self.local_seer_start:
168            if 'local_seer_control' in self.exports:
169                del self.exports['local_seer_control']
170
171        if not self.local_seer_image or not self.local_seer_software or \
172                not self.seer_master_start:
173            if 'seer_master' in self.exports:
174                del self.exports['seer_master']
175
176
177        self.soap_services = {\
178            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
179            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
180            'StartSegment': soap_handler("StartSegment", self.StartSegment),
181            'TerminateSegment': soap_handler("TerminateSegment",
182                self.TerminateSegment),
183            }
184        self.xmlrpc_services =  {\
185            'RequestAccess': xmlrpc_handler('RequestAccess',
186                self.RequestAccess),
187            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
188                self.ReleaseAccess),
189            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
190            'TerminateSegment': xmlrpc_handler('TerminateSegment',
191                self.TerminateSegment),
192            }
193
194        self.call_SetValue = service_caller('SetValue')
195        self.call_GetValue = service_caller('GetValue', log=self.log)
196
197        if not config.has_option("allocate", "uri"):
198            self.allocate_project = \
199                allocate_project_local(config, auth)
200        else:
201            self.allocate_project = \
202                allocate_project_remote(config, auth)
203
204
205        # If the project allocator exports services, put them in this object's
206        # maps so that classes that instantiate this can call the services.
207        self.soap_services.update(self.allocate_project.soap_services)
208        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
209
210    @staticmethod
211    def legacy_access_tuple(str):
212        """
213        Convert a string of the form (id[:resources:resouces], id, id) into a
214        tuple of the form (project, user, user) where users may be names or
215        fedids.  The resources strings are obsolete and ignored.
216        """
217        def parse_name(n):
218            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
219            else: return n
220
221        str = str.strip()
222        if str.startswith('(') and str.endswith(')'):
223            str = str[1:-1]
224            names = [ s.strip() for s in str.split(",")]
225            if len(names) > 3:
226                raise self.parse_error("More than three fields in name")
227            first = names[0].split(":")
228            if first == 'fedid:':
229                del first[0]
230                first[0] = fedid(hexstr=first[0])
231            names[0] = first[0]
232
233            for i in range(1,2):
234                names[i] = parse_name(names[i])
235
236            return tuple(names)
237        else:
238            raise self.parse_error('Bad mapping (unbalanced parens)')
239
240    @staticmethod
241    def access_tuple(str):
242        """
243        Convert a string of the form (id, id) into an access_project.  This is
244        called by read_access to convert to local attributes.  It returns
245        a tuple of the form (project, user, user) where the two users are
246        always the same.
247        """
248
249        str = str.strip()
250        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
251            # The slice takes the parens off the string.
252            proj, user = str[1:-1].split(',')
253            return (proj.strip(), user.strip(), user.strip())
254        else:
255            raise self.parse_error(
256                    'Bad mapping (unbalanced parens or more than 1 comma)')
257
258    # RequestAccess support routines
259
260    def legacy_lookup_access(self, req, fid):
261        """
262        Look up the local access control information mapped to this fedid and
263        credentials.  In this case it is a (project, create_user, access_user)
264        triple, and a triple of three booleans indicating which, if any will
265        need to be dynamically created.  Finally a list of owners for that
266        allocation is returned.
267
268        lookup_access_base pulls the first triple out, and it is parsed by this
269        routine into the boolean map.  Owners is always the controlling fedid.
270        """
271        # Return values
272        rp = None
273        ru = None
274        # This maps a valid user to the Emulab projects and users to use
275        found, match = self.legacy_lookup_access_base(req, fid)
276        tb, project, user = match
277       
278        if found == None:
279            raise service_error(service_error.access,
280                    "Access denied - cannot map access")
281
282        # resolve <dynamic> and <same> in found
283        dyn_proj = False
284        dyn_create_user = False
285        dyn_service_user = False
286
287        if found[0] == "<same>":
288            if project != None:
289                rp = project
290            else : 
291                raise service_error(\
292                        service_error.server_config,
293                        "Project matched <same> when no project given")
294        elif found[0] == "<dynamic>":
295            rp = None
296            dyn_proj = True
297        else:
298            rp = found[0]
299
300        if found[1] == "<same>":
301            if user_match == "<any>":
302                if user != None: rcu = user[0]
303                else: raise service_error(\
304                        service_error.server_config,
305                        "Matched <same> on anonymous request")
306            else:
307                rcu = user_match
308        elif found[1] == "<dynamic>":
309            rcu = None
310            dyn_create_user = True
311        else:
312            rcu = found[1]
313       
314        if found[2] == "<same>":
315            if user_match == "<any>":
316                if user != None: rsu = user[0]
317                else: raise service_error(\
318                        service_error.server_config,
319                        "Matched <same> on anonymous request")
320            else:
321                rsu = user_match
322        elif found[2] == "<dynamic>":
323            rsu = None
324            dyn_service_user = True
325        else:
326            rsu = found[2]
327
328        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
329                [ fid ]
330
331    def do_project_allocation(self, dyn, project, user):
332        """
333        Call the project allocation routines and return the info.
334        """
335        if dyn: 
336            # Compose the dynamic project request
337            # (only dynamic, dynamic currently allowed)
338            preq = { 'AllocateProjectRequestBody': \
339                        { 'project' : {\
340                            'user': [ \
341                            { \
342                                'access': [ { 
343                                    'sshPubkey': self.ssh_pubkey_file } ],
344                                 'role': "serviceAccess",\
345                            }, \
346                            { \
347                                'access': [ { 
348                                    'sshPubkey': self.ssh_pubkey_file } ],
349                                 'role': "experimentCreation",\
350                            }, \
351                            ], \
352                            }\
353                        }\
354                    }
355            return self.allocate_project.dynamic_project(preq)
356        else:
357            preq = {'StaticProjectRequestBody' : \
358                    { 'project': \
359                        { 'name' : { 'localname' : project },\
360                          'user' : [ \
361                            {\
362                                'userID': { 'localname' : user }, \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                'role': 'experimentCreation'\
366                            },\
367                            {\
368                                'userID': { 'localname' : user}, \
369                                'access': [ { 
370                                    'sshPubkey': self.ssh_pubkey_file } ],
371                                'role': 'serviceAccess'\
372                            },\
373                        ]}\
374                    }\
375            }
376            return self.allocate_project.static_project(preq)
377
378    def save_project_state(self, aid, ap, dyn, owners):
379        """
380        Parse out and save the information relevant to the project created for
381        this experiment.  That info is largely in ap and owners.  dyn indicates
382        that the project was created dynamically.  Return the user and project
383        names.
384        """
385        self.state_lock.acquire()
386        self.allocation[aid] = { }
387        self.allocation[aid]['auth'] = set()
388        try:
389            pname = ap['project']['name']['localname']
390        except KeyError:
391            pname = None
392
393        if dyn:
394            if not pname:
395                self.state_lock.release()
396                raise service_error(service_error.internal,
397                        "Misformed allocation response?")
398            if pname in self.projects: self.projects[pname] += 1
399            else: self.projects[pname] = 1
400            self.allocation[aid]['project'] = pname
401        else:
402            # sproject is a static project associated with this allocation.
403            self.allocation[aid]['sproject'] = pname
404
405        self.allocation[aid]['keys'] = [ ]
406
407        try:
408            for u in ap['project']['user']:
409                uname = u['userID']['localname']
410                if u['role'] == 'experimentCreation':
411                    self.allocation[aid]['user'] = uname
412                for k in [ k['sshPubkey'] for k in u['access'] \
413                        if k.has_key('sshPubkey') ]:
414                    kv = "%s:%s" % (uname, k)
415                    if self.keys.has_key(kv): self.keys[kv] += 1
416                    else: self.keys[kv] = 1
417                    self.allocation[aid]['keys'].append((uname, k))
418        except KeyError:
419            self.state_lock.release()
420            raise service_error(service_error.internal,
421                    "Misformed allocation response?")
422
423        self.allocation[aid]['owners'] = owners
424        self.write_state()
425        self.state_lock.release()
426        return (pname, uname)
427
428    # End of RequestAccess support routines
429
430    def RequestAccess(self, req, fid):
431        """
432        Handle the access request.  Proxy if not for us.
433
434        Parse out the fields and make the allocations or rejections if for us,
435        otherwise, assuming we're willing to proxy, proxy the request out.
436        """
437
438        def gateway_hardware(h):
439            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
440            else: return h
441
442        def get_export_project(svcs):
443            """
444            if the service requests includes one to export a project, return
445            that project.
446            """
447            rv = None
448            for s in svcs:
449                if s.get('name', '') == 'project_export' and \
450                        s.get('visibility', '') == 'export':
451                    if not rv: 
452                        for a in s.get('fedAttr', []):
453                            if a.get('attribute', '') == 'project' \
454                                    and 'value' in a:
455                                rv = a['value']
456                    else:
457                        raise service_error(service_error, access, 
458                                'Requesting multiple project exports is ' + \
459                                        'not supported');
460            return rv
461
462        # The dance to get into the request body
463        if req.has_key('RequestAccessRequestBody'):
464            req = req['RequestAccessRequestBody']
465        else:
466            raise service_error(service_error.req, "No request!?")
467
468        # if this includes a project export request, construct a filter such
469        # that only the ABAC attributes mapped to that project are checked for
470        # access.
471        if 'service' in req:
472            ep = get_export_project(req['service'])
473            if ep: pf = lambda(a): a.value[0] == ep
474            else: pf = None
475        else:
476            ep = None
477            pf = None
478
479        if self.auth.import_credentials(
480                data_list=req.get('abac_credential', [])):
481            self.auth.save()
482
483        if self.auth_type == "legacy":
484            found, dyn, owners= self.legacy_lookup_access(req, fid)
485            proof = access_proof("me", fid, "create")
486        elif self.auth_type == 'abac':
487            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
488        else:
489            raise service_error(service_error.internal, 
490                    'Unknown auth_type: %s' % self.auth_type)
491        ap = None
492
493        # This only happens in legacy lookups, but if this user has access to
494        # the testbed but not the project to be exported, raise the error.
495        if ep and ep != found[0]:
496            raise service_error(service_error.access,
497                    "Cannot export %s" % ep)
498
499        if self.ssh_pubkey_file:
500            ap = self.do_project_allocation(dyn[1], found[0], found[1])
501        else:
502            raise service_error(service_error.internal, 
503                    "SSH access parameters required")
504        # keep track of what's been added
505        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
506        aid = unicode(allocID)
507
508        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
509
510        services, svc_state = self.export_services(req.get('service',[]),
511                pname, uname)
512        self.state_lock.acquire()
513        # Store services state in global state
514        for k, v in svc_state.items():
515            self.allocation[aid][k] = v
516        self.append_allocation_authorization(aid, 
517                set([(o, allocID) for o in owners]), state_attr='allocation')
518        self.write_state()
519        self.state_lock.release()
520        try:
521            f = open("%s/%s.pem" % (self.certdir, aid), "w")
522            print >>f, alloc_cert
523            f.close()
524        except EnvironmentError, e:
525            raise service_error(service_error.internal, 
526                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
527        resp = self.build_access_response({ 'fedid': allocID } ,
528                ap, services, proof)
529        return resp
530
531    def do_release_project(self, del_project, del_users, del_types):
532        """
533        If a project and users has to be deleted, make the call.
534        """
535        msg = { 'project': { }}
536        if del_project:
537            msg['project']['name']= {'localname': del_project}
538        users = [ ]
539        for u in del_users.keys():
540            users.append({ 'userID': { 'localname': u },\
541                'access' :  \
542                        [ {'sshPubkey' : s } for s in del_users[u]]\
543            })
544        if users: 
545            msg['project']['user'] = users
546        if len(del_types) > 0:
547            msg['resources'] = { 'node': \
548                    [ {'hardware': [ h ] } for h in del_types ]\
549                }
550        if self.allocate_project.release_project:
551            msg = { 'ReleaseProjectRequestBody' : msg}
552            self.allocate_project.release_project(msg)
553
554    def ReleaseAccess(self, req, fid):
555        # The dance to get into the request body
556        if req.has_key('ReleaseAccessRequestBody'):
557            req = req['ReleaseAccessRequestBody']
558        else:
559            raise service_error(service_error.req, "No request!?")
560
561        try:
562            if req['allocID'].has_key('localname'):
563                auth_attr = aid = req['allocID']['localname']
564            elif req['allocID'].has_key('fedid'):
565                aid = unicode(req['allocID']['fedid'])
566                auth_attr = req['allocID']['fedid']
567            else:
568                raise service_error(service_error.req,
569                        "Only localnames and fedids are understood")
570        except KeyError:
571            raise service_error(service_error.req, "Badly formed request")
572
573        self.log.debug("[access] deallocation requested for %s by %s" % \
574                (aid, fid))
575        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
576                with_proof=True)
577        if not access_ok:
578            self.log.debug("[access] deallocation denied for %s", aid)
579            raise service_error(service_error.access, "Access Denied")
580
581        # If we know this allocation, reduce the reference counts and
582        # remove the local allocations.  Otherwise report an error.  If
583        # there is an allocation to delete, del_users will be a dictonary
584        # of sets where the key is the user that owns the keys in the set.
585        # We use a set to avoid duplicates.  del_project is just the name
586        # of any dynamic project to delete.  We're somewhat lazy about
587        # deleting authorization attributes.  Having access to something
588        # that doesn't exist isn't harmful.
589        del_users = { }
590        del_project = None
591        del_types = set()
592
593        self.state_lock.acquire()
594        if aid in self.allocation:
595            self.log.debug("Found allocation for %s" %aid)
596            self.clear_allocation_authorization(aid, state_attr='allocation')
597            for k in self.allocation[aid]['keys']:
598                kk = "%s:%s" % k
599                self.keys[kk] -= 1
600                if self.keys[kk] == 0:
601                    if not del_users.has_key(k[0]):
602                        del_users[k[0]] = set()
603                    del_users[k[0]].add(k[1])
604                    del self.keys[kk]
605
606            if 'project' in self.allocation[aid]:
607                pname = self.allocation[aid]['project']
608                self.projects[pname] -= 1
609                if self.projects[pname] == 0:
610                    del_project = pname
611                    del self.projects[pname]
612
613            if 'types' in self.allocation[aid]:
614                for t in self.allocation[aid]['types']:
615                    self.types[t] -= 1
616                    if self.types[t] == 0:
617                        if not del_project: del_project = t[0]
618                        del_types.add(t[1])
619                        del self.types[t]
620
621            del self.allocation[aid]
622            self.write_state()
623            self.state_lock.release()
624            # If we actually have resources to deallocate, prepare the call.
625            if del_project or del_users:
626                self.do_release_project(del_project, del_users, del_types)
627            # And remove the access cert
628            cf = "%s/%s.pem" % (self.certdir, aid)
629            self.log.debug("Removing %s" % cf)
630            os.remove(cf)
631            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
632        else:
633            self.state_lock.release()
634            raise service_error(service_error.req, "No such allocation")
635
636    # These are subroutines for StartSegment
637    def generate_ns2(self, topo, expfn, softdir, connInfo):
638        """
639        Convert topo into an ns2 file, decorated with appropriate commands for
640        the particular testbed setup.  Convert all requests for software, etc
641        to point at the staged copies on this testbed and add the federation
642        startcommands.
643        """
644        class dragon_commands:
645            """
646            Functor to spit out approrpiate dragon commands for nodes listed in
647            the connectivity description.  The constructor makes a dict mapping
648            dragon nodes to their parameters and the __call__ checks each
649            element in turn for membership.
650            """
651            def __init__(self, map):
652                self.node_info = map
653
654            def __call__(self, e):
655                s = ""
656                if isinstance(e, topdl.Computer):
657                    if self.node_info.has_key(e.name):
658                        info = self.node_info[e.name]
659                        for ifname, vlan, type in info:
660                            for i in e.interface:
661                                if i.name == ifname:
662                                    addr = i.get_attribute('ip4_address')
663                                    subs = i.substrate[0]
664                                    break
665                            else:
666                                raise service_error(service_error.internal,
667                                        "No interface %s on element %s" % \
668                                                (ifname, e.name))
669                            # XXX: do netmask right
670                            if type =='link':
671                                s = ("tb-allow-external ${%s} " + \
672                                        "dragonportal ip %s vlan %s " + \
673                                        "netmask 255.255.255.0\n") % \
674                                        (topdl.to_tcl_name(e.name), addr, vlan)
675                            elif type =='lan':
676                                s = ("tb-allow-external ${%s} " + \
677                                        "dragonportal " + \
678                                        "ip %s vlan %s usurp %s\n") % \
679                                        (topdl.to_tcl_name(e.name), addr, 
680                                                vlan, subs)
681                            else:
682                                raise service_error(service_error_internal,
683                                        "Unknown DRAGON type %s" % type)
684                return s
685
686        class not_dragon:
687            """
688            Return true if a node is in the given map of dragon nodes.
689            """
690            def __init__(self, map):
691                self.nodes = set(map.keys())
692
693            def __call__(self, e):
694                return e.name not in self.nodes
695
696        # Main line of generate_ns2
697        t = topo.clone()
698
699        # Create the map of nodes that need direct connections (dragon
700        # connections) from the connInfo
701        dragon_map = { }
702        for i in [ i for i in connInfo if i['type'] == 'transit']:
703            for a in i.get('fedAttr', []):
704                if a['attribute'] == 'vlan_id':
705                    vlan = a['value']
706                    break
707            else:
708                raise service_error(service_error.internal, "No vlan tag")
709            members = i.get('member', [])
710            if len(members) > 1: type = 'lan'
711            else: type = 'link'
712
713            try:
714                for m in members:
715                    if m['element'] in dragon_map:
716                        dragon_map[m['element']].append(( m['interface'], 
717                            vlan, type))
718                    else:
719                        dragon_map[m['element']] = [( m['interface'], 
720                            vlan, type),]
721            except KeyError:
722                raise service_error(service_error.req,
723                        "Missing connectivity info")
724
725        # Weed out the things we aren't going to instantiate: Segments, portal
726        # substrates, and portal interfaces.  (The copy in the for loop allows
727        # us to delete from e.elements in side the for loop).  While we're
728        # touching all the elements, we also adjust paths from the original
729        # testbed to local testbed paths and put the federation commands into
730        # the start commands
731        for e in [e for e in t.elements]:
732            if isinstance(e, topdl.Segment):
733                t.elements.remove(e)
734            if isinstance(e, topdl.Computer):
735                self.add_kit(e, self.federation_software)
736                if e.get_attribute('portal') and self.portal_startcommand:
737                    # Add local portal support software
738                    self.add_kit(e, self.portal_software)
739                    # Portals never have a user-specified start command
740                    e.set_attribute('startup', self.portal_startcommand)
741                elif self.node_startcommand:
742                    if e.get_attribute('startup'):
743                        e.set_attribute('startup', "%s \\$USER '%s'" % \
744                                (self.node_startcommand, 
745                                    e.get_attribute('startup')))
746                    else:
747                        e.set_attribute('startup', self.node_startcommand)
748
749                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
750                # Remove portal interfaces that do not connect to DRAGON
751                e.interface = [i for i in e.interface \
752                        if not i.get_attribute('portal') or i.name in dinf ]
753            # Fix software paths
754            for s in getattr(e, 'software', []):
755                s.location = re.sub("^.*/", softdir, s.location)
756
757        t.substrates = [ s.clone() for s in t.substrates ]
758        t.incorporate_elements()
759
760        # Customize the ns2 output for local portal commands and images
761        filters = []
762
763        if self.dragon_endpoint:
764            add_filter = not_dragon(dragon_map)
765            filters.append(dragon_commands(dragon_map))
766        else:
767            add_filter = None
768
769        if self.portal_command:
770            filters.append(topdl.generate_portal_command_filter(
771                self.portal_command, add_filter=add_filter))
772
773        if self.portal_image:
774            filters.append(topdl.generate_portal_image_filter(
775                self.portal_image))
776
777        if self.portal_type:
778            filters.append(topdl.generate_portal_hardware_filter(
779                self.portal_type))
780
781        # Convert to ns and write it out
782        expfile = topdl.topology_to_ns2(t, filters)
783        try:
784            f = open(expfn, "w")
785            print >>f, expfile
786            f.close()
787        except EnvironmentError:
788            raise service_error(service_error.internal,
789                    "Cannot write experiment file %s: %s" % (expfn,e))
790
791    def export_store_info(self, cf, proj, ename, connInfo):
792        """
793        For the export requests in the connection info, install the peer names
794        at the experiment controller via SetValue calls.
795        """
796
797        for c in connInfo:
798            for p in [ p for p in c.get('parameter', []) \
799                    if p.get('type', '') == 'output']:
800
801                if p.get('name', '') == 'peer':
802                    k = p.get('key', None)
803                    surl = p.get('store', None)
804                    if surl and k and k.index('/') != -1:
805                        value = "%s.%s.%s%s" % \
806                                (k[k.index('/')+1:], ename, proj, self.domain)
807                        req = { 'name': k, 'value': value }
808                        self.log.debug("Setting %s to %s on %s" % \
809                                (k, value, surl))
810                        self.call_SetValue(surl, req, cf)
811                    else:
812                        self.log.error("Bad export request: %s" % p)
813                elif p.get('name', '') == 'ssh_port':
814                    k = p.get('key', None)
815                    surl = p.get('store', None)
816                    if surl and k:
817                        req = { 'name': k, 'value': self.ssh_port }
818                        self.log.debug("Setting %s to %s on %s" % \
819                                (k, self.ssh_port, surl))
820                        self.call_SetValue(surl, req, cf)
821                    else:
822                        self.log.error("Bad export request: %s" % p)
823                else:
824                    self.log.error("Unknown export parameter: %s" % \
825                            p.get('name'))
826                    continue
827
828    def add_seer_node(self, topo, name, startup):
829        """
830        Add a seer node to the given topology, with the startup command passed
831        in.  Used by configure seer_services.
832        """
833        c_node = topdl.Computer(
834                name=name, 
835                os= topdl.OperatingSystem(
836                    attribute=[
837                    { 'attribute': 'osid', 
838                        'value': self.local_seer_image },
839                    ]),
840                attribute=[
841                    { 'attribute': 'startup', 'value': startup },
842                    ]
843                )
844        self.add_kit(c_node, self.local_seer_software)
845        topo.elements.append(c_node)
846
847    def configure_seer_services(self, services, topo, softdir):
848        """
849        Make changes to the topology required for the seer requests being made.
850        Specifically, add any control or master nodes required and set up the
851        start commands on the nodes to interconnect them.
852        """
853        local_seer = False      # True if we need to add a control node
854        collect_seer = False    # True if there is a seer-master node
855        seer_master= False      # True if we need to add the seer-master
856        for s in services:
857            s_name = s.get('name', '')
858            s_vis = s.get('visibility','')
859
860            if s_name  == 'local_seer_control' and s_vis == 'export':
861                local_seer = True
862            elif s_name == 'seer_master':
863                if s_vis == 'import':
864                    collect_seer = True
865                elif s_vis == 'export':
866                    seer_master = True
867       
868        # We've got the whole picture now, so add nodes if needed and configure
869        # them to interconnect properly.
870        if local_seer or seer_master:
871            # Copy local seer control node software to the tempdir
872            for l, f in self.local_seer_software:
873                base = os.path.basename(f)
874                copy_file(f, "%s/%s" % (softdir, base))
875        # If we're collecting seers somewhere the controllers need to talk to
876        # the master.  In testbeds that export the service, that will be a
877        # local node that we'll add below.  Elsewhere it will be the control
878        # portal that will port forward to the exporting master.
879        if local_seer:
880            if collect_seer:
881                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
882            else:
883                startup = self.local_seer_start
884            self.add_seer_node(topo, 'control', startup)
885        # If this is the seer master, add that node, too.
886        if seer_master:
887            self.add_seer_node(topo, 'seer-master', 
888                    "%s -R -n -R seer-master -R -A -R sink" % \
889                            self.seer_master_start)
890
891    def retrieve_software(self, topo, certfile, softdir):
892        """
893        Collect the software that nodes in the topology need loaded and stage
894        it locally.  This implies retrieving it from the experiment_controller
895        and placing it into softdir.  Certfile is used to prove that this node
896        has access to that data (it's the allocation/segment fedid).  Finally
897        local portal and federation software is also copied to the same staging
898        directory for simplicity - all software needed for experiment creation
899        is in softdir.
900        """
901        sw = set()
902        for e in topo.elements:
903            for s in getattr(e, 'software', []):
904                sw.add(s.location)
905        for s in sw:
906            self.log.debug("Retrieving %s" % s)
907            try:
908                get_url(s, certfile, softdir)
909            except:
910                t, v, st = sys.exc_info()
911                raise service_error(service_error.internal,
912                        "Error retrieving %s: %s" % (s, v))
913
914        # Copy local federation and portal node software to the tempdir
915        for s in (self.federation_software, self.portal_software):
916            for l, f in s:
917                base = os.path.basename(f)
918                copy_file(f, "%s/%s" % (softdir, base))
919
920
921    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
922        """
923        Gather common configuration files, retrieve or create an experiment
924        name and project name, and return the ssh_key filenames.  Create an
925        allocation log bound to the state log variable as well.
926        """
927        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
928        ename = None
929        pubkey_base = None
930        secretkey_base = None
931        proj = None
932        user = None
933        alloc_log = None
934        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
935
936        for a in attrs:
937            if a['attribute'] in configs:
938                try:
939                    self.log.debug("Retrieving %s from %s" % \
940                            (a['attribute'], a['value']))
941                    get_url(a['value'], certfile, tmpdir)
942                except:
943                    t, v, st = sys.exc_info()
944                    raise service_error(service_error.internal,
945                            "Error retrieving %s: %s" % (a.get('value', ""), v))
946            if a['attribute'] == 'ssh_pubkey':
947                pubkey_base = a['value'].rpartition('/')[2]
948            if a['attribute'] == 'ssh_secretkey':
949                secretkey_base = a['value'].rpartition('/')[2]
950            if a['attribute'] == 'experiment_name':
951                ename = a['value']
952
953        # Names longer than the emulab max are discarder
954        if ename and len(ename) <= self.max_name_len:
955            # Clean up the experiment name so that emulab will accept it.
956            ename = re.sub(vchars_re, '-', ename)
957
958        else:
959            ename = ""
960            for i in range(0,5):
961                ename += random.choice(string.ascii_letters)
962            self.log.warn("No experiment name or suggestion too long: " + \
963                    "picked one randomly: %s" % ename)
964
965        if not pubkey_base:
966            raise service_error(service_error.req, 
967                    "No public key attribute")
968
969        if not secretkey_base:
970            raise service_error(service_error.req, 
971                    "No secret key attribute")
972
973        self.state_lock.acquire()
974        if aid in self.allocation:
975            proj = self.allocation[aid].get('project', None)
976            if not proj: 
977                proj = self.allocation[aid].get('sproject', None)
978            user = self.allocation[aid].get('user', None)
979            self.allocation[aid]['experiment'] = ename
980            self.allocation[aid]['log'] = [ ]
981            # Create a logger that logs to the experiment's state object as
982            # well as to the main log file.
983            alloc_log = logging.getLogger('fedd.access.%s' % ename)
984            h = logging.StreamHandler(
985                    list_log.list_log(self.allocation[aid]['log']))
986            # XXX: there should be a global one of these rather than
987            # repeating the code.
988            h.setFormatter(logging.Formatter(
989                "%(asctime)s %(name)s %(message)s",
990                        '%d %b %y %H:%M:%S'))
991            alloc_log.addHandler(h)
992            self.write_state()
993        self.state_lock.release()
994
995        if not proj:
996            raise service_error(service_error.internal, 
997                    "Can't find project for %s" %aid)
998
999        if not user:
1000            raise service_error(service_error.internal, 
1001                    "Can't find creation user for %s" %aid)
1002
1003        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1004
1005    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1006        """
1007        Store key bits of experiment state in the global repository, including
1008        the response that may need to be replayed, and return the response.
1009        """
1010        # Copy the assigned names into the return topology
1011        embedding = [ ]
1012        for n in starter.node:
1013            embedding.append({ 
1014                'toponame': n,
1015                'physname': ["%s%s" %  (starter.node[n], self.domain)],
1016                })
1017        # Grab the log (this is some anal locking, but better safe than
1018        # sorry)
1019        self.state_lock.acquire()
1020        logv = "".join(self.allocation[aid]['log'])
1021        # It's possible that the StartSegment call gets retried (!).
1022        # if the 'started' key is in the allocation, we'll return it rather
1023        # than redo the setup.
1024        self.allocation[aid]['started'] = { 
1025                'allocID': alloc_id,
1026                'allocationLog': logv,
1027                'segmentdescription': { 
1028                    'topdldescription': topo.clone().to_dict()
1029                    },
1030                'embedding': embedding,
1031                'proof': proof.to_dict(),
1032                }
1033        retval = copy.copy(self.allocation[aid]['started'])
1034        self.write_state()
1035        self.state_lock.release()
1036        return retval
1037   
1038    # End of StartSegment support routines
1039
1040    def StartSegment(self, req, fid):
1041        err = None  # Any service_error generated after tmpdir is created
1042        rv = None   # Return value from segment creation
1043
1044        try:
1045            req = req['StartSegmentRequestBody']
1046            auth_attr = req['allocID']['fedid']
1047            topref = req['segmentdescription']['topdldescription']
1048        except KeyError:
1049            raise service_error(server_error.req, "Badly formed request")
1050
1051        connInfo = req.get('connection', [])
1052        services = req.get('service', [])
1053        aid = "%s" % auth_attr
1054        attrs = req.get('fedAttr', [])
1055
1056        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1057                with_proof=True)
1058        if not access_ok:
1059            raise service_error(service_error.access, "Access denied")
1060        else:
1061            # See if this is a replay of an earlier succeeded StartSegment -
1062            # sometimes SSL kills 'em.  If so, replay the response rather than
1063            # redoing the allocation.
1064            self.state_lock.acquire()
1065            retval = self.allocation[aid].get('started', None)
1066            self.state_lock.release()
1067            if retval:
1068                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1069                        "replaying response")
1070                return retval
1071
1072        # A new request.  Do it.
1073
1074        if topref: topo = topdl.Topology(**topref)
1075        else:
1076            raise service_error(service_error.req, 
1077                    "Request missing segmentdescription'")
1078       
1079        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1080        try:
1081            tmpdir = tempfile.mkdtemp(prefix="access-")
1082            softdir = "%s/software" % tmpdir
1083            os.mkdir(softdir)
1084        except EnvironmentError:
1085            raise service_error(service_error.internal, "Cannot create tmp dir")
1086
1087        # Try block alllows us to clean up temporary files.
1088        try:
1089            self.retrieve_software(topo, certfile, softdir)
1090            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1091                    self.initialize_experiment_info(attrs, aid, 
1092                            certfile, tmpdir)
1093
1094            # Set up userconf and seer if needed
1095            self.configure_userconf(services, tmpdir)
1096            self.configure_seer_services(services, topo, softdir)
1097            # Get and send synch store variables
1098            self.export_store_info(certfile, proj, ename, connInfo)
1099            self.import_store_info(certfile, connInfo)
1100
1101            expfile = "%s/experiment.tcl" % tmpdir
1102
1103            self.generate_portal_configs(topo, pubkey_base, 
1104                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1105            self.generate_ns2(topo, expfile, 
1106                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1107
1108            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1109                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1110                    cert=self.xmlrpc_cert)
1111            rv = starter(self, ename, proj, user, expfile, tmpdir)
1112        except service_error, e:
1113            err = e
1114        except:
1115            t, v, st = sys.exc_info()
1116            err = service_error(service_error.internal, "%s: %s" % \
1117                    (v, traceback.extract_tb(st)))
1118
1119        # Walk up tmpdir, deleting as we go
1120        if self.cleanup: self.remove_dirs(tmpdir)
1121        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1122
1123        if rv:
1124            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1125                    proof)
1126        elif err:
1127            raise service_error(service_error.federant,
1128                    "Swapin failed: %s" % err)
1129        else:
1130            raise service_error(service_error.federant, "Swapin failed")
1131
1132    def TerminateSegment(self, req, fid):
1133        try:
1134            req = req['TerminateSegmentRequestBody']
1135        except KeyError:
1136            raise service_error(server_error.req, "Badly formed request")
1137
1138        auth_attr = req['allocID']['fedid']
1139        aid = "%s" % auth_attr
1140        attrs = req.get('fedAttr', [])
1141
1142        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1143                with_proof=True)
1144        if not access_ok:
1145            raise service_error(service_error.access, "Access denied")
1146
1147        self.state_lock.acquire()
1148        if aid in self.allocation:
1149            proj = self.allocation[aid].get('project', None)
1150            if not proj: 
1151                proj = self.allocation[aid].get('sproject', None)
1152            user = self.allocation[aid].get('user', None)
1153            ename = self.allocation[aid].get('experiment', None)
1154        else:
1155            proj = None
1156            user = None
1157            ename = None
1158        self.state_lock.release()
1159
1160        if not proj:
1161            raise service_error(service_error.internal, 
1162                    "Can't find project for %s" % aid)
1163
1164        if not user:
1165            raise service_error(service_error.internal, 
1166                    "Can't find creation user for %s" % aid)
1167        if not ename:
1168            raise service_error(service_error.internal, 
1169                    "Can't find experiment name for %s" % aid)
1170        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1171                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1172        stopper(self, user, proj, ename)
1173        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.