source: fedd/federation/emulab_access.py @ f769cb3

compt_changesinfo-ops
Last change on this file since f769cb3 was f769cb3, checked in by Ted Faber <faber@…>, 12 years ago

Gather node information and return in info

  • Property mode set to 100644
File size: 39.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    max_name_len = 19
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        access_base.__init__(self, config, auth)
61
62        self.max_name_len = access.max_name_len
63
64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75        self.seer_master_start = config.get("access", "seer_master_start")
76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
78        self.ssh_port = config.get("access","ssh_port") or "22"
79        self.boss = config.get("access", "boss")
80        self.ops = config.get("access", "ops")
81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103        elif self.access_type == 'local_emulab':
104            self.start_segment = local_emulab_segment.start_segment
105            self.stop_segment = local_emulab_segment.stop_segment
106        else:
107            self.start_segment = None
108            self.stop_segment = None
109
110        self.restricted = [ ]
111        tb = config.get('access', 'testbed')
112        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
113        else: self.testbed = [ ]
114
115        # authorization information
116        self.auth_type = config.get('access', 'auth_type') \
117                or 'legacy'
118        self.auth_dir = config.get('access', 'auth_dir')
119        accessdb = config.get("access", "accessdb")
120        # initialize the authorization system
121        if self.auth_type == 'legacy':
122            self.access = { }
123            if accessdb:
124                self.legacy_read_access(accessdb, self.legacy_access_tuple)
125        elif self.auth_type == 'abac':
126            self.auth = abac_authorizer(load=self.auth_dir)
127            self.access = [ ]
128            if accessdb:
129                self.read_access(accessdb, self.access_tuple)
130        else:
131            raise service_error(service_error.internal, 
132                    "Unknown auth_type: %s" % self.auth_type)
133
134        # read_state in the base_class
135        self.state_lock.acquire()
136        for a  in ('allocation', 'projects', 'keys', 'types'):
137            if a not in self.state:
138                self.state[a] = { }
139        self.allocation = self.state['allocation']
140        self.projects = self.state['projects']
141        self.keys = self.state['keys']
142        self.types = self.state['types']
143        if self.auth_type == "legacy": 
144            # Add the ownership attributes to the authorizer.  Note that the
145            # indices of the allocation dict are strings, but the attributes are
146            # fedids, so there is a conversion.
147            for k in self.allocation.keys():
148                for o in self.allocation[k].get('owners', []):
149                    self.auth.set_attribute(o, fedid(hexstr=k))
150                if self.allocation[k].has_key('userconfig'):
151                    sfid = self.allocation[k]['userconfig']
152                    fid = fedid(hexstr=sfid)
153                    self.auth.set_attribute(fid, "/%s" % sfid)
154        self.state_lock.release()
155        self.exports = {
156                'SMB': self.export_SMB,
157                'seer': self.export_seer,
158                'tmcd': self.export_tmcd,
159                'userconfig': self.export_userconfig,
160                'project_export': self.export_project_export,
161                'local_seer_control': self.export_local_seer,
162                'seer_master': self.export_seer_master,
163                'hide_hosts': self.export_hide_hosts,
164                }
165
166        if not self.local_seer_image or not self.local_seer_software or \
167                not self.local_seer_start:
168            if 'local_seer_control' in self.exports:
169                del self.exports['local_seer_control']
170
171        if not self.local_seer_image or not self.local_seer_software or \
172                not self.seer_master_start:
173            if 'seer_master' in self.exports:
174                del self.exports['seer_master']
175
176
177        self.soap_services = {\
178            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
179            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
180            'StartSegment': soap_handler("StartSegment", self.StartSegment),
181            'TerminateSegment': soap_handler("TerminateSegment",
182                self.TerminateSegment),
183            }
184        self.xmlrpc_services =  {\
185            'RequestAccess': xmlrpc_handler('RequestAccess',
186                self.RequestAccess),
187            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
188                self.ReleaseAccess),
189            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
190            'TerminateSegment': xmlrpc_handler('TerminateSegment',
191                self.TerminateSegment),
192            }
193
194        self.call_SetValue = service_caller('SetValue')
195        self.call_GetValue = service_caller('GetValue', log=self.log)
196
197        if not config.has_option("allocate", "uri"):
198            self.allocate_project = \
199                allocate_project_local(config, auth)
200        else:
201            self.allocate_project = \
202                allocate_project_remote(config, auth)
203
204
205        # If the project allocator exports services, put them in this object's
206        # maps so that classes that instantiate this can call the services.
207        self.soap_services.update(self.allocate_project.soap_services)
208        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
209
210    @staticmethod
211    def legacy_access_tuple(str):
212        """
213        Convert a string of the form (id[:resources:resouces], id, id) into a
214        tuple of the form (project, user, user) where users may be names or
215        fedids.  The resources strings are obsolete and ignored.
216        """
217        def parse_name(n):
218            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
219            else: return n
220
221        str = str.strip()
222        if str.startswith('(') and str.endswith(')'):
223            str = str[1:-1]
224            names = [ s.strip() for s in str.split(",")]
225            if len(names) > 3:
226                raise self.parse_error("More than three fields in name")
227            first = names[0].split(":")
228            if first == 'fedid:':
229                del first[0]
230                first[0] = fedid(hexstr=first[0])
231            names[0] = first[0]
232
233            for i in range(1,2):
234                names[i] = parse_name(names[i])
235
236            return tuple(names)
237        else:
238            raise self.parse_error('Bad mapping (unbalanced parens)')
239
240    @staticmethod
241    def access_tuple(str):
242        """
243        Convert a string of the form (id, id) into an access_project.  This is
244        called by read_access to convert to local attributes.  It returns
245        a tuple of the form (project, user, user) where the two users are
246        always the same.
247        """
248
249        str = str.strip()
250        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
251            # The slice takes the parens off the string.
252            proj, user = str[1:-1].split(',')
253            return (proj.strip(), user.strip(), user.strip())
254        else:
255            raise self.parse_error(
256                    'Bad mapping (unbalanced parens or more than 1 comma)')
257
258    # RequestAccess support routines
259
260    def legacy_lookup_access(self, req, fid):
261        """
262        Look up the local access control information mapped to this fedid and
263        credentials.  In this case it is a (project, create_user, access_user)
264        triple, and a triple of three booleans indicating which, if any will
265        need to be dynamically created.  Finally a list of owners for that
266        allocation is returned.
267
268        lookup_access_base pulls the first triple out, and it is parsed by this
269        routine into the boolean map.  Owners is always the controlling fedid.
270        """
271        # Return values
272        rp = None
273        ru = None
274        # This maps a valid user to the Emulab projects and users to use
275        found, match = self.legacy_lookup_access_base(req, fid)
276        tb, project, user = match
277       
278        if found == None:
279            raise service_error(service_error.access,
280                    "Access denied - cannot map access")
281
282        # resolve <dynamic> and <same> in found
283        dyn_proj = False
284        dyn_create_user = False
285        dyn_service_user = False
286
287        if found[0] == "<same>":
288            if project != None:
289                rp = project
290            else : 
291                raise service_error(\
292                        service_error.server_config,
293                        "Project matched <same> when no project given")
294        elif found[0] == "<dynamic>":
295            rp = None
296            dyn_proj = True
297        else:
298            rp = found[0]
299
300        if found[1] == "<same>":
301            if user_match == "<any>":
302                if user != None: rcu = user[0]
303                else: raise service_error(\
304                        service_error.server_config,
305                        "Matched <same> on anonymous request")
306            else:
307                rcu = user_match
308        elif found[1] == "<dynamic>":
309            rcu = None
310            dyn_create_user = True
311        else:
312            rcu = found[1]
313       
314        if found[2] == "<same>":
315            if user_match == "<any>":
316                if user != None: rsu = user[0]
317                else: raise service_error(\
318                        service_error.server_config,
319                        "Matched <same> on anonymous request")
320            else:
321                rsu = user_match
322        elif found[2] == "<dynamic>":
323            rsu = None
324            dyn_service_user = True
325        else:
326            rsu = found[2]
327
328        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
329                [ fid ]
330
331    def do_project_allocation(self, dyn, project, user):
332        """
333        Call the project allocation routines and return the info.
334        """
335        if dyn: 
336            # Compose the dynamic project request
337            # (only dynamic, dynamic currently allowed)
338            preq = { 'AllocateProjectRequestBody': \
339                        { 'project' : {\
340                            'user': [ \
341                            { \
342                                'access': [ { 
343                                    'sshPubkey': self.ssh_pubkey_file } ],
344                                 'role': "serviceAccess",\
345                            }, \
346                            { \
347                                'access': [ { 
348                                    'sshPubkey': self.ssh_pubkey_file } ],
349                                 'role': "experimentCreation",\
350                            }, \
351                            ], \
352                            }\
353                        }\
354                    }
355            return self.allocate_project.dynamic_project(preq)
356        else:
357            preq = {'StaticProjectRequestBody' : \
358                    { 'project': \
359                        { 'name' : { 'localname' : project },\
360                          'user' : [ \
361                            {\
362                                'userID': { 'localname' : user }, \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                'role': 'experimentCreation'\
366                            },\
367                            {\
368                                'userID': { 'localname' : user}, \
369                                'access': [ { 
370                                    'sshPubkey': self.ssh_pubkey_file } ],
371                                'role': 'serviceAccess'\
372                            },\
373                        ]}\
374                    }\
375            }
376            return self.allocate_project.static_project(preq)
377
378    def save_project_state(self, aid, ap, dyn, owners):
379        """
380        Parse out and save the information relevant to the project created for
381        this experiment.  That info is largely in ap and owners.  dyn indicates
382        that the project was created dynamically.  Return the user and project
383        names.
384        """
385        self.state_lock.acquire()
386        self.allocation[aid] = { }
387        self.allocation[aid]['auth'] = set()
388        try:
389            pname = ap['project']['name']['localname']
390        except KeyError:
391            pname = None
392
393        if dyn:
394            if not pname:
395                self.state_lock.release()
396                raise service_error(service_error.internal,
397                        "Misformed allocation response?")
398            if pname in self.projects: self.projects[pname] += 1
399            else: self.projects[pname] = 1
400            self.allocation[aid]['project'] = pname
401        else:
402            # sproject is a static project associated with this allocation.
403            self.allocation[aid]['sproject'] = pname
404
405        self.allocation[aid]['keys'] = [ ]
406
407        try:
408            for u in ap['project']['user']:
409                uname = u['userID']['localname']
410                if u['role'] == 'experimentCreation':
411                    self.allocation[aid]['user'] = uname
412                for k in [ k['sshPubkey'] for k in u['access'] \
413                        if k.has_key('sshPubkey') ]:
414                    kv = "%s:%s" % (uname, k)
415                    if self.keys.has_key(kv): self.keys[kv] += 1
416                    else: self.keys[kv] = 1
417                    self.allocation[aid]['keys'].append((uname, k))
418        except KeyError:
419            self.state_lock.release()
420            raise service_error(service_error.internal,
421                    "Misformed allocation response?")
422
423        self.allocation[aid]['owners'] = owners
424        self.write_state()
425        self.state_lock.release()
426        return (pname, uname)
427
428    # End of RequestAccess support routines
429
430    def RequestAccess(self, req, fid):
431        """
432        Handle the access request.  Proxy if not for us.
433
434        Parse out the fields and make the allocations or rejections if for us,
435        otherwise, assuming we're willing to proxy, proxy the request out.
436        """
437
438        def gateway_hardware(h):
439            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
440            else: return h
441
442        def get_export_project(svcs):
443            """
444            if the service requests includes one to export a project, return
445            that project.
446            """
447            rv = None
448            for s in svcs:
449                if s.get('name', '') == 'project_export' and \
450                        s.get('visibility', '') == 'export':
451                    if not rv: 
452                        for a in s.get('fedAttr', []):
453                            if a.get('attribute', '') == 'project' \
454                                    and 'value' in a:
455                                rv = a['value']
456                    else:
457                        raise service_error(service_error, access, 
458                                'Requesting multiple project exports is ' + \
459                                        'not supported');
460            return rv
461
462        # The dance to get into the request body
463        if req.has_key('RequestAccessRequestBody'):
464            req = req['RequestAccessRequestBody']
465        else:
466            raise service_error(service_error.req, "No request!?")
467
468        # if this includes a project export request, construct a filter such
469        # that only the ABAC attributes mapped to that project are checked for
470        # access.
471        if 'service' in req:
472            ep = get_export_project(req['service'])
473            if ep: pf = lambda(a): a.value[0] == ep
474            else: pf = None
475        else:
476            ep = None
477            pf = None
478
479        if self.auth.import_credentials(
480                data_list=req.get('abac_credential', [])):
481            self.auth.save()
482
483        if self.auth_type == "legacy":
484            found, dyn, owners= self.legacy_lookup_access(req, fid)
485            proof = access_proof("me", fid, "create")
486        elif self.auth_type == 'abac':
487            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
488        else:
489            raise service_error(service_error.internal, 
490                    'Unknown auth_type: %s' % self.auth_type)
491        ap = None
492
493        # This only happens in legacy lookups, but if this user has access to
494        # the testbed but not the project to be exported, raise the error.
495        if ep and ep != found[0]:
496            raise service_error(service_error.access,
497                    "Cannot export %s" % ep)
498
499        if self.ssh_pubkey_file:
500            ap = self.do_project_allocation(dyn[1], found[0], found[1])
501        else:
502            raise service_error(service_error.internal, 
503                    "SSH access parameters required")
504        # keep track of what's been added
505        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
506        aid = unicode(allocID)
507
508        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
509
510        services, svc_state = self.export_services(req.get('service',[]),
511                pname, uname)
512        self.state_lock.acquire()
513        # Store services state in global state
514        for k, v in svc_state.items():
515            self.allocation[aid][k] = v
516        self.append_allocation_authorization(aid, 
517                set([(o, allocID) for o in owners]), state_attr='allocation')
518        self.write_state()
519        self.state_lock.release()
520        try:
521            f = open("%s/%s.pem" % (self.certdir, aid), "w")
522            print >>f, alloc_cert
523            f.close()
524        except EnvironmentError, e:
525            raise service_error(service_error.internal, 
526                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
527        resp = self.build_access_response({ 'fedid': allocID } ,
528                ap, services, proof)
529        return resp
530
531    def do_release_project(self, del_project, del_users, del_types):
532        """
533        If a project and users has to be deleted, make the call.
534        """
535        msg = { 'project': { }}
536        if del_project:
537            msg['project']['name']= {'localname': del_project}
538        users = [ ]
539        for u in del_users.keys():
540            users.append({ 'userID': { 'localname': u },\
541                'access' :  \
542                        [ {'sshPubkey' : s } for s in del_users[u]]\
543            })
544        if users: 
545            msg['project']['user'] = users
546        if len(del_types) > 0:
547            msg['resources'] = { 'node': \
548                    [ {'hardware': [ h ] } for h in del_types ]\
549                }
550        if self.allocate_project.release_project:
551            msg = { 'ReleaseProjectRequestBody' : msg}
552            self.allocate_project.release_project(msg)
553
554    def ReleaseAccess(self, req, fid):
555        # The dance to get into the request body
556        if req.has_key('ReleaseAccessRequestBody'):
557            req = req['ReleaseAccessRequestBody']
558        else:
559            raise service_error(service_error.req, "No request!?")
560
561        try:
562            if req['allocID'].has_key('localname'):
563                auth_attr = aid = req['allocID']['localname']
564            elif req['allocID'].has_key('fedid'):
565                aid = unicode(req['allocID']['fedid'])
566                auth_attr = req['allocID']['fedid']
567            else:
568                raise service_error(service_error.req,
569                        "Only localnames and fedids are understood")
570        except KeyError:
571            raise service_error(service_error.req, "Badly formed request")
572
573        self.log.debug("[access] deallocation requested for %s by %s" % \
574                (aid, fid))
575        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
576                with_proof=True)
577        if not access_ok:
578            self.log.debug("[access] deallocation denied for %s", aid)
579            raise service_error(service_error.access, "Access Denied")
580
581        # If we know this allocation, reduce the reference counts and
582        # remove the local allocations.  Otherwise report an error.  If
583        # there is an allocation to delete, del_users will be a dictonary
584        # of sets where the key is the user that owns the keys in the set.
585        # We use a set to avoid duplicates.  del_project is just the name
586        # of any dynamic project to delete.  We're somewhat lazy about
587        # deleting authorization attributes.  Having access to something
588        # that doesn't exist isn't harmful.
589        del_users = { }
590        del_project = None
591        del_types = set()
592
593        self.state_lock.acquire()
594        if aid in self.allocation:
595            self.log.debug("Found allocation for %s" %aid)
596            self.clear_allocation_authorization(aid, state_attr='allocation')
597            for k in self.allocation[aid]['keys']:
598                kk = "%s:%s" % k
599                self.keys[kk] -= 1
600                if self.keys[kk] == 0:
601                    if not del_users.has_key(k[0]):
602                        del_users[k[0]] = set()
603                    del_users[k[0]].add(k[1])
604                    del self.keys[kk]
605
606            if 'project' in self.allocation[aid]:
607                pname = self.allocation[aid]['project']
608                self.projects[pname] -= 1
609                if self.projects[pname] == 0:
610                    del_project = pname
611                    del self.projects[pname]
612
613            if 'types' in self.allocation[aid]:
614                for t in self.allocation[aid]['types']:
615                    self.types[t] -= 1
616                    if self.types[t] == 0:
617                        if not del_project: del_project = t[0]
618                        del_types.add(t[1])
619                        del self.types[t]
620
621            del self.allocation[aid]
622            self.write_state()
623            self.state_lock.release()
624            # If we actually have resources to deallocate, prepare the call.
625            if del_project or del_users:
626                self.do_release_project(del_project, del_users, del_types)
627            # And remove the access cert
628            cf = "%s/%s.pem" % (self.certdir, aid)
629            self.log.debug("Removing %s" % cf)
630            os.remove(cf)
631            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
632        else:
633            self.state_lock.release()
634            raise service_error(service_error.req, "No such allocation")
635
636    # These are subroutines for StartSegment
637    def generate_ns2(self, topo, expfn, softdir, connInfo):
638        """
639        Convert topo into an ns2 file, decorated with appropriate commands for
640        the particular testbed setup.  Convert all requests for software, etc
641        to point at the staged copies on this testbed and add the federation
642        startcommands.
643        """
644        class dragon_commands:
645            """
646            Functor to spit out approrpiate dragon commands for nodes listed in
647            the connectivity description.  The constructor makes a dict mapping
648            dragon nodes to their parameters and the __call__ checks each
649            element in turn for membership.
650            """
651            def __init__(self, map):
652                self.node_info = map
653
654            def __call__(self, e):
655                s = ""
656                if isinstance(e, topdl.Computer):
657                    if self.node_info.has_key(e.name):
658                        info = self.node_info[e.name]
659                        for ifname, vlan, type in info:
660                            for i in e.interface:
661                                if i.name == ifname:
662                                    addr = i.get_attribute('ip4_address')
663                                    subs = i.substrate[0]
664                                    break
665                            else:
666                                raise service_error(service_error.internal,
667                                        "No interface %s on element %s" % \
668                                                (ifname, e.name))
669                            # XXX: do netmask right
670                            if type =='link':
671                                s = ("tb-allow-external ${%s} " + \
672                                        "dragonportal ip %s vlan %s " + \
673                                        "netmask 255.255.255.0\n") % \
674                                        (topdl.to_tcl_name(e.name), addr, vlan)
675                            elif type =='lan':
676                                s = ("tb-allow-external ${%s} " + \
677                                        "dragonportal " + \
678                                        "ip %s vlan %s usurp %s\n") % \
679                                        (topdl.to_tcl_name(e.name), addr, 
680                                                vlan, subs)
681                            else:
682                                raise service_error(service_error_internal,
683                                        "Unknown DRAGON type %s" % type)
684                return s
685
686        class not_dragon:
687            """
688            Return true if a node is in the given map of dragon nodes.
689            """
690            def __init__(self, map):
691                self.nodes = set(map.keys())
692
693            def __call__(self, e):
694                return e.name not in self.nodes
695
696        # Main line of generate_ns2
697        t = topo.clone()
698
699        # Create the map of nodes that need direct connections (dragon
700        # connections) from the connInfo
701        dragon_map = { }
702        for i in [ i for i in connInfo if i['type'] == 'transit']:
703            for a in i.get('fedAttr', []):
704                if a['attribute'] == 'vlan_id':
705                    vlan = a['value']
706                    break
707            else:
708                raise service_error(service_error.internal, "No vlan tag")
709            members = i.get('member', [])
710            if len(members) > 1: type = 'lan'
711            else: type = 'link'
712
713            try:
714                for m in members:
715                    if m['element'] in dragon_map:
716                        dragon_map[m['element']].append(( m['interface'], 
717                            vlan, type))
718                    else:
719                        dragon_map[m['element']] = [( m['interface'], 
720                            vlan, type),]
721            except KeyError:
722                raise service_error(service_error.req,
723                        "Missing connectivity info")
724
725        # Weed out the things we aren't going to instantiate: Segments, portal
726        # substrates, and portal interfaces.  (The copy in the for loop allows
727        # us to delete from e.elements in side the for loop).  While we're
728        # touching all the elements, we also adjust paths from the original
729        # testbed to local testbed paths and put the federation commands into
730        # the start commands
731        for e in [e for e in t.elements]:
732            if isinstance(e, topdl.Segment):
733                t.elements.remove(e)
734            if isinstance(e, topdl.Computer):
735                self.add_kit(e, self.federation_software)
736                if e.get_attribute('portal') and self.portal_startcommand:
737                    # Add local portal support software
738                    self.add_kit(e, self.portal_software)
739                    # Portals never have a user-specified start command
740                    e.set_attribute('startup', self.portal_startcommand)
741                elif self.node_startcommand:
742                    if e.get_attribute('startup'):
743                        e.set_attribute('startup', "%s \\$USER '%s'" % \
744                                (self.node_startcommand, 
745                                    e.get_attribute('startup')))
746                    else:
747                        e.set_attribute('startup', self.node_startcommand)
748
749                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
750                # Remove portal interfaces that do not connect to DRAGON
751                e.interface = [i for i in e.interface \
752                        if not i.get_attribute('portal') or i.name in dinf ]
753            # Fix software paths
754            for s in getattr(e, 'software', []):
755                s.location = re.sub("^.*/", softdir, s.location)
756
757        t.substrates = [ s.clone() for s in t.substrates ]
758        t.incorporate_elements()
759
760        # Customize the ns2 output for local portal commands and images
761        filters = []
762
763        if self.dragon_endpoint:
764            add_filter = not_dragon(dragon_map)
765            filters.append(dragon_commands(dragon_map))
766        else:
767            add_filter = None
768
769        if self.portal_command:
770            filters.append(topdl.generate_portal_command_filter(
771                self.portal_command, add_filter=add_filter))
772
773        if self.portal_image:
774            filters.append(topdl.generate_portal_image_filter(
775                self.portal_image))
776
777        if self.portal_type:
778            filters.append(topdl.generate_portal_hardware_filter(
779                self.portal_type))
780
781        # Convert to ns and write it out
782        expfile = topdl.topology_to_ns2(t, filters)
783        try:
784            f = open(expfn, "w")
785            print >>f, expfile
786            f.close()
787        except EnvironmentError:
788            raise service_error(service_error.internal,
789                    "Cannot write experiment file %s: %s" % (expfn,e))
790
791    def export_store_info(self, cf, proj, ename, connInfo):
792        """
793        For the export requests in the connection info, install the peer names
794        at the experiment controller via SetValue calls.
795        """
796
797        for c in connInfo:
798            for p in [ p for p in c.get('parameter', []) \
799                    if p.get('type', '') == 'output']:
800
801                if p.get('name', '') == 'peer':
802                    k = p.get('key', None)
803                    surl = p.get('store', None)
804                    if surl and k and k.index('/') != -1:
805                        value = "%s.%s.%s%s" % \
806                                (k[k.index('/')+1:], ename, proj, self.domain)
807                        req = { 'name': k, 'value': value }
808                        self.log.debug("Setting %s to %s on %s" % \
809                                (k, value, surl))
810                        self.call_SetValue(surl, req, cf)
811                    else:
812                        self.log.error("Bad export request: %s" % p)
813                elif p.get('name', '') == 'ssh_port':
814                    k = p.get('key', None)
815                    surl = p.get('store', None)
816                    if surl and k:
817                        req = { 'name': k, 'value': self.ssh_port }
818                        self.log.debug("Setting %s to %s on %s" % \
819                                (k, self.ssh_port, surl))
820                        self.call_SetValue(surl, req, cf)
821                    else:
822                        self.log.error("Bad export request: %s" % p)
823                else:
824                    self.log.error("Unknown export parameter: %s" % \
825                            p.get('name'))
826                    continue
827
828    def add_seer_node(self, topo, name, startup):
829        """
830        Add a seer node to the given topology, with the startup command passed
831        in.  Used by configure seer_services.
832        """
833        c_node = topdl.Computer(
834                name=name, 
835                os= topdl.OperatingSystem(
836                    attribute=[
837                    { 'attribute': 'osid', 
838                        'value': self.local_seer_image },
839                    ]),
840                attribute=[
841                    { 'attribute': 'startup', 'value': startup },
842                    ]
843                )
844        self.add_kit(c_node, self.local_seer_software)
845        topo.elements.append(c_node)
846
847    def configure_seer_services(self, services, topo, softdir):
848        """
849        Make changes to the topology required for the seer requests being made.
850        Specifically, add any control or master nodes required and set up the
851        start commands on the nodes to interconnect them.
852        """
853        local_seer = False      # True if we need to add a control node
854        collect_seer = False    # True if there is a seer-master node
855        seer_master= False      # True if we need to add the seer-master
856        for s in services:
857            s_name = s.get('name', '')
858            s_vis = s.get('visibility','')
859
860            if s_name  == 'local_seer_control' and s_vis == 'export':
861                local_seer = True
862            elif s_name == 'seer_master':
863                if s_vis == 'import':
864                    collect_seer = True
865                elif s_vis == 'export':
866                    seer_master = True
867       
868        # We've got the whole picture now, so add nodes if needed and configure
869        # them to interconnect properly.
870        if local_seer or seer_master:
871            # Copy local seer control node software to the tempdir
872            for l, f in self.local_seer_software:
873                base = os.path.basename(f)
874                copy_file(f, "%s/%s" % (softdir, base))
875        # If we're collecting seers somewhere the controllers need to talk to
876        # the master.  In testbeds that export the service, that will be a
877        # local node that we'll add below.  Elsewhere it will be the control
878        # portal that will port forward to the exporting master.
879        if local_seer:
880            if collect_seer:
881                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
882            else:
883                startup = self.local_seer_start
884            self.add_seer_node(topo, 'control', startup)
885        # If this is the seer master, add that node, too.
886        if seer_master:
887            self.add_seer_node(topo, 'seer-master', 
888                    "%s -R -n -R seer-master -R -A -R sink" % \
889                            self.seer_master_start)
890
891    def retrieve_software(self, topo, certfile, softdir):
892        """
893        Collect the software that nodes in the topology need loaded and stage
894        it locally.  This implies retrieving it from the experiment_controller
895        and placing it into softdir.  Certfile is used to prove that this node
896        has access to that data (it's the allocation/segment fedid).  Finally
897        local portal and federation software is also copied to the same staging
898        directory for simplicity - all software needed for experiment creation
899        is in softdir.
900        """
901        sw = set()
902        for e in topo.elements:
903            for s in getattr(e, 'software', []):
904                sw.add(s.location)
905        for s in sw:
906            self.log.debug("Retrieving %s" % s)
907            try:
908                get_url(s, certfile, softdir)
909            except:
910                t, v, st = sys.exc_info()
911                raise service_error(service_error.internal,
912                        "Error retrieving %s: %s" % (s, v))
913
914        # Copy local federation and portal node software to the tempdir
915        for s in (self.federation_software, self.portal_software):
916            for l, f in s:
917                base = os.path.basename(f)
918                copy_file(f, "%s/%s" % (softdir, base))
919
920
921    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
922        """
923        Gather common configuration files, retrieve or create an experiment
924        name and project name, and return the ssh_key filenames.  Create an
925        allocation log bound to the state log variable as well.
926        """
927        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
928                'seer_ca_pem', 'seer_node_pem')
929        ename = None
930        pubkey_base = None
931        secretkey_base = None
932        proj = None
933        user = None
934        alloc_log = None
935        nonce_experiment = False
936        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
937
938        self.state_lock.acquire()
939        if aid in self.allocation:
940            proj = self.allocation[aid].get('project', None)
941            if not proj: 
942                proj = self.allocation[aid].get('sproject', None)
943        self.state_lock.release()
944
945        if not proj:
946            raise service_error(service_error.internal, 
947                    "Can't find project for %s" %aid)
948
949        for a in attrs:
950            if a['attribute'] in configs:
951                try:
952                    self.log.debug("Retrieving %s from %s" % \
953                            (a['attribute'], a['value']))
954                    get_url(a['value'], certfile, tmpdir)
955                except:
956                    t, v, st = sys.exc_info()
957                    raise service_error(service_error.internal,
958                            "Error retrieving %s: %s" % (a.get('value', ""), v))
959            if a['attribute'] == 'ssh_pubkey':
960                pubkey_base = a['value'].rpartition('/')[2]
961            if a['attribute'] == 'ssh_secretkey':
962                secretkey_base = a['value'].rpartition('/')[2]
963            if a['attribute'] == 'experiment_name':
964                ename = a['value']
965
966        # Names longer than the emulab max are discarded
967        if ename and len(ename) <= self.max_name_len:
968            # Clean up the experiment name so that emulab will accept it.
969            ename = re.sub(vchars_re, '-', ename)
970
971        else:
972            ename = ""
973            for i in range(0,5):
974                ename += random.choice(string.ascii_letters)
975            nonce_experiment = True
976            self.log.warn("No experiment name or suggestion too long: " + \
977                    "picked one randomly: %s" % ename)
978
979        if not pubkey_base:
980            raise service_error(service_error.req, 
981                    "No public key attribute")
982
983        if not secretkey_base:
984            raise service_error(service_error.req, 
985                    "No secret key attribute")
986
987        self.state_lock.acquire()
988        if aid in self.allocation:
989            user = self.allocation[aid].get('user', None)
990            self.allocation[aid]['experiment'] = ename
991            self.allocation[aid]['nonce'] = nonce_experiment
992            self.allocation[aid]['log'] = [ ]
993            # Create a logger that logs to the experiment's state object as
994            # well as to the main log file.
995            alloc_log = logging.getLogger('fedd.access.%s' % ename)
996            h = logging.StreamHandler(
997                    list_log.list_log(self.allocation[aid]['log']))
998            # XXX: there should be a global one of these rather than
999            # repeating the code.
1000            h.setFormatter(logging.Formatter(
1001                "%(asctime)s %(name)s %(message)s",
1002                        '%d %b %y %H:%M:%S'))
1003            alloc_log.addHandler(h)
1004            self.write_state()
1005        self.state_lock.release()
1006
1007        if not user:
1008            raise service_error(service_error.internal, 
1009                    "Can't find creation user for %s" %aid)
1010
1011        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1012
1013    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1014        """
1015        Store key bits of experiment state in the global repository, including
1016        the response that may need to be replayed, and return the response.
1017        """
1018        i = 0
1019        # Copy the assigned names into the return topology
1020        for e in topo.elements:
1021            if isinstance(e, topdl.Computer):
1022                if not self.create_debug:
1023                    if e.name in starter.node:
1024                        e.localname.append("%s%s" % \
1025                                (starter.node[e.name][0], self.domain))
1026                        e.status = starter.node[e.name][1]
1027                else:
1028                    # Simple debugging assignment
1029                    e.localname.append("node%d%s" % (i, self.domain))
1030                    e.status = 'active'
1031                    e.operation.extend(('testop1', 'testop2'))
1032                    i += 1
1033
1034        # Grab the log (this is some anal locking, but better safe than
1035        # sorry)
1036        self.state_lock.acquire()
1037        logv = "".join(self.allocation[aid]['log'])
1038        # It's possible that the StartSegment call gets retried (!).
1039        # if the 'started' key is in the allocation, we'll return it rather
1040        # than redo the setup.
1041        self.allocation[aid]['started'] = { 
1042                'allocID': alloc_id,
1043                'allocationLog': logv,
1044                'segmentdescription': { 
1045                    'topdldescription': topo.clone().to_dict()
1046                    },
1047                'proof': proof.to_dict(),
1048                }
1049        retval = copy.copy(self.allocation[aid]['started'])
1050        self.write_state()
1051        self.state_lock.release()
1052        return retval
1053   
1054    # End of StartSegment support routines
1055
1056    def StartSegment(self, req, fid):
1057        err = None  # Any service_error generated after tmpdir is created
1058        rv = None   # Return value from segment creation
1059
1060        try:
1061            req = req['StartSegmentRequestBody']
1062            auth_attr = req['allocID']['fedid']
1063            topref = req['segmentdescription']['topdldescription']
1064        except KeyError:
1065            raise service_error(server_error.req, "Badly formed request")
1066
1067        connInfo = req.get('connection', [])
1068        services = req.get('service', [])
1069        aid = "%s" % auth_attr
1070        attrs = req.get('fedAttr', [])
1071
1072        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1073                with_proof=True)
1074        if not access_ok:
1075            raise service_error(service_error.access, "Access denied")
1076        else:
1077            # See if this is a replay of an earlier succeeded StartSegment -
1078            # sometimes SSL kills 'em.  If so, replay the response rather than
1079            # redoing the allocation.
1080            self.state_lock.acquire()
1081            retval = self.allocation[aid].get('started', None)
1082            self.state_lock.release()
1083            if retval:
1084                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1085                        "replaying response")
1086                return retval
1087
1088        # A new request.  Do it.
1089
1090        if topref: topo = topdl.Topology(**topref)
1091        else:
1092            raise service_error(service_error.req, 
1093                    "Request missing segmentdescription'")
1094       
1095        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1096        try:
1097            tmpdir = tempfile.mkdtemp(prefix="access-")
1098            softdir = "%s/software" % tmpdir
1099            os.mkdir(softdir)
1100        except EnvironmentError:
1101            raise service_error(service_error.internal, "Cannot create tmp dir")
1102
1103        # Try block alllows us to clean up temporary files.
1104        try:
1105            self.retrieve_software(topo, certfile, softdir)
1106            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1107                    self.initialize_experiment_info(attrs, aid, 
1108                            certfile, tmpdir)
1109
1110            if '/' in proj: proj, gid = proj.split('/')
1111            else: gid = None
1112
1113
1114            # Set up userconf and seer if needed
1115            self.configure_userconf(services, tmpdir)
1116            self.configure_seer_services(services, topo, softdir)
1117            # Get and send synch store variables
1118            self.export_store_info(certfile, proj, ename, connInfo)
1119            self.import_store_info(certfile, connInfo)
1120
1121            expfile = "%s/experiment.tcl" % tmpdir
1122
1123            self.generate_portal_configs(topo, pubkey_base, 
1124                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1125            self.generate_ns2(topo, expfile, 
1126                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1127
1128            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1129                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1130                    cert=self.xmlrpc_cert)
1131            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1132        except service_error, e:
1133            err = e
1134        except:
1135            t, v, st = sys.exc_info()
1136            err = service_error(service_error.internal, "%s: %s" % \
1137                    (v, traceback.extract_tb(st)))
1138
1139        # Walk up tmpdir, deleting as we go
1140        if self.cleanup: self.remove_dirs(tmpdir)
1141        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1142
1143        if rv:
1144            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1145                    proof)
1146        elif err:
1147            raise service_error(service_error.federant,
1148                    "Swapin failed: %s" % err)
1149        else:
1150            raise service_error(service_error.federant, "Swapin failed")
1151
1152    def TerminateSegment(self, req, fid):
1153        try:
1154            req = req['TerminateSegmentRequestBody']
1155        except KeyError:
1156            raise service_error(server_error.req, "Badly formed request")
1157
1158        auth_attr = req['allocID']['fedid']
1159        aid = "%s" % auth_attr
1160        attrs = req.get('fedAttr', [])
1161
1162        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1163                with_proof=True)
1164        if not access_ok:
1165            raise service_error(service_error.access, "Access denied")
1166
1167        self.state_lock.acquire()
1168        if aid in self.allocation:
1169            proj = self.allocation[aid].get('project', None)
1170            if not proj: 
1171                proj = self.allocation[aid].get('sproject', None)
1172            user = self.allocation[aid].get('user', None)
1173            ename = self.allocation[aid].get('experiment', None)
1174            nonce = self.allocation[aid].get('nonce', False)
1175        else:
1176            proj = None
1177            user = None
1178            ename = None
1179            nonce = False
1180        self.state_lock.release()
1181
1182        if not proj:
1183            raise service_error(service_error.internal, 
1184                    "Can't find project for %s" % aid)
1185        else:
1186            if '/' in proj: proj, gid = proj.split('/')
1187            else: gid = None
1188
1189        if not user:
1190            raise service_error(service_error.internal, 
1191                    "Can't find creation user for %s" % aid)
1192        if not ename:
1193            raise service_error(service_error.internal, 
1194                    "Can't find experiment name for %s" % aid)
1195        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1196                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1197        stopper(self, user, proj, ename, gid, nonce)
1198        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.