source: fedd/federation/emulab_access.py @ b709861

compt_changesinfo-ops
Last change on this file since b709861 was b709861, checked in by Ted Faber <faber@…>, 12 years ago

Rebooting works

  • Property mode set to 100644
File size: 43.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18from legacy_access import legacy_access
19
20from util import *
21from allocate_project import allocate_project_local, allocate_project_remote
22from fedid import fedid, generate_fedid
23from authorizer import authorizer, abac_authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26from proof import proof as access_proof
27
28import httplib
29import tempfile
30from urlparse import urlparse
31
32import topdl
33import list_log
34import proxy_emulab_segment
35import local_emulab_segment
36
37
38# Make log messages disappear if noone configures a fedd logger
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.access")
43fl.addHandler(nullHandler())
44
45class access(access_base, legacy_access):
46    """
47    The implementation of access control based on mapping users to projects.
48
49    Users can be mapped to existing projects or have projects created
50    dynamically.  This implements both direct requests and proxies.
51    """
52
53    max_name_len = 19
54
55    def __init__(self, config=None, auth=None):
56        """
57        Initializer.  Pulls parameters out of the ConfigParser's access section.
58        """
59
60        access_base.__init__(self, config, auth)
61
62        self.max_name_len = access.max_name_len
63
64        self.allow_proxy = config.getboolean("access", "allow_proxy")
65
66        self.domain = config.get("access", "domain")
67        self.userconfdir = config.get("access","userconfdir")
68        self.userconfcmd = config.get("access","userconfcmd")
69        self.userconfurl = config.get("access","userconfurl")
70        self.federation_software = config.get("access", "federation_software")
71        self.portal_software = config.get("access", "portal_software")
72        self.local_seer_software = config.get("access", "local_seer_software")
73        self.local_seer_image = config.get("access", "local_seer_image")
74        self.local_seer_start = config.get("access", "local_seer_start")
75        self.seer_master_start = config.get("access", "seer_master_start")
76        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
77        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
78        self.ssh_port = config.get("access","ssh_port") or "22"
79        self.boss = config.get("access", "boss")
80        self.ops = config.get("access", "ops")
81        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
82        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
83
84        self.dragon_endpoint = config.get("access", "dragon")
85        self.dragon_vlans = config.get("access", "dragon_vlans")
86        self.deter_internal = config.get("access", "deter_internal")
87
88        self.tunnel_config = config.getboolean("access", "tunnel_config")
89        self.portal_command = config.get("access", "portal_command")
90        self.portal_image = config.get("access", "portal_image")
91        self.portal_type = config.get("access", "portal_type") or "pc"
92        self.portal_startcommand = config.get("access", "portal_startcommand")
93        self.node_startcommand = config.get("access", "node_startcommand")
94
95        self.federation_software = self.software_list(self.federation_software)
96        self.portal_software = self.software_list(self.portal_software)
97        self.local_seer_software = self.software_list(self.local_seer_software)
98
99        self.access_type = self.access_type.lower()
100        if self.access_type == 'remote_emulab':
101            self.start_segment = proxy_emulab_segment.start_segment
102            self.stop_segment = proxy_emulab_segment.stop_segment
103            self.info_segment = proxy_emulab_segment.info_segment
104            self.operation_segment = proxy_emulab_segment.operation_segment
105        elif self.access_type == 'local_emulab':
106            self.start_segment = local_emulab_segment.start_segment
107            self.stop_segment = local_emulab_segment.stop_segment
108            self.info_segment = local_emulab_segment.info_segment
109            self.operation_segment = local_emulab_segment.operation_segment
110        else:
111            self.start_segment = None
112            self.stop_segment = None
113            self.info_segment = None
114            self.operation_segment = None
115
116        self.restricted = [ ]
117        tb = config.get('access', 'testbed')
118        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
119        else: self.testbed = [ ]
120
121        # authorization information
122        self.auth_type = config.get('access', 'auth_type') \
123                or 'legacy'
124        self.auth_dir = config.get('access', 'auth_dir')
125        accessdb = config.get("access", "accessdb")
126        # initialize the authorization system
127        if self.auth_type == 'legacy':
128            self.access = { }
129            if accessdb:
130                self.legacy_read_access(accessdb, self.legacy_access_tuple)
131        elif self.auth_type == 'abac':
132            self.auth = abac_authorizer(load=self.auth_dir)
133            self.access = [ ]
134            if accessdb:
135                self.read_access(accessdb, self.access_tuple)
136        else:
137            raise service_error(service_error.internal, 
138                    "Unknown auth_type: %s" % self.auth_type)
139
140        # read_state in the base_class
141        self.state_lock.acquire()
142        for a  in ('allocation', 'projects', 'keys', 'types'):
143            if a not in self.state:
144                self.state[a] = { }
145        self.allocation = self.state['allocation']
146        self.projects = self.state['projects']
147        self.keys = self.state['keys']
148        self.types = self.state['types']
149        if self.auth_type == "legacy": 
150            # Add the ownership attributes to the authorizer.  Note that the
151            # indices of the allocation dict are strings, but the attributes are
152            # fedids, so there is a conversion.
153            for k in self.allocation.keys():
154                for o in self.allocation[k].get('owners', []):
155                    self.auth.set_attribute(o, fedid(hexstr=k))
156                if self.allocation[k].has_key('userconfig'):
157                    sfid = self.allocation[k]['userconfig']
158                    fid = fedid(hexstr=sfid)
159                    self.auth.set_attribute(fid, "/%s" % sfid)
160        self.state_lock.release()
161        self.exports = {
162                'SMB': self.export_SMB,
163                'seer': self.export_seer,
164                'tmcd': self.export_tmcd,
165                'userconfig': self.export_userconfig,
166                'project_export': self.export_project_export,
167                'local_seer_control': self.export_local_seer,
168                'seer_master': self.export_seer_master,
169                'hide_hosts': self.export_hide_hosts,
170                }
171
172        if not self.local_seer_image or not self.local_seer_software or \
173                not self.local_seer_start:
174            if 'local_seer_control' in self.exports:
175                del self.exports['local_seer_control']
176
177        if not self.local_seer_image or not self.local_seer_software or \
178                not self.seer_master_start:
179            if 'seer_master' in self.exports:
180                del self.exports['seer_master']
181
182
183        self.soap_services = {\
184            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
185            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
186            'StartSegment': soap_handler("StartSegment", self.StartSegment),
187            'TerminateSegment': soap_handler("TerminateSegment",
188                self.TerminateSegment),
189            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
190            'OperationSegment': soap_handler("OperationSegment",
191                self.OperationSegment),
192            }
193        self.xmlrpc_services =  {\
194            'RequestAccess': xmlrpc_handler('RequestAccess',
195                self.RequestAccess),
196            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
197                self.ReleaseAccess),
198            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
199            'TerminateSegment': xmlrpc_handler('TerminateSegment',
200                self.TerminateSegment),
201            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
202            'OperationSegment': xmlrpc_handler("OperationSegment",
203                self.OperationSegment),
204            }
205
206        self.call_SetValue = service_caller('SetValue')
207        self.call_GetValue = service_caller('GetValue', log=self.log)
208
209        if not config.has_option("allocate", "uri"):
210            self.allocate_project = \
211                allocate_project_local(config, auth)
212        else:
213            self.allocate_project = \
214                allocate_project_remote(config, auth)
215
216
217        # If the project allocator exports services, put them in this object's
218        # maps so that classes that instantiate this can call the services.
219        self.soap_services.update(self.allocate_project.soap_services)
220        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
221
222    @staticmethod
223    def legacy_access_tuple(str):
224        """
225        Convert a string of the form (id[:resources:resouces], id, id) into a
226        tuple of the form (project, user, user) where users may be names or
227        fedids.  The resources strings are obsolete and ignored.
228        """
229        def parse_name(n):
230            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
231            else: return n
232
233        str = str.strip()
234        if str.startswith('(') and str.endswith(')'):
235            str = str[1:-1]
236            names = [ s.strip() for s in str.split(",")]
237            if len(names) > 3:
238                raise self.parse_error("More than three fields in name")
239            first = names[0].split(":")
240            if first == 'fedid:':
241                del first[0]
242                first[0] = fedid(hexstr=first[0])
243            names[0] = first[0]
244
245            for i in range(1,2):
246                names[i] = parse_name(names[i])
247
248            return tuple(names)
249        else:
250            raise self.parse_error('Bad mapping (unbalanced parens)')
251
252    @staticmethod
253    def access_tuple(str):
254        """
255        Convert a string of the form (id, id) into an access_project.  This is
256        called by read_access to convert to local attributes.  It returns
257        a tuple of the form (project, user, user) where the two users are
258        always the same.
259        """
260
261        str = str.strip()
262        if str.startswith('(') and str.endswith(')') and str.count(',') == 1:
263            # The slice takes the parens off the string.
264            proj, user = str[1:-1].split(',')
265            return (proj.strip(), user.strip(), user.strip())
266        else:
267            raise self.parse_error(
268                    'Bad mapping (unbalanced parens or more than 1 comma)')
269
270    # RequestAccess support routines
271
272    def legacy_lookup_access(self, req, fid):
273        """
274        Look up the local access control information mapped to this fedid and
275        credentials.  In this case it is a (project, create_user, access_user)
276        triple, and a triple of three booleans indicating which, if any will
277        need to be dynamically created.  Finally a list of owners for that
278        allocation is returned.
279
280        lookup_access_base pulls the first triple out, and it is parsed by this
281        routine into the boolean map.  Owners is always the controlling fedid.
282        """
283        # Return values
284        rp = None
285        ru = None
286        # This maps a valid user to the Emulab projects and users to use
287        found, match = self.legacy_lookup_access_base(req, fid)
288        tb, project, user = match
289       
290        if found == None:
291            raise service_error(service_error.access,
292                    "Access denied - cannot map access")
293
294        # resolve <dynamic> and <same> in found
295        dyn_proj = False
296        dyn_create_user = False
297        dyn_service_user = False
298
299        if found[0] == "<same>":
300            if project != None:
301                rp = project
302            else : 
303                raise service_error(\
304                        service_error.server_config,
305                        "Project matched <same> when no project given")
306        elif found[0] == "<dynamic>":
307            rp = None
308            dyn_proj = True
309        else:
310            rp = found[0]
311
312        if found[1] == "<same>":
313            if user_match == "<any>":
314                if user != None: rcu = user[0]
315                else: raise service_error(\
316                        service_error.server_config,
317                        "Matched <same> on anonymous request")
318            else:
319                rcu = user_match
320        elif found[1] == "<dynamic>":
321            rcu = None
322            dyn_create_user = True
323        else:
324            rcu = found[1]
325       
326        if found[2] == "<same>":
327            if user_match == "<any>":
328                if user != None: rsu = user[0]
329                else: raise service_error(\
330                        service_error.server_config,
331                        "Matched <same> on anonymous request")
332            else:
333                rsu = user_match
334        elif found[2] == "<dynamic>":
335            rsu = None
336            dyn_service_user = True
337        else:
338            rsu = found[2]
339
340        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
341                [ fid ]
342
343    def do_project_allocation(self, dyn, project, user):
344        """
345        Call the project allocation routines and return the info.
346        """
347        if dyn: 
348            # Compose the dynamic project request
349            # (only dynamic, dynamic currently allowed)
350            preq = { 'AllocateProjectRequestBody': \
351                        { 'project' : {\
352                            'user': [ \
353                            { \
354                                'access': [ { 
355                                    'sshPubkey': self.ssh_pubkey_file } ],
356                                 'role': "serviceAccess",\
357                            }, \
358                            { \
359                                'access': [ { 
360                                    'sshPubkey': self.ssh_pubkey_file } ],
361                                 'role': "experimentCreation",\
362                            }, \
363                            ], \
364                            }\
365                        }\
366                    }
367            return self.allocate_project.dynamic_project(preq)
368        else:
369            preq = {'StaticProjectRequestBody' : \
370                    { 'project': \
371                        { 'name' : { 'localname' : project },\
372                          'user' : [ \
373                            {\
374                                'userID': { 'localname' : user }, \
375                                'access': [ { 
376                                    'sshPubkey': self.ssh_pubkey_file } ],
377                                'role': 'experimentCreation'\
378                            },\
379                            {\
380                                'userID': { 'localname' : user}, \
381                                'access': [ { 
382                                    'sshPubkey': self.ssh_pubkey_file } ],
383                                'role': 'serviceAccess'\
384                            },\
385                        ]}\
386                    }\
387            }
388            return self.allocate_project.static_project(preq)
389
390    def save_project_state(self, aid, ap, dyn, owners):
391        """
392        Parse out and save the information relevant to the project created for
393        this experiment.  That info is largely in ap and owners.  dyn indicates
394        that the project was created dynamically.  Return the user and project
395        names.
396        """
397        self.state_lock.acquire()
398        self.allocation[aid] = { }
399        self.allocation[aid]['auth'] = set()
400        try:
401            pname = ap['project']['name']['localname']
402        except KeyError:
403            pname = None
404
405        if dyn:
406            if not pname:
407                self.state_lock.release()
408                raise service_error(service_error.internal,
409                        "Misformed allocation response?")
410            if pname in self.projects: self.projects[pname] += 1
411            else: self.projects[pname] = 1
412            self.allocation[aid]['project'] = pname
413        else:
414            # sproject is a static project associated with this allocation.
415            self.allocation[aid]['sproject'] = pname
416
417        self.allocation[aid]['keys'] = [ ]
418
419        try:
420            for u in ap['project']['user']:
421                uname = u['userID']['localname']
422                if u['role'] == 'experimentCreation':
423                    self.allocation[aid]['user'] = uname
424                for k in [ k['sshPubkey'] for k in u['access'] \
425                        if k.has_key('sshPubkey') ]:
426                    kv = "%s:%s" % (uname, k)
427                    if self.keys.has_key(kv): self.keys[kv] += 1
428                    else: self.keys[kv] = 1
429                    self.allocation[aid]['keys'].append((uname, k))
430        except KeyError:
431            self.state_lock.release()
432            raise service_error(service_error.internal,
433                    "Misformed allocation response?")
434
435        self.allocation[aid]['owners'] = owners
436        self.write_state()
437        self.state_lock.release()
438        return (pname, uname)
439
440    # End of RequestAccess support routines
441
442    def RequestAccess(self, req, fid):
443        """
444        Handle the access request.  Proxy if not for us.
445
446        Parse out the fields and make the allocations or rejections if for us,
447        otherwise, assuming we're willing to proxy, proxy the request out.
448        """
449
450        def gateway_hardware(h):
451            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
452            else: return h
453
454        def get_export_project(svcs):
455            """
456            if the service requests includes one to export a project, return
457            that project.
458            """
459            rv = None
460            for s in svcs:
461                if s.get('name', '') == 'project_export' and \
462                        s.get('visibility', '') == 'export':
463                    if not rv: 
464                        for a in s.get('fedAttr', []):
465                            if a.get('attribute', '') == 'project' \
466                                    and 'value' in a:
467                                rv = a['value']
468                    else:
469                        raise service_error(service_error, access, 
470                                'Requesting multiple project exports is ' + \
471                                        'not supported');
472            return rv
473
474        # The dance to get into the request body
475        if req.has_key('RequestAccessRequestBody'):
476            req = req['RequestAccessRequestBody']
477        else:
478            raise service_error(service_error.req, "No request!?")
479
480        # if this includes a project export request, construct a filter such
481        # that only the ABAC attributes mapped to that project are checked for
482        # access.
483        if 'service' in req:
484            ep = get_export_project(req['service'])
485            if ep: pf = lambda(a): a.value[0] == ep
486            else: pf = None
487        else:
488            ep = None
489            pf = None
490
491        if self.auth.import_credentials(
492                data_list=req.get('abac_credential', [])):
493            self.auth.save()
494
495        if self.auth_type == "legacy":
496            found, dyn, owners= self.legacy_lookup_access(req, fid)
497            proof = access_proof("me", fid, "create")
498        elif self.auth_type == 'abac':
499            found, dyn, owners, proof = self.lookup_access(req, fid, filter=pf)
500        else:
501            raise service_error(service_error.internal, 
502                    'Unknown auth_type: %s' % self.auth_type)
503        ap = None
504
505        # This only happens in legacy lookups, but if this user has access to
506        # the testbed but not the project to be exported, raise the error.
507        if ep and ep != found[0]:
508            raise service_error(service_error.access,
509                    "Cannot export %s" % ep)
510
511        if self.ssh_pubkey_file:
512            ap = self.do_project_allocation(dyn[1], found[0], found[1])
513        else:
514            raise service_error(service_error.internal, 
515                    "SSH access parameters required")
516        # keep track of what's been added
517        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
518        aid = unicode(allocID)
519
520        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
521
522        services, svc_state = self.export_services(req.get('service',[]),
523                pname, uname)
524        self.state_lock.acquire()
525        # Store services state in global state
526        for k, v in svc_state.items():
527            self.allocation[aid][k] = v
528        self.append_allocation_authorization(aid, 
529                set([(o, allocID) for o in owners]), state_attr='allocation')
530        self.write_state()
531        self.state_lock.release()
532        try:
533            f = open("%s/%s.pem" % (self.certdir, aid), "w")
534            print >>f, alloc_cert
535            f.close()
536        except EnvironmentError, e:
537            raise service_error(service_error.internal, 
538                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
539        resp = self.build_access_response({ 'fedid': allocID } ,
540                ap, services, proof)
541        return resp
542
543    def do_release_project(self, del_project, del_users, del_types):
544        """
545        If a project and users has to be deleted, make the call.
546        """
547        msg = { 'project': { }}
548        if del_project:
549            msg['project']['name']= {'localname': del_project}
550        users = [ ]
551        for u in del_users.keys():
552            users.append({ 'userID': { 'localname': u },\
553                'access' :  \
554                        [ {'sshPubkey' : s } for s in del_users[u]]\
555            })
556        if users: 
557            msg['project']['user'] = users
558        if len(del_types) > 0:
559            msg['resources'] = { 'node': \
560                    [ {'hardware': [ h ] } for h in del_types ]\
561                }
562        if self.allocate_project.release_project:
563            msg = { 'ReleaseProjectRequestBody' : msg}
564            self.allocate_project.release_project(msg)
565
566    def ReleaseAccess(self, req, fid):
567        # The dance to get into the request body
568        if req.has_key('ReleaseAccessRequestBody'):
569            req = req['ReleaseAccessRequestBody']
570        else:
571            raise service_error(service_error.req, "No request!?")
572
573        try:
574            if req['allocID'].has_key('localname'):
575                auth_attr = aid = req['allocID']['localname']
576            elif req['allocID'].has_key('fedid'):
577                aid = unicode(req['allocID']['fedid'])
578                auth_attr = req['allocID']['fedid']
579            else:
580                raise service_error(service_error.req,
581                        "Only localnames and fedids are understood")
582        except KeyError:
583            raise service_error(service_error.req, "Badly formed request")
584
585        self.log.debug("[access] deallocation requested for %s by %s" % \
586                (aid, fid))
587        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
588                with_proof=True)
589        if not access_ok:
590            self.log.debug("[access] deallocation denied for %s", aid)
591            raise service_error(service_error.access, "Access Denied")
592
593        # If we know this allocation, reduce the reference counts and
594        # remove the local allocations.  Otherwise report an error.  If
595        # there is an allocation to delete, del_users will be a dictonary
596        # of sets where the key is the user that owns the keys in the set.
597        # We use a set to avoid duplicates.  del_project is just the name
598        # of any dynamic project to delete.  We're somewhat lazy about
599        # deleting authorization attributes.  Having access to something
600        # that doesn't exist isn't harmful.
601        del_users = { }
602        del_project = None
603        del_types = set()
604
605        self.state_lock.acquire()
606        if aid in self.allocation:
607            self.log.debug("Found allocation for %s" %aid)
608            self.clear_allocation_authorization(aid, state_attr='allocation')
609            for k in self.allocation[aid]['keys']:
610                kk = "%s:%s" % k
611                self.keys[kk] -= 1
612                if self.keys[kk] == 0:
613                    if not del_users.has_key(k[0]):
614                        del_users[k[0]] = set()
615                    del_users[k[0]].add(k[1])
616                    del self.keys[kk]
617
618            if 'project' in self.allocation[aid]:
619                pname = self.allocation[aid]['project']
620                self.projects[pname] -= 1
621                if self.projects[pname] == 0:
622                    del_project = pname
623                    del self.projects[pname]
624
625            if 'types' in self.allocation[aid]:
626                for t in self.allocation[aid]['types']:
627                    self.types[t] -= 1
628                    if self.types[t] == 0:
629                        if not del_project: del_project = t[0]
630                        del_types.add(t[1])
631                        del self.types[t]
632
633            del self.allocation[aid]
634            self.write_state()
635            self.state_lock.release()
636            # If we actually have resources to deallocate, prepare the call.
637            if del_project or del_users:
638                self.do_release_project(del_project, del_users, del_types)
639            # And remove the access cert
640            cf = "%s/%s.pem" % (self.certdir, aid)
641            self.log.debug("Removing %s" % cf)
642            os.remove(cf)
643            return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
644        else:
645            self.state_lock.release()
646            raise service_error(service_error.req, "No such allocation")
647
648    # These are subroutines for StartSegment
649    def generate_ns2(self, topo, expfn, softdir, connInfo):
650        """
651        Convert topo into an ns2 file, decorated with appropriate commands for
652        the particular testbed setup.  Convert all requests for software, etc
653        to point at the staged copies on this testbed and add the federation
654        startcommands.
655        """
656        class dragon_commands:
657            """
658            Functor to spit out approrpiate dragon commands for nodes listed in
659            the connectivity description.  The constructor makes a dict mapping
660            dragon nodes to their parameters and the __call__ checks each
661            element in turn for membership.
662            """
663            def __init__(self, map):
664                self.node_info = map
665
666            def __call__(self, e):
667                s = ""
668                if isinstance(e, topdl.Computer):
669                    if self.node_info.has_key(e.name):
670                        info = self.node_info[e.name]
671                        for ifname, vlan, type in info:
672                            for i in e.interface:
673                                if i.name == ifname:
674                                    addr = i.get_attribute('ip4_address')
675                                    subs = i.substrate[0]
676                                    break
677                            else:
678                                raise service_error(service_error.internal,
679                                        "No interface %s on element %s" % \
680                                                (ifname, e.name))
681                            # XXX: do netmask right
682                            if type =='link':
683                                s = ("tb-allow-external ${%s} " + \
684                                        "dragonportal ip %s vlan %s " + \
685                                        "netmask 255.255.255.0\n") % \
686                                        (topdl.to_tcl_name(e.name), addr, vlan)
687                            elif type =='lan':
688                                s = ("tb-allow-external ${%s} " + \
689                                        "dragonportal " + \
690                                        "ip %s vlan %s usurp %s\n") % \
691                                        (topdl.to_tcl_name(e.name), addr, 
692                                                vlan, subs)
693                            else:
694                                raise service_error(service_error_internal,
695                                        "Unknown DRAGON type %s" % type)
696                return s
697
698        class not_dragon:
699            """
700            Return true if a node is in the given map of dragon nodes.
701            """
702            def __init__(self, map):
703                self.nodes = set(map.keys())
704
705            def __call__(self, e):
706                return e.name not in self.nodes
707
708        # Main line of generate_ns2
709        t = topo.clone()
710
711        # Create the map of nodes that need direct connections (dragon
712        # connections) from the connInfo
713        dragon_map = { }
714        for i in [ i for i in connInfo if i['type'] == 'transit']:
715            for a in i.get('fedAttr', []):
716                if a['attribute'] == 'vlan_id':
717                    vlan = a['value']
718                    break
719            else:
720                raise service_error(service_error.internal, "No vlan tag")
721            members = i.get('member', [])
722            if len(members) > 1: type = 'lan'
723            else: type = 'link'
724
725            try:
726                for m in members:
727                    if m['element'] in dragon_map:
728                        dragon_map[m['element']].append(( m['interface'], 
729                            vlan, type))
730                    else:
731                        dragon_map[m['element']] = [( m['interface'], 
732                            vlan, type),]
733            except KeyError:
734                raise service_error(service_error.req,
735                        "Missing connectivity info")
736
737        # Weed out the things we aren't going to instantiate: Segments, portal
738        # substrates, and portal interfaces.  (The copy in the for loop allows
739        # us to delete from e.elements in side the for loop).  While we're
740        # touching all the elements, we also adjust paths from the original
741        # testbed to local testbed paths and put the federation commands into
742        # the start commands
743        for e in [e for e in t.elements]:
744            if isinstance(e, topdl.Segment):
745                t.elements.remove(e)
746            if isinstance(e, topdl.Computer):
747                self.add_kit(e, self.federation_software)
748                if e.get_attribute('portal') and self.portal_startcommand:
749                    # Add local portal support software
750                    self.add_kit(e, self.portal_software)
751                    # Portals never have a user-specified start command
752                    e.set_attribute('startup', self.portal_startcommand)
753                elif self.node_startcommand:
754                    if e.get_attribute('startup'):
755                        e.set_attribute('startup', "%s \\$USER '%s'" % \
756                                (self.node_startcommand, 
757                                    e.get_attribute('startup')))
758                    else:
759                        e.set_attribute('startup', self.node_startcommand)
760
761                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
762                # Remove portal interfaces that do not connect to DRAGON
763                e.interface = [i for i in e.interface \
764                        if not i.get_attribute('portal') or i.name in dinf ]
765            # Fix software paths
766            for s in getattr(e, 'software', []):
767                s.location = re.sub("^.*/", softdir, s.location)
768
769        t.substrates = [ s.clone() for s in t.substrates ]
770        t.incorporate_elements()
771
772        # Customize the ns2 output for local portal commands and images
773        filters = []
774
775        if self.dragon_endpoint:
776            add_filter = not_dragon(dragon_map)
777            filters.append(dragon_commands(dragon_map))
778        else:
779            add_filter = None
780
781        if self.portal_command:
782            filters.append(topdl.generate_portal_command_filter(
783                self.portal_command, add_filter=add_filter))
784
785        if self.portal_image:
786            filters.append(topdl.generate_portal_image_filter(
787                self.portal_image))
788
789        if self.portal_type:
790            filters.append(topdl.generate_portal_hardware_filter(
791                self.portal_type))
792
793        # Convert to ns and write it out
794        expfile = topdl.topology_to_ns2(t, filters)
795        try:
796            f = open(expfn, "w")
797            print >>f, expfile
798            f.close()
799        except EnvironmentError:
800            raise service_error(service_error.internal,
801                    "Cannot write experiment file %s: %s" % (expfn,e))
802
803    def export_store_info(self, cf, proj, ename, connInfo):
804        """
805        For the export requests in the connection info, install the peer names
806        at the experiment controller via SetValue calls.
807        """
808
809        for c in connInfo:
810            for p in [ p for p in c.get('parameter', []) \
811                    if p.get('type', '') == 'output']:
812
813                if p.get('name', '') == 'peer':
814                    k = p.get('key', None)
815                    surl = p.get('store', None)
816                    if surl and k and k.index('/') != -1:
817                        value = "%s.%s.%s%s" % \
818                                (k[k.index('/')+1:], ename, proj, self.domain)
819                        req = { 'name': k, 'value': value }
820                        self.log.debug("Setting %s to %s on %s" % \
821                                (k, value, surl))
822                        self.call_SetValue(surl, req, cf)
823                    else:
824                        self.log.error("Bad export request: %s" % p)
825                elif p.get('name', '') == 'ssh_port':
826                    k = p.get('key', None)
827                    surl = p.get('store', None)
828                    if surl and k:
829                        req = { 'name': k, 'value': self.ssh_port }
830                        self.log.debug("Setting %s to %s on %s" % \
831                                (k, self.ssh_port, surl))
832                        self.call_SetValue(surl, req, cf)
833                    else:
834                        self.log.error("Bad export request: %s" % p)
835                else:
836                    self.log.error("Unknown export parameter: %s" % \
837                            p.get('name'))
838                    continue
839
840    def add_seer_node(self, topo, name, startup):
841        """
842        Add a seer node to the given topology, with the startup command passed
843        in.  Used by configure seer_services.
844        """
845        c_node = topdl.Computer(
846                name=name, 
847                os= topdl.OperatingSystem(
848                    attribute=[
849                    { 'attribute': 'osid', 
850                        'value': self.local_seer_image },
851                    ]),
852                attribute=[
853                    { 'attribute': 'startup', 'value': startup },
854                    ]
855                )
856        self.add_kit(c_node, self.local_seer_software)
857        topo.elements.append(c_node)
858
859    def configure_seer_services(self, services, topo, softdir):
860        """
861        Make changes to the topology required for the seer requests being made.
862        Specifically, add any control or master nodes required and set up the
863        start commands on the nodes to interconnect them.
864        """
865        local_seer = False      # True if we need to add a control node
866        collect_seer = False    # True if there is a seer-master node
867        seer_master= False      # True if we need to add the seer-master
868        for s in services:
869            s_name = s.get('name', '')
870            s_vis = s.get('visibility','')
871
872            if s_name  == 'local_seer_control' and s_vis == 'export':
873                local_seer = True
874            elif s_name == 'seer_master':
875                if s_vis == 'import':
876                    collect_seer = True
877                elif s_vis == 'export':
878                    seer_master = True
879       
880        # We've got the whole picture now, so add nodes if needed and configure
881        # them to interconnect properly.
882        if local_seer or seer_master:
883            # Copy local seer control node software to the tempdir
884            for l, f in self.local_seer_software:
885                base = os.path.basename(f)
886                copy_file(f, "%s/%s" % (softdir, base))
887        # If we're collecting seers somewhere the controllers need to talk to
888        # the master.  In testbeds that export the service, that will be a
889        # local node that we'll add below.  Elsewhere it will be the control
890        # portal that will port forward to the exporting master.
891        if local_seer:
892            if collect_seer:
893                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
894            else:
895                startup = self.local_seer_start
896            self.add_seer_node(topo, 'control', startup)
897        # If this is the seer master, add that node, too.
898        if seer_master:
899            self.add_seer_node(topo, 'seer-master', 
900                    "%s -R -n -R seer-master -R -A -R sink" % \
901                            self.seer_master_start)
902
903    def retrieve_software(self, topo, certfile, softdir):
904        """
905        Collect the software that nodes in the topology need loaded and stage
906        it locally.  This implies retrieving it from the experiment_controller
907        and placing it into softdir.  Certfile is used to prove that this node
908        has access to that data (it's the allocation/segment fedid).  Finally
909        local portal and federation software is also copied to the same staging
910        directory for simplicity - all software needed for experiment creation
911        is in softdir.
912        """
913        sw = set()
914        for e in topo.elements:
915            for s in getattr(e, 'software', []):
916                sw.add(s.location)
917        for s in sw:
918            self.log.debug("Retrieving %s" % s)
919            try:
920                get_url(s, certfile, softdir)
921            except:
922                t, v, st = sys.exc_info()
923                raise service_error(service_error.internal,
924                        "Error retrieving %s: %s" % (s, v))
925
926        # Copy local federation and portal node software to the tempdir
927        for s in (self.federation_software, self.portal_software):
928            for l, f in s:
929                base = os.path.basename(f)
930                copy_file(f, "%s/%s" % (softdir, base))
931
932
933    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
934        """
935        Gather common configuration files, retrieve or create an experiment
936        name and project name, and return the ssh_key filenames.  Create an
937        allocation log bound to the state log variable as well.
938        """
939        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey', 
940                'seer_ca_pem', 'seer_node_pem')
941        ename = None
942        pubkey_base = None
943        secretkey_base = None
944        proj = None
945        user = None
946        alloc_log = None
947        nonce_experiment = False
948        vchars_re = '[^' + string.ascii_letters + string.digits  + '-]'
949
950        self.state_lock.acquire()
951        if aid in self.allocation:
952            proj = self.allocation[aid].get('project', None)
953            if not proj: 
954                proj = self.allocation[aid].get('sproject', None)
955        self.state_lock.release()
956
957        if not proj:
958            raise service_error(service_error.internal, 
959                    "Can't find project for %s" %aid)
960
961        for a in attrs:
962            if a['attribute'] in configs:
963                try:
964                    self.log.debug("Retrieving %s from %s" % \
965                            (a['attribute'], a['value']))
966                    get_url(a['value'], certfile, tmpdir)
967                except:
968                    t, v, st = sys.exc_info()
969                    raise service_error(service_error.internal,
970                            "Error retrieving %s: %s" % (a.get('value', ""), v))
971            if a['attribute'] == 'ssh_pubkey':
972                pubkey_base = a['value'].rpartition('/')[2]
973            if a['attribute'] == 'ssh_secretkey':
974                secretkey_base = a['value'].rpartition('/')[2]
975            if a['attribute'] == 'experiment_name':
976                ename = a['value']
977
978        # Names longer than the emulab max are discarded
979        if ename and len(ename) <= self.max_name_len:
980            # Clean up the experiment name so that emulab will accept it.
981            ename = re.sub(vchars_re, '-', ename)
982
983        else:
984            ename = ""
985            for i in range(0,5):
986                ename += random.choice(string.ascii_letters)
987            nonce_experiment = True
988            self.log.warn("No experiment name or suggestion too long: " + \
989                    "picked one randomly: %s" % ename)
990
991        if not pubkey_base:
992            raise service_error(service_error.req, 
993                    "No public key attribute")
994
995        if not secretkey_base:
996            raise service_error(service_error.req, 
997                    "No secret key attribute")
998
999        self.state_lock.acquire()
1000        if aid in self.allocation:
1001            user = self.allocation[aid].get('user', None)
1002            self.allocation[aid]['experiment'] = ename
1003            self.allocation[aid]['nonce'] = nonce_experiment
1004            self.allocation[aid]['log'] = [ ]
1005            # Create a logger that logs to the experiment's state object as
1006            # well as to the main log file.
1007            alloc_log = logging.getLogger('fedd.access.%s' % ename)
1008            h = logging.StreamHandler(
1009                    list_log.list_log(self.allocation[aid]['log']))
1010            # XXX: there should be a global one of these rather than
1011            # repeating the code.
1012            h.setFormatter(logging.Formatter(
1013                "%(asctime)s %(name)s %(message)s",
1014                        '%d %b %y %H:%M:%S'))
1015            alloc_log.addHandler(h)
1016            self.write_state()
1017        self.state_lock.release()
1018
1019        if not user:
1020            raise service_error(service_error.internal, 
1021                    "Can't find creation user for %s" %aid)
1022
1023        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
1024
1025    def decorate_topology(self, info, t):
1026        """
1027        Copy the physical mapping and status onto the topology.  Used by
1028        StartSegment and InfoSegment
1029        """
1030        def add_new(ann, attr):
1031            for a in ann:
1032                if a not in attr: attr.append(a)
1033
1034        # Copy the assigned names into the return topology
1035        for e in t.elements:
1036            if isinstance(e, topdl.Computer):
1037                if not self.create_debug:
1038                    if e.name in info.node:
1039                        add_new(("%s%s" % (info.node[e.name][0], self.domain),),
1040                                e.localname)
1041                        e.status = info.node[e.name][1]
1042                else:
1043                    # Simple debugging assignment
1044                    add_new(("node%d%s" % (i, self.domain),), e.localname)
1045                    e.status = 'active'
1046                    add_new(('testop1', 'testop2'), e.operation)
1047                    i += 1
1048
1049
1050    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
1051        """
1052        Store key bits of experiment state in the global repository, including
1053        the response that may need to be replayed, and return the response.
1054        """
1055        i = 0
1056        t = topo.clone()
1057        self.decorate_topology(starter, t)
1058        # Grab the log (this is some anal locking, but better safe than
1059        # sorry)
1060        self.state_lock.acquire()
1061        logv = "".join(self.allocation[aid]['log'])
1062        # It's possible that the StartSegment call gets retried (!).
1063        # if the 'started' key is in the allocation, we'll return it rather
1064        # than redo the setup.
1065        self.allocation[aid]['started'] = { 
1066                'allocID': alloc_id,
1067                'allocationLog': logv,
1068                'segmentdescription': { 
1069                    'topdldescription': t.to_dict()
1070                    },
1071                'proof': proof.to_dict(),
1072                }
1073        self.allocation[aid]['topo'] = t
1074        retval = copy.copy(self.allocation[aid]['started'])
1075        self.write_state()
1076        self.state_lock.release()
1077        return retval
1078   
1079    # End of StartSegment support routines
1080
1081    def StartSegment(self, req, fid):
1082        err = None  # Any service_error generated after tmpdir is created
1083        rv = None   # Return value from segment creation
1084
1085        try:
1086            req = req['StartSegmentRequestBody']
1087            auth_attr = req['allocID']['fedid']
1088            topref = req['segmentdescription']['topdldescription']
1089        except KeyError:
1090            raise service_error(server_error.req, "Badly formed request")
1091
1092        connInfo = req.get('connection', [])
1093        services = req.get('service', [])
1094        aid = "%s" % auth_attr
1095        attrs = req.get('fedAttr', [])
1096
1097        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1098                with_proof=True)
1099        if not access_ok:
1100            raise service_error(service_error.access, "Access denied")
1101        else:
1102            # See if this is a replay of an earlier succeeded StartSegment -
1103            # sometimes SSL kills 'em.  If so, replay the response rather than
1104            # redoing the allocation.
1105            self.state_lock.acquire()
1106            retval = self.allocation[aid].get('started', None)
1107            self.state_lock.release()
1108            if retval:
1109                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1110                        "replaying response")
1111                return retval
1112
1113        # A new request.  Do it.
1114
1115        if topref: topo = topdl.Topology(**topref)
1116        else:
1117            raise service_error(service_error.req, 
1118                    "Request missing segmentdescription'")
1119       
1120        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1121        try:
1122            tmpdir = tempfile.mkdtemp(prefix="access-")
1123            softdir = "%s/software" % tmpdir
1124            os.mkdir(softdir)
1125        except EnvironmentError:
1126            raise service_error(service_error.internal, "Cannot create tmp dir")
1127
1128        # Try block alllows us to clean up temporary files.
1129        try:
1130            self.retrieve_software(topo, certfile, softdir)
1131            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1132                    self.initialize_experiment_info(attrs, aid, 
1133                            certfile, tmpdir)
1134
1135            if '/' in proj: proj, gid = proj.split('/')
1136            else: gid = None
1137
1138
1139            # Set up userconf and seer if needed
1140            self.configure_userconf(services, tmpdir)
1141            self.configure_seer_services(services, topo, softdir)
1142            # Get and send synch store variables
1143            self.export_store_info(certfile, proj, ename, connInfo)
1144            self.import_store_info(certfile, connInfo)
1145
1146            expfile = "%s/experiment.tcl" % tmpdir
1147
1148            self.generate_portal_configs(topo, pubkey_base, 
1149                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1150            self.generate_ns2(topo, expfile, 
1151                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1152
1153            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1154                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1155                    cert=self.xmlrpc_cert)
1156            rv = starter(self, ename, proj, user, expfile, tmpdir, gid=gid)
1157        except service_error, e:
1158            err = e
1159        except:
1160            t, v, st = sys.exc_info()
1161            err = service_error(service_error.internal, "%s: %s" % \
1162                    (v, traceback.extract_tb(st)))
1163
1164        # Walk up tmpdir, deleting as we go
1165        if self.cleanup: self.remove_dirs(tmpdir)
1166        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1167
1168        if rv:
1169            return self.finalize_experiment(starter, topo, aid, req['allocID'],
1170                    proof)
1171        elif err:
1172            raise service_error(service_error.federant,
1173                    "Swapin failed: %s" % err)
1174        else:
1175            raise service_error(service_error.federant, "Swapin failed")
1176
1177    def TerminateSegment(self, req, fid):
1178        try:
1179            req = req['TerminateSegmentRequestBody']
1180        except KeyError:
1181            raise service_error(server_error.req, "Badly formed request")
1182
1183        auth_attr = req['allocID']['fedid']
1184        aid = "%s" % auth_attr
1185        attrs = req.get('fedAttr', [])
1186
1187        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1188                with_proof=True)
1189        if not access_ok:
1190            raise service_error(service_error.access, "Access denied")
1191
1192        self.state_lock.acquire()
1193        if aid in self.allocation:
1194            proj = self.allocation[aid].get('project', None)
1195            if not proj: 
1196                proj = self.allocation[aid].get('sproject', None)
1197            user = self.allocation[aid].get('user', None)
1198            ename = self.allocation[aid].get('experiment', None)
1199            nonce = self.allocation[aid].get('nonce', False)
1200        else:
1201            proj = None
1202            user = None
1203            ename = None
1204            nonce = False
1205        self.state_lock.release()
1206
1207        if not proj:
1208            raise service_error(service_error.internal, 
1209                    "Can't find project for %s" % aid)
1210        else:
1211            if '/' in proj: proj, gid = proj.split('/')
1212            else: gid = None
1213
1214        if not user:
1215            raise service_error(service_error.internal, 
1216                    "Can't find creation user for %s" % aid)
1217        if not ename:
1218            raise service_error(service_error.internal, 
1219                    "Can't find experiment name for %s" % aid)
1220        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1221                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1222        stopper(self, user, proj, ename, gid, nonce)
1223        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
1224
1225    def InfoSegment(self, req, fid):
1226        try:
1227            req = req['InfoSegmentRequestBody']
1228        except KeyError:
1229            raise service_error(server_error.req, "Badly formed request")
1230
1231        auth_attr = req['allocID']['fedid']
1232        aid = "%s" % auth_attr
1233
1234        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1235                with_proof=True)
1236        if not access_ok:
1237            raise service_error(service_error.access, "Access denied")
1238
1239        self.state_lock.acquire()
1240        if aid in self.allocation:
1241            topo = self.allocation[aid].get('topo', None)
1242            if topo: topo = topo.clone()
1243            proj = self.allocation[aid].get('project', None)
1244            if not proj: 
1245                proj = self.allocation[aid].get('sproject', None)
1246            user = self.allocation[aid].get('user', None)
1247            ename = self.allocation[aid].get('experiment', None)
1248        else:
1249            proj = None
1250            user = None
1251            ename = None
1252            topo = None
1253        self.state_lock.release()
1254
1255        if not proj:
1256            raise service_error(service_error.internal, 
1257                    "Can't find project for %s" % aid)
1258        else:
1259            if '/' in proj: proj, gid = proj.split('/')
1260            else: gid = None
1261
1262        if not user:
1263            raise service_error(service_error.internal, 
1264                    "Can't find creation user for %s" % aid)
1265        if not ename:
1266            raise service_error(service_error.internal, 
1267                    "Can't find experiment name for %s" % aid)
1268        info = self.info_segment(keyfile=self.ssh_privkey_file,
1269                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1270        info(self, user, proj, ename)
1271        self.decorate_topology(info, topo)
1272        return { 
1273                'allocID': req['allocID'], 
1274                'segmentdescription': 
1275                    { 'topdldescription' : topo.to_dict() },
1276                'proof': proof.to_dict(),
1277                }
1278
1279    def OperationSegment(self, req, fid):
1280        def get_pname(e):
1281            """
1282            Get the physical name of a node
1283            """
1284            if e.localname:
1285                return re.sub('\..*','', e.localname[0])
1286            else:
1287                return None
1288
1289        try:
1290            req = req['OperationSegmentRequestBody']
1291        except KeyError:
1292            raise service_error(server_error.req, "Badly formed request")
1293
1294        auth_attr = req['allocID']['fedid']
1295        aid = "%s" % auth_attr
1296
1297        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
1298                with_proof=True)
1299        if not access_ok:
1300            raise service_error(service_error.access, "Access denied")
1301
1302        op = req.get('operation', None)
1303        targets = req.get('target', None)
1304        params = req.get('parameter', None)
1305
1306        if op is None :
1307            raise service_error(service_error.req, "missing operation")
1308        elif targets is None:
1309            raise service_error(service_error.req, "no targets")
1310
1311        if aid in self.allocation:
1312            topo = self.allocation[aid].get('topo', None)
1313            if topo: topo = topo.clone()
1314        else:
1315            topo = None
1316
1317        targets = copy.copy(targets)
1318        ptargets = { }
1319        for e in topo.elements:
1320            if isinstance(e, topdl.Computer):
1321                if e.name in targets:
1322                    targets.remove(e.name)
1323                    pn = get_pname(e)
1324                    if pn: ptargets[e.name] = pn
1325
1326        status = [ operation_status(t, operation_status.no_target) \
1327                for t in targets]
1328
1329        ops = self.operation_segment(keyfile=self.ssh_privkey_file,
1330                debug=self.create_debug, boss=self.boss, 
1331                cert=self.xmlrpc_cert)
1332        ops(self, op, ptargets, params)
1333       
1334        status.extend(ops.status)
1335
1336        return { 
1337                'allocID': req['allocID'], 
1338                'status': [s.to_dict() for s in status],
1339                'proof': proof.to_dict(),
1340                }
Note: See TracBrowser for help on using the repository browser.