source: fedd/federation/emulab_access.py @ 06c1dba

compt_changesinfo-ops
Last change on this file since 06c1dba was 06c1dba, checked in by Ted Faber <faber@…>, 13 years ago

Fisrt step to multi-user - checkpoint

  • Property mode set to 100644
File size: 44.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base, legacy_access):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    max_name_len = 19
53
54    def __init__(self, config=None, auth=None):
55        """
56        Initializer.  Pulls parameters out of the ConfigParser's access section.
57        """
58
59        access_base.__init__(self, config, auth)
60
61        self.max_name_len = access.max_name_len
62
63        self.allow_proxy = config.getboolean("access", "allow_proxy")
64
65        self.domain = config.get("access", "domain")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78        self.boss = config.get("access", "boss")
79        self.ops = config.get("access", "ops")
80        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
81        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
82
83        self.dragon_endpoint = config.get("access", "dragon")
84        self.dragon_vlans = config.get("access", "dragon_vlans")
85        self.deter_internal = config.get("access", "deter_internal")
86
87        self.tunnel_config = config.getboolean("access", "tunnel_config")
88        self.portal_command = config.get("access", "portal_command")
89        self.portal_image = config.get("access", "portal_image")
90        self.portal_type = config.get("access", "portal_type") or "pc"
91        self.portal_startcommand = config.get("access", "portal_startcommand")
92        self.node_startcommand = config.get("access", "node_startcommand")
93
94        self.federation_software = self.software_list(self.federation_software)
95        self.portal_software = self.software_list(self.portal_software)
96        self.local_seer_software = self.software_list(self.local_seer_software)
97
98        self.access_type = self.access_type.lower()
99        self.start_segment = emulab_segment.start_segment
100        self.stop_segment = emulab_segment.stop_segment
101        self.info_segment = emulab_segment.info_segment
102        self.operation_segment = emulab_segment.operation_segment
103
104        self.restricted = [ ]
105        tb = config.get('access', 'testbed')
106        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
107        else: self.testbed = [ ]
108
109        # authorization information
110        self.auth_type = config.get('access', 'auth_type') \
111                or 'legacy'
112        self.auth_dir = config.get('access', 'auth_dir')
113        accessdb = config.get("access", "accessdb")
114        # initialize the authorization system
115        if self.auth_type == 'legacy':
116            self.access = { }
117            if accessdb:
118                self.legacy_read_access(accessdb, self.legacy_access_tuple)
119        elif self.auth_type == 'abac':
120            self.auth = abac_authorizer(load=self.auth_dir)
121            self.access = [ ]
122            if accessdb:
123                self.read_access(accessdb, self.access_tuple)
124        else:
125            raise service_error(service_error.internal, 
126                    "Unknown auth_type: %s" % self.auth_type)
127
128        # read_state in the base_class
129        self.state_lock.acquire()
130        for a  in ('allocation', 'projects', 'keys', 'types'):
131            if a not in self.state:
132                self.state[a] = { }
133        self.allocation = self.state['allocation']
134        self.projects = self.state['projects']
135        self.keys = self.state['keys']
136        self.types = self.state['types']
137        if self.auth_type == "legacy": 
138            # Add the ownership attributes to the authorizer.  Note that the
139            # indices of the allocation dict are strings, but the attributes are
140            # fedids, so there is a conversion.
141            for k in self.allocation.keys():
142                for o in self.allocation[k].get('owners', []):
143                    self.auth.set_attribute(o, fedid(hexstr=k))
144                if self.allocation[k].has_key('userconfig'):
145                    sfid = self.allocation[k]['userconfig']
146                    fid = fedid(hexstr=sfid)
147                    self.auth.set_attribute(fid, "/%s" % sfid)
148        self.state_lock.release()
149        self.exports = {
150                'SMB': self.export_SMB,
151                'seer': self.export_seer,
152                'tmcd': self.export_tmcd,
153                'userconfig': self.export_userconfig,
154                'project_export': self.export_project_export,
155                'local_seer_control': self.export_local_seer,
156                'seer_master': self.export_seer_master,
157                'hide_hosts': self.export_hide_hosts,
158                }
159
160        if not self.local_seer_image or not self.local_seer_software or \
161                not self.local_seer_start:
162            if 'local_seer_control' in self.exports:
163                del self.exports['local_seer_control']
164
165        if not self.local_seer_image or not self.local_seer_software or \
166                not self.seer_master_start:
167            if 'seer_master' in self.exports:
168                del self.exports['seer_master']
169
170
171        self.soap_services = {\
172            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
173            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
174            'StartSegment': soap_handler("StartSegment", self.StartSegment),
175            'TerminateSegment': soap_handler("TerminateSegment",
176                self.TerminateSegment),
177            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
178            'OperationSegment': soap_handler("OperationSegment",
179                self.OperationSegment),
180            }
181        self.xmlrpc_services =  {\
182            'RequestAccess': xmlrpc_handler('RequestAccess',
183                self.RequestAccess),
184            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
185                self.ReleaseAccess),
186            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': xmlrpc_handler('TerminateSegment',
188                self.TerminateSegment),
189            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
190            'OperationSegment': xmlrpc_handler("OperationSegment",
191                self.OperationSegment),
192            }
193
194        self.call_SetValue = service_caller('SetValue')
195        self.call_GetValue = service_caller('GetValue', log=self.log)
196
197        if not config.has_option("allocate", "uri"):
198            self.allocate_project = \
199                allocate_project_local(config, auth)
200        else:
201            self.allocate_project = \
202                allocate_project_remote(config, auth)
203
204
205        # If the project allocator exports services, put them in this object's
206        # maps so that classes that instantiate this can call the services.
207        self.soap_services.update(self.allocate_project.soap_services)
208        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
209
210    @staticmethod
211    def legacy_access_tuple(str):
212        """
213        Convert a string of the form (id[:resources:resouces], id, id) into a
214        tuple of the form (project, user, user) where users may be names or
215        fedids.  The resources strings are obsolete and ignored.
216        """
217        def parse_name(n):
218            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
219            else: return n
220
221        str = str.strip()
222        if str.startswith('(') and str.endswith(')'):
223            str = str[1:-1]
224            names = [ s.strip() for s in str.split(",")]
225            if len(names) > 3:
226                raise self.parse_error("More than three fields in name")
227            first = names[0].split(":")
228            if first == 'fedid:':
229                del first[0]
230                first[0] = fedid(hexstr=first[0])
231            names[0] = first[0]
232
233            for i in range(1,2):
234                names[i] = parse_name(names[i])
235
236            return tuple(names)
237        else:
238            raise self.parse_error('Bad mapping (unbalanced parens)')
239
240    @staticmethod
241    def access_tuple(str):
242        """
243        Convert a string of the form (id, id) into an access_project.  This is
244        called by read_access to convert to local attributes.  It returns
245        a tuple of the form (project, user, user) where the two users are
246        always the same.
247        """
248
249        str = str.strip()
250        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
251            # The slice takes the parens off the string.
252            proj, user = str[1:-1].split(',')
253            return (proj.strip(), user.strip(), user.strip())
254        else:
255            raise self.parse_error(
256                    'Bad mapping (unbalanced parens or more than 1 comma)')
257
258    # RequestAccess support routines
259
260    def legacy_lookup_access(self, req, fid):
261        """
262        Look up the local access control information mapped to this fedid and
263        credentials.  In this case it is a (project, create_user, access_user)
264        triple, and a triple of three booleans indicating which, if any will
265        need to be dynamically created.  Finally a list of owners for that
266        allocation is returned.
267
268        lookup_access_base pulls the first triple out, and it is parsed by this
269        routine into the boolean map.  Owners is always the controlling fedid.
270        """
271        # Return values
272        rp = None
273        ru = None
274        # This maps a valid user to the Emulab projects and users to use
275        found, match = self.legacy_lookup_access_base(req, fid)
276        tb, project, user = match
277       
278        if found == None:
279            raise service_error(service_error.access,
280                    "Access denied - cannot map access")
281
282        # resolve <dynamic> and <same> in found
283        dyn_proj = False
284        dyn_create_user = False
285        dyn_service_user = False
286
287        if found[0] == "<same>":
288            if project != None:
289                rp = project
290            else : 
291                raise service_error(\
292                        service_error.server_config,
293                        "Project matched <same> when no project given")
294        elif found[0] == "<dynamic>":
295            rp = None
296            dyn_proj = True
297        else:
298            rp = found[0]
299
300        if found[1] == "<same>":
301            if user_match == "<any>":
302                if user != None: rcu = user[0]
303                else: raise service_error(\
304                        service_error.server_config,
305                        "Matched <same> on anonymous request")
306            else:
307                rcu = user_match
308        elif found[1] == "<dynamic>":
309            rcu = None
310            dyn_create_user = True
311        else:
312            rcu = found[1]
313       
314        if found[2] == "<same>":
315            if user_match == "<any>":
316                if user != None: rsu = user[0]
317                else: raise service_error(\
318                        service_error.server_config,
319                        "Matched <same> on anonymous request")
320            else:
321                rsu = user_match
322        elif found[2] == "<dynamic>":
323            rsu = None
324            dyn_service_user = True
325        else:
326            rsu = found[2]
327
328        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
329                [ fid ]
330
331    def do_project_allocation(self, dyn, project, user):
332        """
333        Call the project allocation routines and return the info.
334        """
335        if dyn: 
336            # Compose the dynamic project request
337            # (only dynamic, dynamic currently allowed)
338            preq = { 'AllocateProjectRequestBody': \
339                        { 'project' : {\
340                            'user': [ \
341                            { \
342                                'access': [ { 
343                                    'sshPubkey': self.ssh_pubkey_file } ],
344                                 'role': "serviceAccess",\
345                            }, \
346                            { \
347                                'access': [ { 
348                                    'sshPubkey': self.ssh_pubkey_file } ],
349                                 'role': "experimentCreation",\
350                            }, \
351                            ], \
352                            }\
353                        }\
354                    }
355            return self.allocate_project.dynamic_project(preq)
356        else:
357            preq = {'StaticProjectRequestBody' : \
358                    { 'project': \
359                        { 'name' : { 'localname' : project },\
360                          'user' : [ \
361                            {\
362                                'userID': { 'localname' : user }, \
363                                'access': [ { 
364                                    'sshPubkey': self.ssh_pubkey_file } ],
365                                'role': 'experimentCreation'\
366                            },\
367                            {\
368                                'userID': { 'localname' : user}, \
369                                'access': [ { 
370                                    'sshPubkey': self.ssh_pubkey_file } ],
371                                'role': 'serviceAccess'\
372                            },\
373                        ]}\
374                    }\
375            }
376            return self.allocate_project.static_project(preq)
377
378    def save_project_state(self, aid, ap, dyn, owners):
379        """
380        Parse out and save the information relevant to the project created for
381        this experiment.  That info is largely in ap and owners.  dyn indicates
382        that the project was created dynamically.  Return the user and project
383        names.
384        """
385        self.state_lock.acquire()
386        self.allocation[aid] = { }
387        self.allocation[aid]['auth'] = set()
388        try:
389            pname = ap['project']['name']['localname']
390        except KeyError:
391            pname = None
392
393        if dyn:
394            if not pname:
395                self.state_lock.release()
396                raise service_error(service_error.internal,
397                        "Misformed allocation response?")
398            if pname in self.projects: self.projects[pname] += 1
399            else: self.projects[pname] = 1
400            self.allocation[aid]['project'] = pname
401        else:
402            # sproject is a static project associated with this allocation.
403            self.allocation[aid]['sproject'] = pname
404
405        self.allocation[aid]['keys'] = [ ]
406
407        try:
408            for u in ap['project']['user']:
409                uname = u['userID']['localname']
410                if u['role'] == 'experimentCreation':
411                    self.allocation[aid]['user'] = uname
412                for k in [ k['sshPubkey'] for k in u['access'] \
413                        if k.has_key('sshPubkey') ]:
414                    kv = "%s:%s" % (uname, k)
415                    if self.keys.has_key(kv): self.keys[kv] += 1
416                    else: self.keys[kv] = 1
417                    self.allocation[aid]['keys'].append((uname, k))
418        except KeyError:
419            self.state_lock.release()
420            raise service_error(service_error.internal,
421                    "Misformed allocation response?")
422
423        self.allocation[aid]['owners'] = owners
424        self.write_state()
425        self.state_lock.release()
426        return (pname, uname)
427
428    # End of RequestAccess support routines
429
430    def RequestAccess(self, req, fid):
431        """
432        Handle the access request.  Proxy if not for us.
433
434        Parse out the fields and make the allocations or rejections if for us,
435        otherwise, assuming we're willing to proxy, proxy the request out.
436        """
437
438        def gateway_hardware(h):
439            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
440            else: return h
441
442        def get_export_project(svcs):
443            """
444            if the service requests includes one to export a project, return
445            that project.
446            """
447            rv = None
448            for s in svcs:
449                if s.get('name', '') == 'project_export' and \
450                        s.get('visibility', '') == 'export':
451                    if not rv: 
452                        for a in s.get('fedAttr', []):
453                            if a.get('attribute', '') == 'project' \
454                                    and 'value' in a:
455                                rv = a['value']
456                    else:
457                        raise service_error(service_error, access, 
458                                'Requesting multiple project exports is ' + \
459                                        'not supported');
460            return rv
461
462        # The dance to get into the request body
463        if req.has_key('RequestAccessRequestBody'):
464            req = req['RequestAccessRequestBody']
465        else:
466            raise service_error(service_error.req, "No request!?")
467
468        # if this includes a project export request, construct a filter such
469        # that only the ABAC attributes mapped to that project are checked for
470        # access.
471        if 'service' in req:
472            ep = get_export_project(req['service'])
473            if ep: pf = lambda(a): a.value[0] == ep
474            else: pf = None
475        else:
476            ep = None
477            pf = None
478
479        if self.auth.import_credentials(
480                data_list=req.get('abac_credential', [])):
481            self.auth.save()
482
483        if self.auth_type == "legacy":
484            found, dyn, owners= self.legacy_lookup_access(req, fid)
485            proof = access_proof("me", fid, "create")
486        elif self.auth_type == 'abac':
487            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
488        else:
489            raise service_error(service_error.internal, 
490                    'Unknown auth_type: %s' % self.auth_type)
491        ap = None
492
493        # This only happens in legacy lookups, but if this user has access to
494        # the testbed but not the project to be exported, raise the error.
495        if ep and ep != found[0]:
496            raise service_error(service_error.access,
497                    "Cannot export %s" % ep)
498
499        if self.ssh_pubkey_file:
500            ap = self.do_project_allocation(dyn[1], found[0], found[1])
501        else:
502            raise service_error(service_error.internal, 
503                    "SSH access parameters required")
504        # keep track of what's been added
505        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
506        aid = unicode(allocID)
507
508        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
509
510        services, svc_state = self.export_services(req.get('service',[]),
511                pname, uname)
512        self.state_lock.acquire()
513        # Store services state in global state
514        for k, v in svc_state.items():
515            self.allocation[aid][k] = v
516        self.append_allocation_authorization(aid, 
517                set([(o, allocID) for o in owners]), state_attr='allocation')
518        self.write_state()
519        self.state_lock.release()
520        try:
521            f = open("%s/%s.pem" % (self.certdir, aid), "w")
522            print >>f, alloc_cert
523            f.close()
524        except EnvironmentError, e:
525            raise service_error(service_error.internal, 
526                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
527        resp = self.build_access_response({ 'fedid': allocID } ,
528                ap, services, proof)
529        return resp
530
531    def do_release_project(self, del_project, del_users, del_types):
532        """
533        If a project and users has to be deleted, make the call.
534        """
535        msg = { 'project': { }}
536        if del_project:
537            msg['project']['name']= {'localname': del_project}
538        users = [ ]
539        for u in del_users.keys():
540            users.append({ 'userID': { 'localname': u },\
541                'access' :  \
542                        [ {'sshPubkey' : s } for s in del_users[u]]\
543            })
544        if users: 
545            msg['project']['user'] = users
546        if len(del_types) > 0:
547            msg['resources'] = { 'node': \
548                    [ {'hardware': [ h ] } for h in del_types ]\
549                }
550        if self.allocate_project.release_project:
551            msg = { 'ReleaseProjectRequestBody' : msg}
552            self.allocate_project.release_project(msg)
553
554    def ReleaseAccess(self, req, fid):
555        # The dance to get into the request body
556        if req.has_key('ReleaseAccessRequestBody'):
557            req = req['ReleaseAccessRequestBody']
558        else:
559            raise service_error(service_error.req, "No request!?")
560
561        try:
562            if req['allocID'].has_key('localname'):
563                auth_attr = aid = req['allocID']['localname']
564            elif req['allocID'].has_key('fedid'):
565                aid = unicode(req['allocID']['fedid'])
566                auth_attr = req['allocID']['fedid']
567            else:
568                raise service_error(service_error.req,
569                        "Only localnames and fedids are understood")
570        except KeyError:
571            raise service_error(service_error.req, "Badly formed request")
572
573        self.log.debug("[access] deallocation requested for %s by %s" % \
574                (aid, fid))
575        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
576                with_proof=True)
577        if not access_ok:
578            self.log.debug("[access] deallocation denied for %s", aid)
579            raise service_error(service_error.access, "Access Denied")
580
581        # If we know this allocation, reduce the reference counts and
582        # remove the local allocations.  Otherwise report an error.  If
583        # there is an allocation to delete, del_users will be a dictonary
584        # of sets where the key is the user that owns the keys in the set.
585        # We use a set to avoid duplicates.  del_project is just the name
586        # of any dynamic project to delete.  We're somewhat lazy about
587        # deleting authorization attributes.  Having access to something
588        # that doesn't exist isn't harmful.
589        del_users = { }
590        del_project = None
591        del_types = set()
592
593        self.state_lock.acquire()
594        if aid in self.allocation:
595            self.log.debug("Found allocation for %s" %aid)
596            self.clear_allocation_authorization(aid, state_attr='allocation')
597            for k in self.allocation[aid]['keys']:
598                kk = "%s:%s" % k
599                self.keys[kk] -= 1
600                if self.keys[kk] == 0:
601                    if not del_users.has_key(k[0]):
602                        del_users[k[0]] = set()
603                    del_users[k[0]].add(k[1])
604                    del self.keys[kk]
605
606            if 'project' in self.allocation[aid]:
607                pname = self.allocation[aid]['project']
608                self.projects[pname] -= 1
609                if self.projects[pname] == 0:
610                    del_project = pname
611                    del self.projects[pname]
612
613            if 'types' in self.allocation[aid]:
614                for t in self.allocation[aid]['types']:
615                    self.types[t] -= 1
616                    if self.types[t] == 0:
617                        if not del_project: del_project = t[0]
618                        del_types.add(t[1])
619                        del self.types[t]
620
621            del self.allocation[aid]
622            self.write_state()
623            self.state_lock.release()
624            # If we actually have resources to deallocate, prepare the call.
625            if del_project or del_users:
626                self.do_release_project(del_project, del_users, del_types)
627            # And remove the access cert
628            cf = "%s/%s.pem" % (self.certdir, aid)
629            self.log.debug("Removing %s" % cf)
630            os.remove(cf)
631            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
632        else:
633            self.state_lock.release()
634            raise service_error(service_error.req, "No such allocation")
635
636    # These are subroutines for StartSegment
637    def generate_ns2(self, topo, expfn, softdir, connInfo):
638        """
639        Convert topo into an ns2 file, decorated with appropriate commands for
640        the particular testbed setup.  Convert all requests for software, etc
641        to point at the staged copies on this testbed and add the federation
642        startcommands.
643        """
644        class dragon_commands:
645            """
646            Functor to spit out approrpiate dragon commands for nodes listed in
647            the connectivity description.  The constructor makes a dict mapping
648            dragon nodes to their parameters and the __call__ checks each
649            element in turn for membership.
650            """
651            def __init__(self, map):
652                self.node_info = map
653
654            def __call__(self, e):
655                s = ""
656                if isinstance(e, topdl.Computer):
657                    if self.node_info.has_key(e.name):
658                        info = self.node_info[e.name]
659                        for ifname, vlan, type in info:
660                            for i in e.interface:
661                                if i.name == ifname:
662                                    addr = i.get_attribute('ip4_address')
663                                    subs = i.substrate[0]
664                                    break
665                            else:
666                                raise service_error(service_error.internal,
667                                        "No interface %s on element %s" % \
668                                                (ifname, e.name))
669                            # XXX: do netmask right
670                            if type =='link':
671                                s = ("tb-allow-external ${%s} " + \
672                                        "dragonportal ip %s vlan %s " + \
673                                        "netmask 255.255.255.0\n") % \
674                                        (topdl.to_tcl_name(e.name), addr, vlan)
675                            elif type =='lan':
676                                s = ("tb-allow-external ${%s} " + \
677                                        "dragonportal " + \
678                                        "ip %s vlan %s usurp %s\n") % \
679                                        (topdl.to_tcl_name(e.name), addr, 
680                                                vlan, subs)
681                            else:
682                                raise service_error(service_error_internal,
683                                        "Unknown DRAGON type %s" % type)
684                return s
685
686        class not_dragon:
687            """
688            Return true if a node is in the given map of dragon nodes.
689            """
690            def __init__(self, map):
691                self.nodes = set(map.keys())
692
693            def __call__(self, e):
694                return e.name not in self.nodes
695
696        # Main line of generate_ns2
697        t = topo.clone()
698
699        # Create the map of nodes that need direct connections (dragon
700        # connections) from the connInfo
701        dragon_map = { }
702        for i in [ i for i in connInfo if i['type'] == 'transit']:
703            for a in i.get('fedAttr', []):
704                if a['attribute'] == 'vlan_id':
705                    vlan = a['value']
706                    break
707            else:
708                raise service_error(service_error.internal, "No vlan tag")
709            members = i.get('member', [])
710            if len(members) > 1: type = 'lan'
711            else: type = 'link'
712
713            try:
714                for m in members:
715                    if m['element'] in dragon_map:
716                        dragon_map[m['element']].append(( m['interface'], 
717                            vlan, type))
718                    else:
719                        dragon_map[m['element']] = [( m['interface'], 
720                            vlan, type),]
721            except KeyError:
722                raise service_error(service_error.req,
723                        "Missing connectivity info")
724
725        # Weed out the things we aren't going to instantiate: Segments, portal
726        # substrates, and portal interfaces.  (The copy in the for loop allows
727        # us to delete from e.elements in side the for loop).  While we're
728        # touching all the elements, we also adjust paths from the original
729        # testbed to local testbed paths and put the federation commands into
730        # the start commands
731        for e in [e for e in t.elements]:
732            if isinstance(e, topdl.Segment):
733                t.elements.remove(e)
734            if isinstance(e, topdl.Computer):
735                self.add_kit(e, self.federation_software)
736                if e.get_attribute('portal') and self.portal_startcommand:
737                    # Add local portal support software
738                    self.add_kit(e, self.portal_software)
739                    # Portals never have a user-specified start command
740                    e.set_attribute('startup', self.portal_startcommand)
741                elif self.node_startcommand:
742                    if e.get_attribute('startup'):
743                        e.set_attribute('startup', "%s \\$USER '%s'" % \
744                                (self.node_startcommand, 
745                                    e.get_attribute('startup')))
746                    else:
747                        e.set_attribute('startup', self.node_startcommand)
748
749                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
750                # Remove portal interfaces that do not connect to DRAGON
751                e.interface = [i for i in e.interface \
752                        if not i.get_attribute('portal') or i.name in dinf ]
753            # Fix software paths
754            for s in getattr(e, 'software', []):
755                s.location = re.sub("^.*/", softdir, s.location)
756
757        t.substrates = [ s.clone() for s in t.substrates ]
758        t.incorporate_elements()
759
760        # Customize the ns2 output for local portal commands and images
761        filters = []
762
763        if self.dragon_endpoint:
764            add_filter = not_dragon(dragon_map)
765            filters.append(dragon_commands(dragon_map))
766        else:
767            add_filter = None
768
769        if self.portal_command:
770            filters.append(topdl.generate_portal_command_filter(
771                self.portal_command, add_filter=add_filter))
772
773        if self.portal_image:
774            filters.append(topdl.generate_portal_image_filter(
775                self.portal_image))
776
777        if self.portal_type:
778            filters.append(topdl.generate_portal_hardware_filter(
779                self.portal_type))
780
781        # Convert to ns and write it out
782        expfile = topdl.topology_to_ns2(t, filters)
783        try:
784            f = open(expfn, "w")
785            print >>f, expfile
786            f.close()
787        except EnvironmentError:
788            raise service_error(service_error.internal,
789                    "Cannot write experiment file %s: %s" % (expfn,e))
790
791    def export_store_info(self, cf, proj, ename, connInfo):
792        """
793        For the export requests in the connection info, install the peer names
794        at the experiment controller via SetValue calls.
795        """
796
797        for c in connInfo:
798            for p in [ p for p in c.get('parameter', []) \
799                    if p.get('type', '') == 'output']:
800
801                if p.get('name', '') == 'peer':
802                    k = p.get('key', None)
803                    surl = p.get('store', None)
804                    if surl and k and k.index('/') != -1:
805                        value = "%s.%s.%s%s" % \
806                                (k[k.index('/')+1:], ename, proj, self.domain)
807                        req = { 'name': k, 'value': value }
808                        self.log.debug("Setting %s to %s on %s" % \
809                                (k, value, surl))
810                        self.call_SetValue(surl, req, cf)
811                    else:
812                        self.log.error("Bad export request: %s" % p)
813                elif p.get('name', '') == 'ssh_port':
814                    k = p.get('key', None)
815                    surl = p.get('store', None)
816                    if surl and k:
817                        req = { 'name': k, 'value': self.ssh_port }
818                        self.log.debug("Setting %s to %s on %s" % \
819                                (k, self.ssh_port, surl))
820                        self.call_SetValue(surl, req, cf)
821                    else:
822                        self.log.error("Bad export request: %s" % p)
823                else:
824                    self.log.error("Unknown export parameter: %s" % \
825                            p.get('name'))
826                    continue
827
828    def add_seer_node(self, topo, name, startup):
829        """
830        Add a seer node to the given topology, with the startup command passed
831        in.  Used by configure seer_services.
832        """
833        c_node = topdl.Computer(
834                name=name, 
835                os= topdl.OperatingSystem(
836                    attribute=[
837                    { 'attribute': 'osid', 
838                        'value': self.local_seer_image },
839                    ]),
840                attribute=[
841                    { 'attribute': 'startup', 'value': startup },
842                    ]
843                )
844        self.add_kit(c_node, self.local_seer_software)
845        topo.elements.append(c_node)
846
847    def configure_seer_services(self, services, topo, softdir):
848        """
849        Make changes to the topology required for the seer requests being made.
850        Specifically, add any control or master nodes required and set up the
851        start commands on the nodes to interconnect them.
852        """
853        local_seer = False      # True if we need to add a control node
854        collect_seer = False    # True if there is a seer-master node
855        seer_master= False      # True if we need to add the seer-master
856        for s in services:
857            s_name = s.get('name', '')
858            s_vis = s.get('visibility','')
859
860            if s_name  == 'local_seer_control' and s_vis == 'export':
861                local_seer = True
862            elif s_name == 'seer_master':
863                if s_vis == 'import':
864                    collect_seer = True
865                elif s_vis == 'export':
866                    seer_master = True
867       
868        # We've got the whole picture now, so add nodes if needed and configure
869        # them to interconnect properly.
870        if local_seer or seer_master:
871            # Copy local seer control node software to the tempdir
872            for l, f in self.local_seer_software:
873                base = os.path.basename(f)
874                copy_file(f, "%s/%s" % (softdir, base))
875        # If we're collecting seers somewhere the controllers need to talk to
876        # the master.  In testbeds that export the service, that will be a
877        # local node that we'll add below.  Elsewhere it will be the control
878        # portal that will port forward to the exporting master.
879        if local_seer:
880            if collect_seer:
881                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
882            else:
883                startup = self.local_seer_start
884            self.add_seer_node(topo, 'control', startup)
885        # If this is the seer master, add that node, too.
886        if seer_master:
887            self.add_seer_node(topo, 'seer-master', 
888                    "%s -R -n -R seer-master -R -A -R sink" % \
889                            self.seer_master_start)
890
891    def retrieve_software(self, topo, certfile, softdir):
892        """
893        Collect the software that nodes in the topology need loaded and stage
894        it locally.  This implies retrieving it from the experiment_controller
895        and placing it into softdir.  Certfile is used to prove that this node
896        has access to that data (it's the allocation/segment fedid).  Finally
897        local portal and federation software is also copied to the same staging
898        directory for simplicity - all software needed for experiment creation
899        is in softdir.
900        """
901        sw = set()
902        for e in topo.elements:
903            for s in getattr(e, 'software', []):
904                sw.add(s.location)
905        for s in sw:
906            self.log.debug("Retrieving %s" % s)
907            try:
908                get_url(s, certfile, softdir)
909            except:
910                t, v, st = sys.exc_info()
911                raise service_error(service_error.internal,
912                        "Error retrieving %s: %s" % (s, v))
913
914        # Copy local federation and portal node software to the tempdir
915        for s in (self.federation_software, self.portal_software):
916            for l, f in s:
917                base = os.path.basename(f)
918                copy_file(f, "%s/%s" % (softdir, base))
919
920
921    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
922        """
923        Gather common configuration files, retrieve or create an experiment
924        name and project name, and return the ssh_key filenames.  Create an
925        allocation log bound to the state log variable as well.
926        """
927        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
928                'seer_ca_pem', 'seer_node_pem')
929        ename = None
930        pubkey_base = None
931        secretkey_base = None
932        proj = None
933        user = None
934        alloc_log = None
935        nonce_experiment = False
936        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
937
938        self.state_lock.acquire()
939        if aid in self.allocation:
940            proj = self.allocation[aid].get('project', None)
941            if not proj: 
942                proj = self.allocation[aid].get('sproject', None)
943        self.state_lock.release()
944
945        if not proj:
946            raise service_error(service_error.internal, 
947                    "Can't find project for %s" %aid)
948
949        for a in attrs:
950            if a['attribute'] in configs:
951                try:
952                    self.log.debug("Retrieving %s from %s" % \
953                            (a['attribute'], a['value']))
954                    get_url(a['value'], certfile, tmpdir)
955                except:
956                    t, v, st = sys.exc_info()
957                    raise service_error(service_error.internal,
958                            "Error retrieving %s: %s" % (a.get('value', ""), v))
959            if a['attribute'] == 'ssh_pubkey':
960                pubkey_base = a['value'].rpartition('/')[2]
961            if a['attribute'] == 'ssh_secretkey':
962                secretkey_base = a['value'].rpartition('/')[2]
963            if a['attribute'] == 'experiment_name':
964                ename = a['value']
965
966        # Names longer than the emulab max are discarded
967        if ename and len(ename) <= self.max_name_len:
968            # Clean up the experiment name so that emulab will accept it.
969            ename = re.sub(vchars_re, '-', ename)
970
971        else:
972            ename = ""
973            for i in range(0,5):
974                ename += random.choice(string.ascii_letters)
975            nonce_experiment = True
976            self.log.warn("No experiment name or suggestion too long: " + \
977                    "picked one randomly: %s" % ename)
978
979        if not pubkey_base:
980            raise service_error(service_error.req, 
981                    "No public key attribute")
982
983        if not secretkey_base:
984            raise service_error(service_error.req, 
985                    "No secret key attribute")
986
987        self.state_lock.acquire()
988        if aid in self.allocation:
989            user = self.allocation[aid].get('user', None)
990            self.allocation[aid]['experiment'] = ename
991            self.allocation[aid]['nonce'] = nonce_experiment
992            self.allocation[aid]['log'] = [ ]
993            # Create a logger that logs to the experiment's state object as
994            # well as to the main log file.
995            alloc_log = logging.getLogger('fedd.access.%s' % ename)
996            h = logging.StreamHandler(
997                    list_log.list_log(self.allocation[aid]['log']))
998            # XXX: there should be a global one of these rather than
999            # repeating the code.
1000            h.setFormatter(logging.Formatter(
1001                "%(asctime)s %(name)s %(message)s",
1002                        '%d %b %y %H:%M:%S'))
1003            alloc_log.addHandler(h)
1004            self.write_state()
1005        self.state_lock.release()
1006
1007        if not user:
1008            raise service_error(service_error.internal, 
1009                    "Can't find creation user for %s" %aid)
1010
1011        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1012
1013    def decorate_topology(self, info, t):
1014        """
1015        Copy the physical mapping and status onto the topology.  Used by
1016        StartSegment and InfoSegment
1017        """
1018        def add_new(ann, attr):
1019            for a in ann:
1020                if a not in attr: attr.append(a)
1021
1022        def merge_os(os, e):
1023            if len(e.os) == 0:
1024                # No OS at all:
1025                if os.get_attribute('emulab_access:image'):
1026                    os.set_attribute('emulab_access:initial_image', 
1027                            os.get_attribute('emulab_access:image'))
1028                e.os = [ os ]
1029            elif len(e.os) == 1:
1030                # There's one OS, copy the initial image and replace
1031                eos = e.os[0]
1032                initial = eos.get_attribute('emulab_access:initial_image')
1033                if initial:
1034                    os.set_attribute('emulab_access:initial_image', initial)
1035                e.os = [ os] 
1036            else:
1037                # Multiple OSes, replace or append
1038                for eos in e.os:
1039                    if os.name == eos.name:
1040                        eos.version = os.version
1041                        eos.version = os.distribution
1042                        eos.version = os.distributionversion
1043                        for a in os.attribute:
1044                            if eos.get_attribute(a.attribute):
1045                                eos.remove_attribute(a.attribute)
1046                            eos.set_attribute(a.attribute, a.value)
1047                        break
1048                else:
1049                    e.os.append(os)
1050
1051
1052        if t is None: return
1053        # Copy the assigned names into the return topology
1054        for e in t.elements:
1055            if isinstance(e, topdl.Computer):
1056                if not self.create_debug:
1057                    if e.name in info.node:
1058                        add_new(("%s%s" % 
1059                            (info.node[e.name].pname, self.domain),),
1060                            e.localname)
1061                        e.status = info.node[e.name].status
1062                        os = info.node[e.name].getOS()
1063                        if os: merge_os(os, e)
1064                else:
1065                    # Simple debugging assignment
1066                    add_new(("node%d%s" % (i, self.domain),), e.localname)
1067                    e.status = 'active'
1068                    add_new(('testop1', 'testop2'), e.operation)
1069                    i += 1
1070
1071
1072    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1073        """
1074        Store key bits of experiment state in the global repository, including
1075        the response that may need to be replayed, and return the response.
1076        """
1077        i = 0
1078        t = topo.clone()
1079        self.decorate_topology(starter, t)
1080        # Grab the log (this is some anal locking, but better safe than
1081        # sorry)
1082        self.state_lock.acquire()
1083        logv = "".join(self.allocation[aid]['log'])
1084        # It's possible that the StartSegment call gets retried (!).
1085        # if the 'started' key is in the allocation, we'll return it rather
1086        # than redo the setup.
1087        self.allocation[aid]['started'] = { 
1088                'allocID': alloc_id,
1089                'allocationLog': logv,
1090                'segmentdescription': { 
1091                    'topdldescription': t.to_dict()
1092                    },
1093                'proof': proof.to_dict(),
1094                }
1095        self.allocation[aid]['topo'] = t
1096        retval = copy.copy(self.allocation[aid]['started'])
1097        self.write_state()
1098        self.state_lock.release()
1099        return retval
1100   
1101    # End of StartSegment support routines
1102
1103    def StartSegment(self, req, fid):
1104        err = None  # Any service_error generated after tmpdir is created
1105        rv = None   # Return value from segment creation
1106
1107        try:
1108            req = req['StartSegmentRequestBody']
1109            auth_attr = req['allocID']['fedid']
1110            topref = req['segmentdescription']['topdldescription']
1111        except KeyError:
1112            raise service_error(server_error.req, "Badly formed request")
1113
1114        connInfo = req.get('connection', [])
1115        services = req.get('service', [])
1116        aid = "%s" % auth_attr
1117        attrs = req.get('fedAttr', [])
1118
1119        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1120                with_proof=True)
1121        if not access_ok:
1122            raise service_error(service_error.access, "Access denied")
1123        else:
1124            # See if this is a replay of an earlier succeeded StartSegment -
1125            # sometimes SSL kills 'em.  If so, replay the response rather than
1126            # redoing the allocation.
1127            self.state_lock.acquire()
1128            retval = self.allocation[aid].get('started', None)
1129            self.state_lock.release()
1130            if retval:
1131                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1132                        "replaying response")
1133                return retval
1134
1135        # A new request.  Do it.
1136
1137        if topref: topo = topdl.Topology(**topref)
1138        else:
1139            raise service_error(service_error.req, 
1140                    "Request missing segmentdescription'")
1141       
1142        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1143        try:
1144            tmpdir = tempfile.mkdtemp(prefix="access-")
1145            softdir = "%s/software" % tmpdir
1146            os.mkdir(softdir)
1147        except EnvironmentError:
1148            raise service_error(service_error.internal, "Cannot create tmp dir")
1149
1150        # Try block alllows us to clean up temporary files.
1151        try:
1152            self.retrieve_software(topo, certfile, softdir)
1153            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1154                    self.initialize_experiment_info(attrs, aid, 
1155                            certfile, tmpdir)
1156
1157            if '/' in proj: proj, gid = proj.split('/')
1158            else: gid = None
1159
1160
1161            # Set up userconf and seer if needed
1162            self.configure_userconf(services, tmpdir)
1163            self.configure_seer_services(services, topo, softdir)
1164            # Get and send synch store variables
1165            self.export_store_info(certfile, proj, ename, connInfo)
1166            self.import_store_info(certfile, connInfo)
1167
1168            expfile = "%s/experiment.tcl" % tmpdir
1169
1170            self.generate_portal_configs(topo, pubkey_base, 
1171                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1172            self.generate_ns2(topo, expfile, 
1173                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1174
1175            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1176                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1177                    cert=self.xmlrpc_cert)
1178            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1179        except service_error, e:
1180            err = e
1181        except:
1182            t, v, st = sys.exc_info()
1183            err = service_error(service_error.internal, "%s: %s" % \
1184                    (v, traceback.extract_tb(st)))
1185
1186        # Walk up tmpdir, deleting as we go
1187        if self.cleanup: self.remove_dirs(tmpdir)
1188        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1189
1190        if rv:
1191            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1192                    proof)
1193        elif err:
1194            raise service_error(service_error.federant,
1195                    "Swapin failed: %s" % err)
1196        else:
1197            raise service_error(service_error.federant, "Swapin failed")
1198
1199    def TerminateSegment(self, req, fid):
1200        try:
1201            req = req['TerminateSegmentRequestBody']
1202        except KeyError:
1203            raise service_error(server_error.req, "Badly formed request")
1204
1205        auth_attr = req['allocID']['fedid']
1206        aid = "%s" % auth_attr
1207        attrs = req.get('fedAttr', [])
1208
1209        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1210                with_proof=True)
1211        if not access_ok:
1212            raise service_error(service_error.access, "Access denied")
1213
1214        self.state_lock.acquire()
1215        if aid in self.allocation:
1216            proj = self.allocation[aid].get('project', None)
1217            if not proj: 
1218                proj = self.allocation[aid].get('sproject', None)
1219            user = self.allocation[aid].get('user', None)
1220            ename = self.allocation[aid].get('experiment', None)
1221            nonce = self.allocation[aid].get('nonce', False)
1222        else:
1223            proj = None
1224            user = None
1225            ename = None
1226            nonce = False
1227        self.state_lock.release()
1228
1229        if not proj:
1230            raise service_error(service_error.internal, 
1231                    "Can't find project for %s" % aid)
1232        else:
1233            if '/' in proj: proj, gid = proj.split('/')
1234            else: gid = None
1235
1236        if not user:
1237            raise service_error(service_error.internal, 
1238                    "Can't find creation user for %s" % aid)
1239        if not ename:
1240            raise service_error(service_error.internal, 
1241                    "Can't find experiment name for %s" % aid)
1242        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1243                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1244        stopper(self, user, proj, ename, gid, nonce)
1245        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1246
1247    def InfoSegment(self, req, fid):
1248        try:
1249            req = req['InfoSegmentRequestBody']
1250        except KeyError:
1251            raise service_error(server_error.req, "Badly formed request")
1252
1253        auth_attr = req['allocID']['fedid']
1254        aid = "%s" % auth_attr
1255
1256        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1257                with_proof=True)
1258        if not access_ok:
1259            raise service_error(service_error.access, "Access denied")
1260
1261        self.state_lock.acquire()
1262        if aid in self.allocation:
1263            topo = self.allocation[aid].get('topo', None)
1264            if topo: topo = topo.clone()
1265            proj = self.allocation[aid].get('project', None)
1266            if not proj: 
1267                proj = self.allocation[aid].get('sproject', None)
1268            user = self.allocation[aid].get('user', None)
1269            ename = self.allocation[aid].get('experiment', None)
1270        else:
1271            proj = None
1272            user = None
1273            ename = None
1274            topo = None
1275        self.state_lock.release()
1276
1277        if not proj:
1278            raise service_error(service_error.internal, 
1279                    "Can't find project for %s" % aid)
1280        else:
1281            if '/' in proj: proj, gid = proj.split('/')
1282            else: gid = None
1283
1284        if not user:
1285            raise service_error(service_error.internal, 
1286                    "Can't find creation user for %s" % aid)
1287        if not ename:
1288            raise service_error(service_error.internal, 
1289                    "Can't find experiment name for %s" % aid)
1290        info = self.info_segment(keyfile=self.ssh_privkey_file,
1291                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1292        info(self, user, proj, ename)
1293        self.decorate_topology(info, topo)
1294        self.state_lock.acquire()
1295        if aid in self.allocation:
1296            self.allocation[aid]['topo'] = topo
1297            self.write_state()
1298        self.state_lock.release()
1299
1300        rv = { 
1301                'allocID': req['allocID'], 
1302                'proof': proof.to_dict(),
1303                }
1304        if topo:
1305            rv['segmentdescription'] = { 'topdldescription' : topo.to_dict() }
1306        return rv
1307
1308    def OperationSegment(self, req, fid):
1309        def get_pname(e):
1310            """
1311            Get the physical name of a node
1312            """
1313            if e.localname:
1314                return re.sub('\..*','', e.localname[0])
1315            else:
1316                return None
1317
1318        try:
1319            req = req['OperationSegmentRequestBody']
1320        except KeyError:
1321            raise service_error(server_error.req, "Badly formed request")
1322
1323        auth_attr = req['allocID']['fedid']
1324        aid = "%s" % auth_attr
1325
1326        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1327                with_proof=True)
1328        if not access_ok:
1329            raise service_error(service_error.access, "Access denied")
1330
1331        op = req.get('operation', None)
1332        targets = req.get('target', None)
1333        params = req.get('parameter', None)
1334
1335        if op is None :
1336            raise service_error(service_error.req, "missing operation")
1337        elif targets is None:
1338            raise service_error(service_error.req, "no targets")
1339
1340        if aid in self.allocation:
1341            topo = self.allocation[aid].get('topo', None)
1342            if topo: topo = topo.clone()
1343        else:
1344            topo = None
1345
1346        targets = copy.copy(targets)
1347        ptargets = { }
1348        for e in topo.elements:
1349            if isinstance(e, topdl.Computer):
1350                if e.name in targets:
1351                    targets.remove(e.name)
1352                    pn = get_pname(e)
1353                    if pn: ptargets[e.name] = pn
1354
1355        status = [ operation_status(t, operation_status.no_target) \
1356                for t in targets]
1357
1358        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1359                debug=self.create_debug, boss=self.boss, 
1360                cert=self.xmlrpc_cert)
1361        ops(self, op, ptargets, params, topo)
1362       
1363        status.extend(ops.status)
1364
1365        return { 
1366                'allocID': req['allocID'], 
1367                'status': [s.to_dict() for s in status],
1368                'proof': proof.to_dict(),
1369                }
Note: See TracBrowser for help on using the repository browser.