source: fedd/federation/emulab_access.py @ db3da0b

compt_changesinfo-ops
Last change on this file since db3da0b was 1ae1aa2, checked in by Ted Faber <faber@…>, 13 years ago

Reload interface works

  • Property mode set to 100644
File size: 43.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    max_name_len = 19
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        access_base.__init__(self, config, auth)
61
62        self.max_name_len = access.max_name_len
63
64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75        self.seer_master_start = config.get("access", "seer_master_start")
76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
78        self.ssh_port = config.get("access","ssh_port") or "22"
79        self.boss = config.get("access", "boss")
80        self.ops = config.get("access", "ops")
81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103            self.info_segment = proxy_emulab_segment.info_segment
104            self.operation_segment = proxy_emulab_segment.operation_segment
105        elif self.access_type == 'local_emulab':
106            self.start_segment = local_emulab_segment.start_segment
107            self.stop_segment = local_emulab_segment.stop_segment
108            self.info_segment = local_emulab_segment.info_segment
109            self.operation_segment = local_emulab_segment.operation_segment
110        else:
111            self.start_segment = None
112            self.stop_segment = None
113            self.info_segment = None
114            self.operation_segment = None
115
116        self.restricted = [ ]
117        tb = config.get('access', 'testbed')
118        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
119        else: self.testbed = [ ]
120
121        # authorization information
122        self.auth_type = config.get('access', 'auth_type') \
123                or 'legacy'
124        self.auth_dir = config.get('access', 'auth_dir')
125        accessdb = config.get("access", "accessdb")
126        # initialize the authorization system
127        if self.auth_type == 'legacy':
128            self.access = { }
129            if accessdb:
130                self.legacy_read_access(accessdb, self.legacy_access_tuple)
131        elif self.auth_type == 'abac':
132            self.auth = abac_authorizer(load=self.auth_dir)
133            self.access = [ ]
134            if accessdb:
135                self.read_access(accessdb, self.access_tuple)
136        else:
137            raise service_error(service_error.internal, 
138                    "Unknown auth_type: %s" % self.auth_type)
139
140        # read_state in the base_class
141        self.state_lock.acquire()
142        for a  in ('allocation', 'projects', 'keys', 'types'):
143            if a not in self.state:
144                self.state[a] = { }
145        self.allocation = self.state['allocation']
146        self.projects = self.state['projects']
147        self.keys = self.state['keys']
148        self.types = self.state['types']
149        if self.auth_type == "legacy": 
150            # Add the ownership attributes to the authorizer.  Note that the
151            # indices of the allocation dict are strings, but the attributes are
152            # fedids, so there is a conversion.
153            for k in self.allocation.keys():
154                for o in self.allocation[k].get('owners', []):
155                    self.auth.set_attribute(o, fedid(hexstr=k))
156                if self.allocation[k].has_key('userconfig'):
157                    sfid = self.allocation[k]['userconfig']
158                    fid = fedid(hexstr=sfid)
159                    self.auth.set_attribute(fid, "/%s" % sfid)
160        self.state_lock.release()
161        self.exports = {
162                'SMB': self.export_SMB,
163                'seer': self.export_seer,
164                'tmcd': self.export_tmcd,
165                'userconfig': self.export_userconfig,
166                'project_export': self.export_project_export,
167                'local_seer_control': self.export_local_seer,
168                'seer_master': self.export_seer_master,
169                'hide_hosts': self.export_hide_hosts,
170                }
171
172        if not self.local_seer_image or not self.local_seer_software or \
173                not self.local_seer_start:
174            if 'local_seer_control' in self.exports:
175                del self.exports['local_seer_control']
176
177        if not self.local_seer_image or not self.local_seer_software or \
178                not self.seer_master_start:
179            if 'seer_master' in self.exports:
180                del self.exports['seer_master']
181
182
183        self.soap_services = {\
184            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
185            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
186            'StartSegment': soap_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': soap_handler("TerminateSegment",
188                self.TerminateSegment),
189            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
190            'OperationSegment': soap_handler("OperationSegment",
191                self.OperationSegment),
192            }
193        self.xmlrpc_services =  {\
194            'RequestAccess': xmlrpc_handler('RequestAccess',
195                self.RequestAccess),
196            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
197                self.ReleaseAccess),
198            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
199            'TerminateSegment': xmlrpc_handler('TerminateSegment',
200                self.TerminateSegment),
201            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
202            'OperationSegment': xmlrpc_handler("OperationSegment",
203                self.OperationSegment),
204            }
205
206        self.call_SetValue = service_caller('SetValue')
207        self.call_GetValue = service_caller('GetValue', log=self.log)
208
209        if not config.has_option("allocate", "uri"):
210            self.allocate_project = \
211                allocate_project_local(config, auth)
212        else:
213            self.allocate_project = \
214                allocate_project_remote(config, auth)
215
216
217        # If the project allocator exports services, put them in this object's
218        # maps so that classes that instantiate this can call the services.
219        self.soap_services.update(self.allocate_project.soap_services)
220        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
221
222    @staticmethod
223    def legacy_access_tuple(str):
224        """
225        Convert a string of the form (id[:resources:resouces], id, id) into a
226        tuple of the form (project, user, user) where users may be names or
227        fedids.  The resources strings are obsolete and ignored.
228        """
229        def parse_name(n):
230            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
231            else: return n
232
233        str = str.strip()
234        if str.startswith('(') and str.endswith(')'):
235            str = str[1:-1]
236            names = [ s.strip() for s in str.split(",")]
237            if len(names) > 3:
238                raise self.parse_error("More than three fields in name")
239            first = names[0].split(":")
240            if first == 'fedid:':
241                del first[0]
242                first[0] = fedid(hexstr=first[0])
243            names[0] = first[0]
244
245            for i in range(1,2):
246                names[i] = parse_name(names[i])
247
248            return tuple(names)
249        else:
250            raise self.parse_error('Bad mapping (unbalanced parens)')
251
252    @staticmethod
253    def access_tuple(str):
254        """
255        Convert a string of the form (id, id) into an access_project.  This is
256        called by read_access to convert to local attributes.  It returns
257        a tuple of the form (project, user, user) where the two users are
258        always the same.
259        """
260
261        str = str.strip()
262        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
263            # The slice takes the parens off the string.
264            proj, user = str[1:-1].split(',')
265            return (proj.strip(), user.strip(), user.strip())
266        else:
267            raise self.parse_error(
268                    'Bad mapping (unbalanced parens or more than 1 comma)')
269
270    # RequestAccess support routines
271
272    def legacy_lookup_access(self, req, fid):
273        """
274        Look up the local access control information mapped to this fedid and
275        credentials.  In this case it is a (project, create_user, access_user)
276        triple, and a triple of three booleans indicating which, if any will
277        need to be dynamically created.  Finally a list of owners for that
278        allocation is returned.
279
280        lookup_access_base pulls the first triple out, and it is parsed by this
281        routine into the boolean map.  Owners is always the controlling fedid.
282        """
283        # Return values
284        rp = None
285        ru = None
286        # This maps a valid user to the Emulab projects and users to use
287        found, match = self.legacy_lookup_access_base(req, fid)
288        tb, project, user = match
289       
290        if found == None:
291            raise service_error(service_error.access,
292                    "Access denied - cannot map access")
293
294        # resolve <dynamic> and <same> in found
295        dyn_proj = False
296        dyn_create_user = False
297        dyn_service_user = False
298
299        if found[0] == "<same>":
300            if project != None:
301                rp = project
302            else : 
303                raise service_error(\
304                        service_error.server_config,
305                        "Project matched <same> when no project given")
306        elif found[0] == "<dynamic>":
307            rp = None
308            dyn_proj = True
309        else:
310            rp = found[0]
311
312        if found[1] == "<same>":
313            if user_match == "<any>":
314                if user != None: rcu = user[0]
315                else: raise service_error(\
316                        service_error.server_config,
317                        "Matched <same> on anonymous request")
318            else:
319                rcu = user_match
320        elif found[1] == "<dynamic>":
321            rcu = None
322            dyn_create_user = True
323        else:
324            rcu = found[1]
325       
326        if found[2] == "<same>":
327            if user_match == "<any>":
328                if user != None: rsu = user[0]
329                else: raise service_error(\
330                        service_error.server_config,
331                        "Matched <same> on anonymous request")
332            else:
333                rsu = user_match
334        elif found[2] == "<dynamic>":
335            rsu = None
336            dyn_service_user = True
337        else:
338            rsu = found[2]
339
340        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
341                [ fid ]
342
343    def do_project_allocation(self, dyn, project, user):
344        """
345        Call the project allocation routines and return the info.
346        """
347        if dyn: 
348            # Compose the dynamic project request
349            # (only dynamic, dynamic currently allowed)
350            preq = { 'AllocateProjectRequestBody': \
351                        { 'project' : {\
352                            'user': [ \
353                            { \
354                                'access': [ { 
355                                    'sshPubkey': self.ssh_pubkey_file } ],
356                                 'role': "serviceAccess",\
357                            }, \
358                            { \
359                                'access': [ { 
360                                    'sshPubkey': self.ssh_pubkey_file } ],
361                                 'role': "experimentCreation",\
362                            }, \
363                            ], \
364                            }\
365                        }\
366                    }
367            return self.allocate_project.dynamic_project(preq)
368        else:
369            preq = {'StaticProjectRequestBody' : \
370                    { 'project': \
371                        { 'name' : { 'localname' : project },\
372                          'user' : [ \
373                            {\
374                                'userID': { 'localname' : user }, \
375                                'access': [ { 
376                                    'sshPubkey': self.ssh_pubkey_file } ],
377                                'role': 'experimentCreation'\
378                            },\
379                            {\
380                                'userID': { 'localname' : user}, \
381                                'access': [ { 
382                                    'sshPubkey': self.ssh_pubkey_file } ],
383                                'role': 'serviceAccess'\
384                            },\
385                        ]}\
386                    }\
387            }
388            return self.allocate_project.static_project(preq)
389
390    def save_project_state(self, aid, ap, dyn, owners):
391        """
392        Parse out and save the information relevant to the project created for
393        this experiment.  That info is largely in ap and owners.  dyn indicates
394        that the project was created dynamically.  Return the user and project
395        names.
396        """
397        self.state_lock.acquire()
398        self.allocation[aid] = { }
399        self.allocation[aid]['auth'] = set()
400        try:
401            pname = ap['project']['name']['localname']
402        except KeyError:
403            pname = None
404
405        if dyn:
406            if not pname:
407                self.state_lock.release()
408                raise service_error(service_error.internal,
409                        "Misformed allocation response?")
410            if pname in self.projects: self.projects[pname] += 1
411            else: self.projects[pname] = 1
412            self.allocation[aid]['project'] = pname
413        else:
414            # sproject is a static project associated with this allocation.
415            self.allocation[aid]['sproject'] = pname
416
417        self.allocation[aid]['keys'] = [ ]
418
419        try:
420            for u in ap['project']['user']:
421                uname = u['userID']['localname']
422                if u['role'] == 'experimentCreation':
423                    self.allocation[aid]['user'] = uname
424                for k in [ k['sshPubkey'] for k in u['access'] \
425                        if k.has_key('sshPubkey') ]:
426                    kv = "%s:%s" % (uname, k)
427                    if self.keys.has_key(kv): self.keys[kv] += 1
428                    else: self.keys[kv] = 1
429                    self.allocation[aid]['keys'].append((uname, k))
430        except KeyError:
431            self.state_lock.release()
432            raise service_error(service_error.internal,
433                    "Misformed allocation response?")
434
435        self.allocation[aid]['owners'] = owners
436        self.write_state()
437        self.state_lock.release()
438        return (pname, uname)
439
440    # End of RequestAccess support routines
441
442    def RequestAccess(self, req, fid):
443        """
444        Handle the access request.  Proxy if not for us.
445
446        Parse out the fields and make the allocations or rejections if for us,
447        otherwise, assuming we're willing to proxy, proxy the request out.
448        """
449
450        def gateway_hardware(h):
451            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
452            else: return h
453
454        def get_export_project(svcs):
455            """
456            if the service requests includes one to export a project, return
457            that project.
458            """
459            rv = None
460            for s in svcs:
461                if s.get('name', '') == 'project_export' and \
462                        s.get('visibility', '') == 'export':
463                    if not rv: 
464                        for a in s.get('fedAttr', []):
465                            if a.get('attribute', '') == 'project' \
466                                    and 'value' in a:
467                                rv = a['value']
468                    else:
469                        raise service_error(service_error, access, 
470                                'Requesting multiple project exports is ' + \
471                                        'not supported');
472            return rv
473
474        # The dance to get into the request body
475        if req.has_key('RequestAccessRequestBody'):
476            req = req['RequestAccessRequestBody']
477        else:
478            raise service_error(service_error.req, "No request!?")
479
480        # if this includes a project export request, construct a filter such
481        # that only the ABAC attributes mapped to that project are checked for
482        # access.
483        if 'service' in req:
484            ep = get_export_project(req['service'])
485            if ep: pf = lambda(a): a.value[0] == ep
486            else: pf = None
487        else:
488            ep = None
489            pf = None
490
491        if self.auth.import_credentials(
492                data_list=req.get('abac_credential', [])):
493            self.auth.save()
494
495        if self.auth_type == "legacy":
496            found, dyn, owners= self.legacy_lookup_access(req, fid)
497            proof = access_proof("me", fid, "create")
498        elif self.auth_type == 'abac':
499            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
500        else:
501            raise service_error(service_error.internal, 
502                    'Unknown auth_type: %s' % self.auth_type)
503        ap = None
504
505        # This only happens in legacy lookups, but if this user has access to
506        # the testbed but not the project to be exported, raise the error.
507        if ep and ep != found[0]:
508            raise service_error(service_error.access,
509                    "Cannot export %s" % ep)
510
511        if self.ssh_pubkey_file:
512            ap = self.do_project_allocation(dyn[1], found[0], found[1])
513        else:
514            raise service_error(service_error.internal, 
515                    "SSH access parameters required")
516        # keep track of what's been added
517        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
518        aid = unicode(allocID)
519
520        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
521
522        services, svc_state = self.export_services(req.get('service',[]),
523                pname, uname)
524        self.state_lock.acquire()
525        # Store services state in global state
526        for k, v in svc_state.items():
527            self.allocation[aid][k] = v
528        self.append_allocation_authorization(aid, 
529                set([(o, allocID) for o in owners]), state_attr='allocation')
530        self.write_state()
531        self.state_lock.release()
532        try:
533            f = open("%s/%s.pem" % (self.certdir, aid), "w")
534            print >>f, alloc_cert
535            f.close()
536        except EnvironmentError, e:
537            raise service_error(service_error.internal, 
538                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
539        resp = self.build_access_response({ 'fedid': allocID } ,
540                ap, services, proof)
541        return resp
542
543    def do_release_project(self, del_project, del_users, del_types):
544        """
545        If a project and users has to be deleted, make the call.
546        """
547        msg = { 'project': { }}
548        if del_project:
549            msg['project']['name']= {'localname': del_project}
550        users = [ ]
551        for u in del_users.keys():
552            users.append({ 'userID': { 'localname': u },\
553                'access' :  \
554                        [ {'sshPubkey' : s } for s in del_users[u]]\
555            })
556        if users: 
557            msg['project']['user'] = users
558        if len(del_types) > 0:
559            msg['resources'] = { 'node': \
560                    [ {'hardware': [ h ] } for h in del_types ]\
561                }
562        if self.allocate_project.release_project:
563            msg = { 'ReleaseProjectRequestBody' : msg}
564            self.allocate_project.release_project(msg)
565
566    def ReleaseAccess(self, req, fid):
567        # The dance to get into the request body
568        if req.has_key('ReleaseAccessRequestBody'):
569            req = req['ReleaseAccessRequestBody']
570        else:
571            raise service_error(service_error.req, "No request!?")
572
573        try:
574            if req['allocID'].has_key('localname'):
575                auth_attr = aid = req['allocID']['localname']
576            elif req['allocID'].has_key('fedid'):
577                aid = unicode(req['allocID']['fedid'])
578                auth_attr = req['allocID']['fedid']
579            else:
580                raise service_error(service_error.req,
581                        "Only localnames and fedids are understood")
582        except KeyError:
583            raise service_error(service_error.req, "Badly formed request")
584
585        self.log.debug("[access] deallocation requested for %s by %s" % \
586                (aid, fid))
587        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
588                with_proof=True)
589        if not access_ok:
590            self.log.debug("[access] deallocation denied for %s", aid)
591            raise service_error(service_error.access, "Access Denied")
592
593        # If we know this allocation, reduce the reference counts and
594        # remove the local allocations.  Otherwise report an error.  If
595        # there is an allocation to delete, del_users will be a dictonary
596        # of sets where the key is the user that owns the keys in the set.
597        # We use a set to avoid duplicates.  del_project is just the name
598        # of any dynamic project to delete.  We're somewhat lazy about
599        # deleting authorization attributes.  Having access to something
600        # that doesn't exist isn't harmful.
601        del_users = { }
602        del_project = None
603        del_types = set()
604
605        self.state_lock.acquire()
606        if aid in self.allocation:
607            self.log.debug("Found allocation for %s" %aid)
608            self.clear_allocation_authorization(aid, state_attr='allocation')
609            for k in self.allocation[aid]['keys']:
610                kk = "%s:%s" % k
611                self.keys[kk] -= 1
612                if self.keys[kk] == 0:
613                    if not del_users.has_key(k[0]):
614                        del_users[k[0]] = set()
615                    del_users[k[0]].add(k[1])
616                    del self.keys[kk]
617
618            if 'project' in self.allocation[aid]:
619                pname = self.allocation[aid]['project']
620                self.projects[pname] -= 1
621                if self.projects[pname] == 0:
622                    del_project = pname
623                    del self.projects[pname]
624
625            if 'types' in self.allocation[aid]:
626                for t in self.allocation[aid]['types']:
627                    self.types[t] -= 1
628                    if self.types[t] == 0:
629                        if not del_project: del_project = t[0]
630                        del_types.add(t[1])
631                        del self.types[t]
632
633            del self.allocation[aid]
634            self.write_state()
635            self.state_lock.release()
636            # If we actually have resources to deallocate, prepare the call.
637            if del_project or del_users:
638                self.do_release_project(del_project, del_users, del_types)
639            # And remove the access cert
640            cf = "%s/%s.pem" % (self.certdir, aid)
641            self.log.debug("Removing %s" % cf)
642            os.remove(cf)
643            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
644        else:
645            self.state_lock.release()
646            raise service_error(service_error.req, "No such allocation")
647
648    # These are subroutines for StartSegment
649    def generate_ns2(self, topo, expfn, softdir, connInfo):
650        """
651        Convert topo into an ns2 file, decorated with appropriate commands for
652        the particular testbed setup.  Convert all requests for software, etc
653        to point at the staged copies on this testbed and add the federation
654        startcommands.
655        """
656        class dragon_commands:
657            """
658            Functor to spit out approrpiate dragon commands for nodes listed in
659            the connectivity description.  The constructor makes a dict mapping
660            dragon nodes to their parameters and the __call__ checks each
661            element in turn for membership.
662            """
663            def __init__(self, map):
664                self.node_info = map
665
666            def __call__(self, e):
667                s = ""
668                if isinstance(e, topdl.Computer):
669                    if self.node_info.has_key(e.name):
670                        info = self.node_info[e.name]
671                        for ifname, vlan, type in info:
672                            for i in e.interface:
673                                if i.name == ifname:
674                                    addr = i.get_attribute('ip4_address')
675                                    subs = i.substrate[0]
676                                    break
677                            else:
678                                raise service_error(service_error.internal,
679                                        "No interface %s on element %s" % \
680                                                (ifname, e.name))
681                            # XXX: do netmask right
682                            if type =='link':
683                                s = ("tb-allow-external ${%s} " + \
684                                        "dragonportal ip %s vlan %s " + \
685                                        "netmask 255.255.255.0\n") % \
686                                        (topdl.to_tcl_name(e.name), addr, vlan)
687                            elif type =='lan':
688                                s = ("tb-allow-external ${%s} " + \
689                                        "dragonportal " + \
690                                        "ip %s vlan %s usurp %s\n") % \
691                                        (topdl.to_tcl_name(e.name), addr, 
692                                                vlan, subs)
693                            else:
694                                raise service_error(service_error_internal,
695                                        "Unknown DRAGON type %s" % type)
696                return s
697
698        class not_dragon:
699            """
700            Return true if a node is in the given map of dragon nodes.
701            """
702            def __init__(self, map):
703                self.nodes = set(map.keys())
704
705            def __call__(self, e):
706                return e.name not in self.nodes
707
708        # Main line of generate_ns2
709        t = topo.clone()
710
711        # Create the map of nodes that need direct connections (dragon
712        # connections) from the connInfo
713        dragon_map = { }
714        for i in [ i for i in connInfo if i['type'] == 'transit']:
715            for a in i.get('fedAttr', []):
716                if a['attribute'] == 'vlan_id':
717                    vlan = a['value']
718                    break
719            else:
720                raise service_error(service_error.internal, "No vlan tag")
721            members = i.get('member', [])
722            if len(members) > 1: type = 'lan'
723            else: type = 'link'
724
725            try:
726                for m in members:
727                    if m['element'] in dragon_map:
728                        dragon_map[m['element']].append(( m['interface'], 
729                            vlan, type))
730                    else:
731                        dragon_map[m['element']] = [( m['interface'], 
732                            vlan, type),]
733            except KeyError:
734                raise service_error(service_error.req,
735                        "Missing connectivity info")
736
737        # Weed out the things we aren't going to instantiate: Segments, portal
738        # substrates, and portal interfaces.  (The copy in the for loop allows
739        # us to delete from e.elements in side the for loop).  While we're
740        # touching all the elements, we also adjust paths from the original
741        # testbed to local testbed paths and put the federation commands into
742        # the start commands
743        for e in [e for e in t.elements]:
744            if isinstance(e, topdl.Segment):
745                t.elements.remove(e)
746            if isinstance(e, topdl.Computer):
747                self.add_kit(e, self.federation_software)
748                if e.get_attribute('portal') and self.portal_startcommand:
749                    # Add local portal support software
750                    self.add_kit(e, self.portal_software)
751                    # Portals never have a user-specified start command
752                    e.set_attribute('startup', self.portal_startcommand)
753                elif self.node_startcommand:
754                    if e.get_attribute('startup'):
755                        e.set_attribute('startup', "%s \\$USER '%s'" % \
756                                (self.node_startcommand, 
757                                    e.get_attribute('startup')))
758                    else:
759                        e.set_attribute('startup', self.node_startcommand)
760
761                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
762                # Remove portal interfaces that do not connect to DRAGON
763                e.interface = [i for i in e.interface \
764                        if not i.get_attribute('portal') or i.name in dinf ]
765            # Fix software paths
766            for s in getattr(e, 'software', []):
767                s.location = re.sub("^.*/", softdir, s.location)
768
769        t.substrates = [ s.clone() for s in t.substrates ]
770        t.incorporate_elements()
771
772        # Customize the ns2 output for local portal commands and images
773        filters = []
774
775        if self.dragon_endpoint:
776            add_filter = not_dragon(dragon_map)
777            filters.append(dragon_commands(dragon_map))
778        else:
779            add_filter = None
780
781        if self.portal_command:
782            filters.append(topdl.generate_portal_command_filter(
783                self.portal_command, add_filter=add_filter))
784
785        if self.portal_image:
786            filters.append(topdl.generate_portal_image_filter(
787                self.portal_image))
788
789        if self.portal_type:
790            filters.append(topdl.generate_portal_hardware_filter(
791                self.portal_type))
792
793        # Convert to ns and write it out
794        expfile = topdl.topology_to_ns2(t, filters)
795        try:
796            f = open(expfn, "w")
797            print >>f, expfile
798            f.close()
799        except EnvironmentError:
800            raise service_error(service_error.internal,
801                    "Cannot write experiment file %s: %s" % (expfn,e))
802
803    def export_store_info(self, cf, proj, ename, connInfo):
804        """
805        For the export requests in the connection info, install the peer names
806        at the experiment controller via SetValue calls.
807        """
808
809        for c in connInfo:
810            for p in [ p for p in c.get('parameter', []) \
811                    if p.get('type', '') == 'output']:
812
813                if p.get('name', '') == 'peer':
814                    k = p.get('key', None)
815                    surl = p.get('store', None)
816                    if surl and k and k.index('/') != -1:
817                        value = "%s.%s.%s%s" % \
818                                (k[k.index('/')+1:], ename, proj, self.domain)
819                        req = { 'name': k, 'value': value }
820                        self.log.debug("Setting %s to %s on %s" % \
821                                (k, value, surl))
822                        self.call_SetValue(surl, req, cf)
823                    else:
824                        self.log.error("Bad export request: %s" % p)
825                elif p.get('name', '') == 'ssh_port':
826                    k = p.get('key', None)
827                    surl = p.get('store', None)
828                    if surl and k:
829                        req = { 'name': k, 'value': self.ssh_port }
830                        self.log.debug("Setting %s to %s on %s" % \
831                                (k, self.ssh_port, surl))
832                        self.call_SetValue(surl, req, cf)
833                    else:
834                        self.log.error("Bad export request: %s" % p)
835                else:
836                    self.log.error("Unknown export parameter: %s" % \
837                            p.get('name'))
838                    continue
839
840    def add_seer_node(self, topo, name, startup):
841        """
842        Add a seer node to the given topology, with the startup command passed
843        in.  Used by configure seer_services.
844        """
845        c_node = topdl.Computer(
846                name=name, 
847                os= topdl.OperatingSystem(
848                    attribute=[
849                    { 'attribute': 'osid', 
850                        'value': self.local_seer_image },
851                    ]),
852                attribute=[
853                    { 'attribute': 'startup', 'value': startup },
854                    ]
855                )
856        self.add_kit(c_node, self.local_seer_software)
857        topo.elements.append(c_node)
858
859    def configure_seer_services(self, services, topo, softdir):
860        """
861        Make changes to the topology required for the seer requests being made.
862        Specifically, add any control or master nodes required and set up the
863        start commands on the nodes to interconnect them.
864        """
865        local_seer = False      # True if we need to add a control node
866        collect_seer = False    # True if there is a seer-master node
867        seer_master= False      # True if we need to add the seer-master
868        for s in services:
869            s_name = s.get('name', '')
870            s_vis = s.get('visibility','')
871
872            if s_name  == 'local_seer_control' and s_vis == 'export':
873                local_seer = True
874            elif s_name == 'seer_master':
875                if s_vis == 'import':
876                    collect_seer = True
877                elif s_vis == 'export':
878                    seer_master = True
879       
880        # We've got the whole picture now, so add nodes if needed and configure
881        # them to interconnect properly.
882        if local_seer or seer_master:
883            # Copy local seer control node software to the tempdir
884            for l, f in self.local_seer_software:
885                base = os.path.basename(f)
886                copy_file(f, "%s/%s" % (softdir, base))
887        # If we're collecting seers somewhere the controllers need to talk to
888        # the master.  In testbeds that export the service, that will be a
889        # local node that we'll add below.  Elsewhere it will be the control
890        # portal that will port forward to the exporting master.
891        if local_seer:
892            if collect_seer:
893                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
894            else:
895                startup = self.local_seer_start
896            self.add_seer_node(topo, 'control', startup)
897        # If this is the seer master, add that node, too.
898        if seer_master:
899            self.add_seer_node(topo, 'seer-master', 
900                    "%s -R -n -R seer-master -R -A -R sink" % \
901                            self.seer_master_start)
902
903    def retrieve_software(self, topo, certfile, softdir):
904        """
905        Collect the software that nodes in the topology need loaded and stage
906        it locally.  This implies retrieving it from the experiment_controller
907        and placing it into softdir.  Certfile is used to prove that this node
908        has access to that data (it's the allocation/segment fedid).  Finally
909        local portal and federation software is also copied to the same staging
910        directory for simplicity - all software needed for experiment creation
911        is in softdir.
912        """
913        sw = set()
914        for e in topo.elements:
915            for s in getattr(e, 'software', []):
916                sw.add(s.location)
917        for s in sw:
918            self.log.debug("Retrieving %s" % s)
919            try:
920                get_url(s, certfile, softdir)
921            except:
922                t, v, st = sys.exc_info()
923                raise service_error(service_error.internal,
924                        "Error retrieving %s: %s" % (s, v))
925
926        # Copy local federation and portal node software to the tempdir
927        for s in (self.federation_software, self.portal_software):
928            for l, f in s:
929                base = os.path.basename(f)
930                copy_file(f, "%s/%s" % (softdir, base))
931
932
933    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
934        """
935        Gather common configuration files, retrieve or create an experiment
936        name and project name, and return the ssh_key filenames.  Create an
937        allocation log bound to the state log variable as well.
938        """
939        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
940                'seer_ca_pem', 'seer_node_pem')
941        ename = None
942        pubkey_base = None
943        secretkey_base = None
944        proj = None
945        user = None
946        alloc_log = None
947        nonce_experiment = False
948        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
949
950        self.state_lock.acquire()
951        if aid in self.allocation:
952            proj = self.allocation[aid].get('project', None)
953            if not proj: 
954                proj = self.allocation[aid].get('sproject', None)
955        self.state_lock.release()
956
957        if not proj:
958            raise service_error(service_error.internal, 
959                    "Can't find project for %s" %aid)
960
961        for a in attrs:
962            if a['attribute'] in configs:
963                try:
964                    self.log.debug("Retrieving %s from %s" % \
965                            (a['attribute'], a['value']))
966                    get_url(a['value'], certfile, tmpdir)
967                except:
968                    t, v, st = sys.exc_info()
969                    raise service_error(service_error.internal,
970                            "Error retrieving %s: %s" % (a.get('value', ""), v))
971            if a['attribute'] == 'ssh_pubkey':
972                pubkey_base = a['value'].rpartition('/')[2]
973            if a['attribute'] == 'ssh_secretkey':
974                secretkey_base = a['value'].rpartition('/')[2]
975            if a['attribute'] == 'experiment_name':
976                ename = a['value']
977
978        # Names longer than the emulab max are discarded
979        if ename and len(ename) <= self.max_name_len:
980            # Clean up the experiment name so that emulab will accept it.
981            ename = re.sub(vchars_re, '-', ename)
982
983        else:
984            ename = ""
985            for i in range(0,5):
986                ename += random.choice(string.ascii_letters)
987            nonce_experiment = True
988            self.log.warn("No experiment name or suggestion too long: " + \
989                    "picked one randomly: %s" % ename)
990
991        if not pubkey_base:
992            raise service_error(service_error.req, 
993                    "No public key attribute")
994
995        if not secretkey_base:
996            raise service_error(service_error.req, 
997                    "No secret key attribute")
998
999        self.state_lock.acquire()
1000        if aid in self.allocation:
1001            user = self.allocation[aid].get('user', None)
1002            self.allocation[aid]['experiment'] = ename
1003            self.allocation[aid]['nonce'] = nonce_experiment
1004            self.allocation[aid]['log'] = [ ]
1005            # Create a logger that logs to the experiment's state object as
1006            # well as to the main log file.
1007            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1008            h = logging.StreamHandler(
1009                    list_log.list_log(self.allocation[aid]['log']))
1010            # XXX: there should be a global one of these rather than
1011            # repeating the code.
1012            h.setFormatter(logging.Formatter(
1013                "%(asctime)s %(name)s %(message)s",
1014                        '%d %b %y %H:%M:%S'))
1015            alloc_log.addHandler(h)
1016            self.write_state()
1017        self.state_lock.release()
1018
1019        if not user:
1020            raise service_error(service_error.internal, 
1021                    "Can't find creation user for %s" %aid)
1022
1023        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1024
1025    def decorate_topology(self, info, t):
1026        """
1027        Copy the physical mapping and status onto the topology.  Used by
1028        StartSegment and InfoSegment
1029        """
1030        def add_new(ann, attr):
1031            for a in ann:
1032                if a not in attr: attr.append(a)
1033
1034        # Copy the assigned names into the return topology
1035        for e in t.elements:
1036            if isinstance(e, topdl.Computer):
1037                if not self.create_debug:
1038                    if e.name in info.node:
1039                        add_new(("%s%s" % 
1040                            (info.node[e.name].pname, self.domain),),
1041                            e.localname)
1042                        e.status = info.node[e.name].status
1043                        os = info.node[e.name].getOS()
1044                        if os: e.os = [ os ]
1045                else:
1046                    # Simple debugging assignment
1047                    add_new(("node%d%s" % (i, self.domain),), e.localname)
1048                    e.status = 'active'
1049                    add_new(('testop1', 'testop2'), e.operation)
1050                    i += 1
1051
1052
1053    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1054        """
1055        Store key bits of experiment state in the global repository, including
1056        the response that may need to be replayed, and return the response.
1057        """
1058        i = 0
1059        t = topo.clone()
1060        self.decorate_topology(starter, t)
1061        # Grab the log (this is some anal locking, but better safe than
1062        # sorry)
1063        self.state_lock.acquire()
1064        logv = "".join(self.allocation[aid]['log'])
1065        # It's possible that the StartSegment call gets retried (!).
1066        # if the 'started' key is in the allocation, we'll return it rather
1067        # than redo the setup.
1068        self.allocation[aid]['started'] = { 
1069                'allocID': alloc_id,
1070                'allocationLog': logv,
1071                'segmentdescription': { 
1072                    'topdldescription': t.to_dict()
1073                    },
1074                'proof': proof.to_dict(),
1075                }
1076        self.allocation[aid]['topo'] = t
1077        retval = copy.copy(self.allocation[aid]['started'])
1078        self.write_state()
1079        self.state_lock.release()
1080        return retval
1081   
1082    # End of StartSegment support routines
1083
1084    def StartSegment(self, req, fid):
1085        err = None  # Any service_error generated after tmpdir is created
1086        rv = None   # Return value from segment creation
1087
1088        try:
1089            req = req['StartSegmentRequestBody']
1090            auth_attr = req['allocID']['fedid']
1091            topref = req['segmentdescription']['topdldescription']
1092        except KeyError:
1093            raise service_error(server_error.req, "Badly formed request")
1094
1095        connInfo = req.get('connection', [])
1096        services = req.get('service', [])
1097        aid = "%s" % auth_attr
1098        attrs = req.get('fedAttr', [])
1099
1100        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1101                with_proof=True)
1102        if not access_ok:
1103            raise service_error(service_error.access, "Access denied")
1104        else:
1105            # See if this is a replay of an earlier succeeded StartSegment -
1106            # sometimes SSL kills 'em.  If so, replay the response rather than
1107            # redoing the allocation.
1108            self.state_lock.acquire()
1109            retval = self.allocation[aid].get('started', None)
1110            self.state_lock.release()
1111            if retval:
1112                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1113                        "replaying response")
1114                return retval
1115
1116        # A new request.  Do it.
1117
1118        if topref: topo = topdl.Topology(**topref)
1119        else:
1120            raise service_error(service_error.req, 
1121                    "Request missing segmentdescription'")
1122       
1123        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1124        try:
1125            tmpdir = tempfile.mkdtemp(prefix="access-")
1126            softdir = "%s/software" % tmpdir
1127            os.mkdir(softdir)
1128        except EnvironmentError:
1129            raise service_error(service_error.internal, "Cannot create tmp dir")
1130
1131        # Try block alllows us to clean up temporary files.
1132        try:
1133            self.retrieve_software(topo, certfile, softdir)
1134            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1135                    self.initialize_experiment_info(attrs, aid, 
1136                            certfile, tmpdir)
1137
1138            if '/' in proj: proj, gid = proj.split('/')
1139            else: gid = None
1140
1141
1142            # Set up userconf and seer if needed
1143            self.configure_userconf(services, tmpdir)
1144            self.configure_seer_services(services, topo, softdir)
1145            # Get and send synch store variables
1146            self.export_store_info(certfile, proj, ename, connInfo)
1147            self.import_store_info(certfile, connInfo)
1148
1149            expfile = "%s/experiment.tcl" % tmpdir
1150
1151            self.generate_portal_configs(topo, pubkey_base, 
1152                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1153            self.generate_ns2(topo, expfile, 
1154                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1155
1156            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1157                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1158                    cert=self.xmlrpc_cert)
1159            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1160        except service_error, e:
1161            err = e
1162        except:
1163            t, v, st = sys.exc_info()
1164            err = service_error(service_error.internal, "%s: %s" % \
1165                    (v, traceback.extract_tb(st)))
1166
1167        # Walk up tmpdir, deleting as we go
1168        if self.cleanup: self.remove_dirs(tmpdir)
1169        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1170
1171        if rv:
1172            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1173                    proof)
1174        elif err:
1175            raise service_error(service_error.federant,
1176                    "Swapin failed: %s" % err)
1177        else:
1178            raise service_error(service_error.federant, "Swapin failed")
1179
1180    def TerminateSegment(self, req, fid):
1181        try:
1182            req = req['TerminateSegmentRequestBody']
1183        except KeyError:
1184            raise service_error(server_error.req, "Badly formed request")
1185
1186        auth_attr = req['allocID']['fedid']
1187        aid = "%s" % auth_attr
1188        attrs = req.get('fedAttr', [])
1189
1190        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1191                with_proof=True)
1192        if not access_ok:
1193            raise service_error(service_error.access, "Access denied")
1194
1195        self.state_lock.acquire()
1196        if aid in self.allocation:
1197            proj = self.allocation[aid].get('project', None)
1198            if not proj: 
1199                proj = self.allocation[aid].get('sproject', None)
1200            user = self.allocation[aid].get('user', None)
1201            ename = self.allocation[aid].get('experiment', None)
1202            nonce = self.allocation[aid].get('nonce', False)
1203        else:
1204            proj = None
1205            user = None
1206            ename = None
1207            nonce = False
1208        self.state_lock.release()
1209
1210        if not proj:
1211            raise service_error(service_error.internal, 
1212                    "Can't find project for %s" % aid)
1213        else:
1214            if '/' in proj: proj, gid = proj.split('/')
1215            else: gid = None
1216
1217        if not user:
1218            raise service_error(service_error.internal, 
1219                    "Can't find creation user for %s" % aid)
1220        if not ename:
1221            raise service_error(service_error.internal, 
1222                    "Can't find experiment name for %s" % aid)
1223        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1224                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1225        stopper(self, user, proj, ename, gid, nonce)
1226        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1227
1228    def InfoSegment(self, req, fid):
1229        try:
1230            req = req['InfoSegmentRequestBody']
1231        except KeyError:
1232            raise service_error(server_error.req, "Badly formed request")
1233
1234        auth_attr = req['allocID']['fedid']
1235        aid = "%s" % auth_attr
1236
1237        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1238                with_proof=True)
1239        if not access_ok:
1240            raise service_error(service_error.access, "Access denied")
1241
1242        self.state_lock.acquire()
1243        if aid in self.allocation:
1244            topo = self.allocation[aid].get('topo', None)
1245            if topo: topo = topo.clone()
1246            proj = self.allocation[aid].get('project', None)
1247            if not proj: 
1248                proj = self.allocation[aid].get('sproject', None)
1249            user = self.allocation[aid].get('user', None)
1250            ename = self.allocation[aid].get('experiment', None)
1251        else:
1252            proj = None
1253            user = None
1254            ename = None
1255            topo = None
1256        self.state_lock.release()
1257
1258        if not proj:
1259            raise service_error(service_error.internal, 
1260                    "Can't find project for %s" % aid)
1261        else:
1262            if '/' in proj: proj, gid = proj.split('/')
1263            else: gid = None
1264
1265        if not user:
1266            raise service_error(service_error.internal, 
1267                    "Can't find creation user for %s" % aid)
1268        if not ename:
1269            raise service_error(service_error.internal, 
1270                    "Can't find experiment name for %s" % aid)
1271        info = self.info_segment(keyfile=self.ssh_privkey_file,
1272                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1273        info(self, user, proj, ename)
1274        self.decorate_topology(info, topo)
1275        self.state_lock.acquire()
1276        if aid in self.allocation:
1277            self.allocation[aid]['topo'] = topo
1278            self.write_state()
1279        self.state_lock.release()
1280
1281        return { 
1282                'allocID': req['allocID'], 
1283                'segmentdescription': 
1284                    { 'topdldescription' : topo.to_dict() },
1285                'proof': proof.to_dict(),
1286                }
1287
1288    def OperationSegment(self, req, fid):
1289        def get_pname(e):
1290            """
1291            Get the physical name of a node
1292            """
1293            if e.localname:
1294                return re.sub('\..*','', e.localname[0])
1295            else:
1296                return None
1297
1298        try:
1299            req = req['OperationSegmentRequestBody']
1300        except KeyError:
1301            raise service_error(server_error.req, "Badly formed request")
1302
1303        auth_attr = req['allocID']['fedid']
1304        aid = "%s" % auth_attr
1305
1306        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1307                with_proof=True)
1308        if not access_ok:
1309            raise service_error(service_error.access, "Access denied")
1310
1311        op = req.get('operation', None)
1312        targets = req.get('target', None)
1313        params = req.get('parameter', None)
1314
1315        if op is None :
1316            raise service_error(service_error.req, "missing operation")
1317        elif targets is None:
1318            raise service_error(service_error.req, "no targets")
1319
1320        if aid in self.allocation:
1321            topo = self.allocation[aid].get('topo', None)
1322            if topo: topo = topo.clone()
1323        else:
1324            topo = None
1325
1326        targets = copy.copy(targets)
1327        ptargets = { }
1328        for e in topo.elements:
1329            if isinstance(e, topdl.Computer):
1330                if e.name in targets:
1331                    targets.remove(e.name)
1332                    pn = get_pname(e)
1333                    if pn: ptargets[e.name] = pn
1334
1335        status = [ operation_status(t, operation_status.no_target) \
1336                for t in targets]
1337
1338        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1339                debug=self.create_debug, boss=self.boss, 
1340                cert=self.xmlrpc_cert)
1341        ops(self, op, ptargets, params, topo)
1342       
1343        status.extend(ops.status)
1344
1345        return { 
1346                'allocID': req['allocID'], 
1347                'status': [s.to_dict() for s in status],
1348                'proof': proof.to_dict(),
1349                }
Note: See TracBrowser for help on using the repository browser.