source: fedd/federation/emulab_access.py @ 2c1fd21

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 2c1fd21 was 2c1fd21, checked in by Ted Faber <faber@…>, 14 years ago

Importing is a relatively access semantics free endeavour, but export needs to be in each access controller.

  • Property mode set to 100644
File size: 37.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.boss = config.get("access", "boss")
62        self.ops = config.get("access", "ops")
63        self.domain = config.get("access", "domain")
64        self.fileserver = config.get("access", "fileserver")
65        self.eventserver = config.get("access", "eventserver")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        if config.has_option("access", "accessdb"):
108            self.read_access(config.get("access", "accessdb"))
109
110        if not self.local_seer_image or not self.local_seer_software:
111            self.exports.discard('local_seer_control')
112            self.exports.discard('seer_master')
113
114        if not self.local_seer_start:
115            self.exports.discard('local_seer_control')
116
117        if not self.seer_master_start:
118            self.exports.discard('seer_master')
119
120        tb = config.get('access', 'testbed')
121        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
122        else: self.testbed = [ ]
123
124        if config.has_option("access", "accessdb"):
125            self.read_access(config.get("access", "accessdb"), 
126                    self.make_access_project)
127
128        # read_state in the base_class
129        self.state_lock.acquire()
130        for a  in ('allocation', 'projects', 'keys', 'types'):
131            if a not in self.state:
132                self.state[a] = { }
133        self.allocation = self.state['allocation']
134        self.projects = self.state['projects']
135        self.keys = self.state['keys']
136        self.types = self.state['types']
137        # Add the ownership attributes to the authorizer.  Note that the
138        # indices of the allocation dict are strings, but the attributes are
139        # fedids, so there is a conversion.
140        for k in self.allocation.keys():
141            for o in self.allocation[k].get('owners', []):
142                self.auth.set_attribute(o, fedid(hexstr=k))
143            if self.allocation[k].has_key('userconfig'):
144                sfid = self.allocation[k]['userconfig']
145                fid = fedid(hexstr=sfid)
146                self.auth.set_attribute(fid, "/%s" % sfid)
147        self.state_lock.release()
148
149
150        self.soap_services = {\
151            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
152            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
153            'StartSegment': soap_handler("StartSegment", self.StartSegment),
154            'TerminateSegment': soap_handler("TerminateSegment",
155                self.TerminateSegment),
156            }
157        self.xmlrpc_services =  {\
158            'RequestAccess': xmlrpc_handler('RequestAccess',
159                self.RequestAccess),
160            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
161                self.ReleaseAccess),
162            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': xmlrpc_handler('TerminateSegment',
164                self.TerminateSegment),
165            }
166
167        self.call_SetValue = service_caller('SetValue')
168        self.call_GetValue = service_caller('GetValue', log=self.log)
169
170        if not config.has_option("allocate", "uri"):
171            self.allocate_project = \
172                allocate_project_local(config, auth)
173        else:
174            self.allocate_project = \
175                allocate_project_remote(config, auth)
176
177
178        # If the project allocator exports services, put them in this object's
179        # maps so that classes that instantiate this can call the services.
180        self.soap_services.update(self.allocate_project.soap_services)
181        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
182
183    @staticmethod
184    def make_access_project(str):
185        """
186        Convert a string of the form (id[:resources:resouces], id, id) into an
187        access_project.  This is called by read_access to convert to local
188        attributes.  It returns a tuple of the form (project, user, user) where
189        users may be names or fedids.
190        """
191        def parse_name(n):
192            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
193            else: return n
194
195        str = str.strip()
196        if str.startswith('(') and str.endswith(')'):
197            str = str[1:-1]
198            names = [ s.strip() for s in str.split(",")]
199            if len(names) > 3:
200                raise self.parse_error("More than three fields in name")
201            first = names[0].split(":")
202            if first == 'fedid:':
203                del first[0]
204                first[0] = fedid(hexstr=first[0])
205            names[0] = access_project(first[0], first[1:])
206
207            for i in range(1,2):
208                names[i] = parse_name(names[i])
209
210            return tuple(names)
211        else:
212            raise self.parse_error('Bad mapping (unbalanced parens)')
213
214
215    # RequestAccess support routines
216
217    def lookup_access(self, req, fid):
218        """
219        Look up the local access control information mapped to this fedid and
220        credentials.  In this case it is a (project, create_user, access_user)
221        triple, and a triple of three booleans indicating which, if any will
222        need to be dynamically created.  Finally a list of owners for that
223        allocation is returned.
224
225        lookup_access_base pulls the first triple out, and it is parsed by this
226        routine into the boolean map.  Owners is always the controlling fedid.
227        """
228        # Return values
229        rp = access_project(None, ())
230        ru = None
231        # This maps a valid user to the Emulab projects and users to use
232        found, match = self.lookup_access_base(req, fid)
233        tb, project, user = match
234       
235        if found == None:
236            raise service_error(service_error.access,
237                    "Access denied - cannot map access")
238
239        # resolve <dynamic> and <same> in found
240        dyn_proj = False
241        dyn_create_user = False
242        dyn_service_user = False
243
244        if found[0].name == "<same>":
245            if project != None:
246                rp.name = project
247            else : 
248                raise service_error(\
249                        service_error.server_config,
250                        "Project matched <same> when no project given")
251        elif found[0].name == "<dynamic>":
252            rp.name = None
253            dyn_proj = True
254        else:
255            rp.name = found[0].name
256        rp.node_types = found[0].node_types;
257
258        if found[1] == "<same>":
259            if user_match == "<any>":
260                if user != None: rcu = user[0]
261                else: raise service_error(\
262                        service_error.server_config,
263                        "Matched <same> on anonymous request")
264            else:
265                rcu = user_match
266        elif found[1] == "<dynamic>":
267            rcu = None
268            dyn_create_user = True
269        else:
270            rcu = found[1]
271       
272        if found[2] == "<same>":
273            if user_match == "<any>":
274                if user != None: rsu = user[0]
275                else: raise service_error(\
276                        service_error.server_config,
277                        "Matched <same> on anonymous request")
278            else:
279                rsu = user_match
280        elif found[2] == "<dynamic>":
281            rsu = None
282            dyn_service_user = True
283        else:
284            rsu = found[2]
285
286        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
287                [ fid ]
288
289    def do_project_allocation(self, dyn, project, user):
290        """
291        Call the project allocation routines and return the info.
292        """
293        if dyn: 
294            # Compose the dynamic project request
295            # (only dynamic, dynamic currently allowed)
296            preq = { 'AllocateProjectRequestBody': \
297                        { 'project' : {\
298                            'user': [ \
299                            { \
300                                'access': [ { 
301                                    'sshPubkey': self.ssh_pubkey_file } ],
302                                 'role': "serviceAccess",\
303                            }, \
304                            { \
305                                'access': [ { 
306                                    'sshPubkey': self.ssh_pubkey_file } ],
307                                 'role': "experimentCreation",\
308                            }, \
309                            ], \
310                            }\
311                        }\
312                    }
313            return self.allocate_project.dynamic_project(preq)
314        else:
315            preq = {'StaticProjectRequestBody' : \
316                    { 'project': \
317                        { 'name' : { 'localname' : project },\
318                          'user' : [ \
319                            {\
320                                'userID': { 'localname' : user }, \
321                                'access': [ { 
322                                    'sshPubkey': self.ssh_pubkey_file } ],
323                                'role': 'experimentCreation'\
324                            },\
325                            {\
326                                'userID': { 'localname' : user}, \
327                                'access': [ { 
328                                    'sshPubkey': self.ssh_pubkey_file } ],
329                                'role': 'serviceAccess'\
330                            },\
331                        ]}\
332                    }\
333            }
334            return self.allocate_project.static_project(preq)
335
336    def save_project_state(self, aid, ap, dyn, owners):
337        """
338        Parse out and save the information relevant to the project created for
339        this experiment.  That info is largely in ap and owners.  dyn indicates
340        that the project was created dynamically.  Return the user and project
341        names.
342        """
343        self.state_lock.acquire()
344        self.allocation[aid] = { }
345        try:
346            pname = ap['project']['name']['localname']
347        except KeyError:
348            pname = None
349
350        if dyn:
351            if not pname:
352                self.state_lock.release()
353                raise service_error(service_error.internal,
354                        "Misformed allocation response?")
355            if pname in self.projects: self.projects[pname] += 1
356            else: self.projects[pname] = 1
357            self.allocation[aid]['project'] = pname
358        else:
359            # sproject is a static project associated with this allocation.
360            self.allocation[aid]['sproject'] = pname
361
362        self.allocation[aid]['keys'] = [ ]
363
364        try:
365            for u in ap['project']['user']:
366                uname = u['userID']['localname']
367                if u['role'] == 'experimentCreation':
368                    self.allocation[aid]['user'] = uname
369                for k in [ k['sshPubkey'] for k in u['access'] \
370                        if k.has_key('sshPubkey') ]:
371                    kv = "%s:%s" % (uname, k)
372                    if self.keys.has_key(kv): self.keys[kv] += 1
373                    else: self.keys[kv] = 1
374                    self.allocation[aid]['keys'].append((uname, k))
375        except KeyError:
376            self.state_lock.release()
377            raise service_error(service_error.internal,
378                    "Misformed allocation response?")
379
380        self.allocation[aid]['owners'] = owners
381        self.write_state()
382        self.state_lock.release()
383        return (pname, uname)
384
385    # End of RequestAccess support routines
386
387    def RequestAccess(self, req, fid):
388        """
389        Handle the access request.  Proxy if not for us.
390
391        Parse out the fields and make the allocations or rejections if for us,
392        otherwise, assuming we're willing to proxy, proxy the request out.
393        """
394
395        def gateway_hardware(h):
396            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
397            else: return h
398
399        def get_export_project(svcs):
400            """
401            if the service requests includes one to export a project, return
402            that project.
403            """
404            rv = None
405            for s in svcs:
406                if s.get('name', '') == 'project_export' and \
407                        s.get('visibility', '') == 'export':
408                    if not rv: 
409                        for a in s.get('feddAttr', []):
410                            if a.get('attribute', '') == 'project' \
411                                    and 'value' in a:
412                                rv = a['value']
413                    else:
414                        raise service_error(service_error, access, 
415                                'Requesting multiple project exports is ' + \
416                                        'not supported');
417            return rv
418
419        # The dance to get into the request body
420        if req.has_key('RequestAccessRequestBody'):
421            req = req['RequestAccessRequestBody']
422        else:
423            raise service_error(service_error.req, "No request!?")
424
425
426        found, dyn, owners = self.lookup_access(req, fid)
427        ap = None
428
429        # if this includes a project export request and the exported
430        # project is not the access project, access denied.
431        if 'service' in req:
432            ep = get_export_project(req['service'])
433            if ep and ep != found[0].name:
434                raise service_error(service_error.access,
435                        "Cannot export %s" % ep)
436
437        if self.ssh_pubkey_file:
438            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
439        else:
440            raise service_error(service_error.internal, 
441                    "SSH access parameters required")
442        # keep track of what's been added
443        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
444        aid = unicode(allocID)
445
446        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
447
448        services, svc_state = self.export_services(req.get('service',[]),
449                pname, uname)
450        self.state_lock.acquire()
451        # Store services state in global state
452        for k, v in svc_state.items():
453            self.allocation[aid][k] = v
454        self.write_state()
455        self.state_lock.release()
456        # Give the owners the right to change this allocation
457        for o in owners:
458            self.auth.set_attribute(o, allocID)
459        try:
460            f = open("%s/%s.pem" % (self.certdir, aid), "w")
461            print >>f, alloc_cert
462            f.close()
463        except EnvironmentError, e:
464            raise service_error(service_error.internal, 
465                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
466        resp = self.build_access_response({ 'fedid': allocID } ,
467                ap, services)
468        return resp
469
470    def do_release_project(self, del_project, del_users, del_types):
471        """
472        If a project and users has to be deleted, make the call.
473        """
474        msg = { 'project': { }}
475        if del_project:
476            msg['project']['name']= {'localname': del_project}
477        users = [ ]
478        for u in del_users.keys():
479            users.append({ 'userID': { 'localname': u },\
480                'access' :  \
481                        [ {'sshPubkey' : s } for s in del_users[u]]\
482            })
483        if users: 
484            msg['project']['user'] = users
485        if len(del_types) > 0:
486            msg['resources'] = { 'node': \
487                    [ {'hardware': [ h ] } for h in del_types ]\
488                }
489        if self.allocate_project.release_project:
490            msg = { 'ReleaseProjectRequestBody' : msg}
491            self.allocate_project.release_project(msg)
492
493    def ReleaseAccess(self, req, fid):
494        # The dance to get into the request body
495        if req.has_key('ReleaseAccessRequestBody'):
496            req = req['ReleaseAccessRequestBody']
497        else:
498            raise service_error(service_error.req, "No request!?")
499
500        try:
501            if req['allocID'].has_key('localname'):
502                auth_attr = aid = req['allocID']['localname']
503            elif req['allocID'].has_key('fedid'):
504                aid = unicode(req['allocID']['fedid'])
505                auth_attr = req['allocID']['fedid']
506            else:
507                raise service_error(service_error.req,
508                        "Only localnames and fedids are understood")
509        except KeyError:
510            raise service_error(service_error.req, "Badly formed request")
511
512        self.log.debug("[access] deallocation requested for %s", aid)
513        if not self.auth.check_attribute(fid, auth_attr):
514            self.log.debug("[access] deallocation denied for %s", aid)
515            raise service_error(service_error.access, "Access Denied")
516
517        # If we know this allocation, reduce the reference counts and
518        # remove the local allocations.  Otherwise report an error.  If
519        # there is an allocation to delete, del_users will be a dictonary
520        # of sets where the key is the user that owns the keys in the set.
521        # We use a set to avoid duplicates.  del_project is just the name
522        # of any dynamic project to delete.  We're somewhat lazy about
523        # deleting authorization attributes.  Having access to something
524        # that doesn't exist isn't harmful.
525        del_users = { }
526        del_project = None
527        del_types = set()
528
529        self.state_lock.acquire()
530        if aid in self.allocation:
531            self.log.debug("Found allocation for %s" %aid)
532            for k in self.allocation[aid]['keys']:
533                kk = "%s:%s" % k
534                self.keys[kk] -= 1
535                if self.keys[kk] == 0:
536                    if not del_users.has_key(k[0]):
537                        del_users[k[0]] = set()
538                    del_users[k[0]].add(k[1])
539                    del self.keys[kk]
540
541            if 'project' in self.allocation[aid]:
542                pname = self.allocation[aid]['project']
543                self.projects[pname] -= 1
544                if self.projects[pname] == 0:
545                    del_project = pname
546                    del self.projects[pname]
547
548            if 'types' in self.allocation[aid]:
549                for t in self.allocation[aid]['types']:
550                    self.types[t] -= 1
551                    if self.types[t] == 0:
552                        if not del_project: del_project = t[0]
553                        del_types.add(t[1])
554                        del self.types[t]
555
556            del self.allocation[aid]
557            self.write_state()
558            self.state_lock.release()
559            # If we actually have resources to deallocate, prepare the call.
560            if del_project or del_users:
561                self.do_release_project(del_project, del_users, del_types)
562            # And remove the access cert
563            cf = "%s/%s.pem" % (self.certdir, aid)
564            self.log.debug("Removing %s" % cf)
565            os.remove(cf)
566            return { 'allocID': req['allocID'] } 
567        else:
568            self.state_lock.release()
569            raise service_error(service_error.req, "No such allocation")
570
571    # These are subroutines for StartSegment
572    def generate_ns2(self, topo, expfn, softdir, connInfo):
573        """
574        Convert topo into an ns2 file, decorated with appropriate commands for
575        the particular testbed setup.  Convert all requests for software, etc
576        to point at the staged copies on this testbed and add the federation
577        startcommands.
578        """
579        class dragon_commands:
580            """
581            Functor to spit out approrpiate dragon commands for nodes listed in
582            the connectivity description.  The constructor makes a dict mapping
583            dragon nodes to their parameters and the __call__ checks each
584            element in turn for membership.
585            """
586            def __init__(self, map):
587                self.node_info = map
588
589            def __call__(self, e):
590                s = ""
591                if isinstance(e, topdl.Computer):
592                    if self.node_info.has_key(e.name):
593                        info = self.node_info[e.name]
594                        for ifname, vlan, type in info:
595                            for i in e.interface:
596                                if i.name == ifname:
597                                    addr = i.get_attribute('ip4_address')
598                                    subs = i.substrate[0]
599                                    break
600                            else:
601                                raise service_error(service_error.internal,
602                                        "No interface %s on element %s" % \
603                                                (ifname, e.name))
604                            # XXX: do netmask right
605                            if type =='link':
606                                s = ("tb-allow-external ${%s} " + \
607                                        "dragonportal ip %s vlan %s " + \
608                                        "netmask 255.255.255.0\n") % \
609                                        (e.name, addr, vlan)
610                            elif type =='lan':
611                                s = ("tb-allow-external ${%s} " + \
612                                        "dragonportal " + \
613                                        "ip %s vlan %s usurp %s\n") % \
614                                        (e.name, addr, vlan, subs)
615                            else:
616                                raise service_error(service_error_internal,
617                                        "Unknown DRAGON type %s" % type)
618                return s
619
620        class not_dragon:
621            """
622            Return true if a node is in the given map of dragon nodes.
623            """
624            def __init__(self, map):
625                self.nodes = set(map.keys())
626
627            def __call__(self, e):
628                return e.name not in self.nodes
629
630        # Main line of generate_ns2
631        t = topo.clone()
632
633        # Create the map of nodes that need direct connections (dragon
634        # connections) from the connInfo
635        dragon_map = { }
636        for i in [ i for i in connInfo if i['type'] == 'transit']:
637            for a in i.get('fedAttr', []):
638                if a['attribute'] == 'vlan_id':
639                    vlan = a['value']
640                    break
641            else:
642                raise service_error(service_error.internal, "No vlan tag")
643            members = i.get('member', [])
644            if len(members) > 1: type = 'lan'
645            else: type = 'link'
646
647            try:
648                for m in members:
649                    if m['element'] in dragon_map:
650                        dragon_map[m['element']].append(( m['interface'], 
651                            vlan, type))
652                    else:
653                        dragon_map[m['element']] = [( m['interface'], 
654                            vlan, type),]
655            except KeyError:
656                raise service_error(service_error.req,
657                        "Missing connectivity info")
658
659        # Weed out the things we aren't going to instantiate: Segments, portal
660        # substrates, and portal interfaces.  (The copy in the for loop allows
661        # us to delete from e.elements in side the for loop).  While we're
662        # touching all the elements, we also adjust paths from the original
663        # testbed to local testbed paths and put the federation commands into
664        # the start commands
665        for e in [e for e in t.elements]:
666            if isinstance(e, topdl.Segment):
667                t.elements.remove(e)
668            if isinstance(e, topdl.Computer):
669                self.add_kit(e, self.federation_software)
670                if e.get_attribute('portal') and self.portal_startcommand:
671                    # Add local portal support software
672                    self.add_kit(e, self.portal_software)
673                    # Portals never have a user-specified start command
674                    e.set_attribute('startup', self.portal_startcommand)
675                elif self.node_startcommand:
676                    if e.get_attribute('startup'):
677                        e.set_attribute('startup', "%s \\$USER '%s'" % \
678                                (self.node_startcommand, 
679                                    e.get_attribute('startup')))
680                    else:
681                        e.set_attribute('startup', self.node_startcommand)
682
683                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
684                # Remove portal interfaces that do not connect to DRAGON
685                e.interface = [i for i in e.interface \
686                        if not i.get_attribute('portal') or i.name in dinf ]
687            # Fix software paths
688            for s in getattr(e, 'software', []):
689                s.location = re.sub("^.*/", softdir, s.location)
690
691        t.substrates = [ s.clone() for s in t.substrates ]
692        t.incorporate_elements()
693
694        # Customize the ns2 output for local portal commands and images
695        filters = []
696
697        if self.dragon_endpoint:
698            add_filter = not_dragon(dragon_map)
699            filters.append(dragon_commands(dragon_map))
700        else:
701            add_filter = None
702
703        if self.portal_command:
704            filters.append(topdl.generate_portal_command_filter(
705                self.portal_command, add_filter=add_filter))
706
707        if self.portal_image:
708            filters.append(topdl.generate_portal_image_filter(
709                self.portal_image))
710
711        if self.portal_type:
712            filters.append(topdl.generate_portal_hardware_filter(
713                self.portal_type))
714
715        # Convert to ns and write it out
716        expfile = topdl.topology_to_ns2(t, filters)
717        try:
718            f = open(expfn, "w")
719            print >>f, expfile
720            f.close()
721        except EnvironmentError:
722            raise service_error(service_error.internal,
723                    "Cannot write experiment file %s: %s" % (expfn,e))
724
725    def export_store_info(self, cf, proj, ename, connInfo):
726        """
727        For the export requests in the connection info, install the peer names
728        at the experiment controller via SetValue calls.
729        """
730
731        for c in connInfo:
732            for p in [ p for p in c.get('parameter', []) \
733                    if p.get('type', '') == 'output']:
734
735                if p.get('name', '') == 'peer':
736                    k = p.get('key', None)
737                    surl = p.get('store', None)
738                    if surl and k and k.index('/') != -1:
739                        value = "%s.%s.%s%s" % \
740                                (k[k.index('/')+1:], ename, proj, self.domain)
741                        req = { 'name': k, 'value': value }
742                        self.log.debug("Setting %s to %s on %s" % \
743                                (k, value, surl))
744                        self.call_SetValue(surl, req, cf)
745                    else:
746                        self.log.error("Bad export request: %s" % p)
747                elif p.get('name', '') == 'ssh_port':
748                    k = p.get('key', None)
749                    surl = p.get('store', None)
750                    if surl and k:
751                        req = { 'name': k, 'value': self.ssh_port }
752                        self.log.debug("Setting %s to %s on %s" % \
753                                (k, self.ssh_port, surl))
754                        self.call_SetValue(surl, req, cf)
755                    else:
756                        self.log.error("Bad export request: %s" % p)
757                else:
758                    self.log.error("Unknown export parameter: %s" % \
759                            p.get('name'))
760                    continue
761
762
763    def configure_userconf(self, services):
764        """
765        If the userconf service was imported, collect the configuration data.
766        """
767        for s in services:
768            s_name = s.get('name', '')
769            s_vis = s.get('visibility','')
770            if s_name  == 'userconfig' and s_vis == 'import':
771                # Collect ther server and certificate info.
772                u = s.get('server', None)
773                for a in s.get('fedAttr', []):
774                    if a.get('attribute',"") == 'cert':
775                        cert = a.get('value', None)
776                        break
777                else:
778                    cert = None
779
780                if cert:
781                    # Make a temporary certificate file for get_url.  The
782                    # finally clause removes it whether something goes
783                    # wrong (including an exception from get_url) or not.
784                    try:
785                        tfos, tn = tempfile.mkstemp(suffix=".pem")
786                        tf = os.fdopen(tfos, 'w')
787                        print >>tf, cert
788                        tf.close()
789                        self.log.debug("Getting userconf info: %s" % u)
790                        get_url(u, tn, tmpdir, "userconf")
791                        self.log.debug("Got userconf info: %s" % u)
792                    except EnvironmentError, e:
793                        raise service_error(service.error.internal, 
794                                "Cannot create temp file for " + 
795                                "userconfig certificates: %s e")
796                    except:
797                        t, v, st = sys.exc_info()
798                        raise service_error(service_error.internal,
799                                "Error retrieving %s: %s" % (s, v))
800                    finally:
801                        if tn: os.remove(tn)
802                else:
803                    raise service_error(service_error.req,
804                            "No certificate for retreiving userconfig")
805                break
806
807    def add_seer_node(self, topo, name, startup):
808        """
809        Add a seer node to the given topology, with the startup command passed
810        in.  Used by configure seer_services.
811        """
812        c_node = topdl.Computer(
813                name=name, 
814                os= topdl.OperatingSystem(
815                    attribute=[
816                    { 'attribute': 'osid', 
817                        'value': self.local_seer_image },
818                    ]),
819                attribute=[
820                    { 'attribute': 'startup', 'value': startup },
821                    ]
822                )
823        self.add_kit(c_node, self.local_seer_software)
824        topo.elements.append(c_node)
825
826    def configure_seer_services(self, services, topo, softdir):
827        """
828        Make changes to the topology required for the seer requests being made.
829        Specifically, add any control or master nodes required and set up the
830        start commands on the nodes to interconnect them.
831        """
832        local_seer = False      # True if we need to add a control node
833        collect_seer = False    # True if there is a seer-master node
834        seer_master= False      # True if we need to add the seer-master
835        for s in services:
836            s_name = s.get('name', '')
837            s_vis = s.get('visibility','')
838
839            if s_name  == 'local_seer_control' and s_vis == 'export':
840                local_seer = True
841            elif s_name == 'seer_master':
842                if s_vis == 'import':
843                    collect_seer = True
844                elif s_vis == 'export':
845                    seer_master = True
846       
847        # We've got the whole picture now, so add nodes if needed and configure
848        # them to interconnect properly.
849        if local_seer or seer_master:
850            # Copy local seer control node software to the tempdir
851            for l, f in self.local_seer_software:
852                base = os.path.basename(f)
853                copy_file(f, "%s/%s" % (softdir, base))
854        # If we're collecting seers somewhere the controllers need to talk to
855        # the master.  In testbeds that export the service, that will be a
856        # local node that we'll add below.  Elsewhere it will be the control
857        # portal that will port forward to the exporting master.
858        if local_seer:
859            if collect_seer:
860                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
861            else:
862                startup = self.local_seer_start
863            self.add_seer_node(topo, 'control', startup)
864        # If this is the seer master, add that node, too.
865        if seer_master:
866            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
867
868    def retrieve_software(self, topo, certfile, softdir):
869        """
870        Collect the software that nodes in the topology need loaded and stage
871        it locally.  This implies retrieving it from the experiment_controller
872        and placing it into softdir.  Certfile is used to prove that this node
873        has access to that data (it's the allocation/segment fedid).  Finally
874        local portal and federation software is also copied to the same staging
875        directory for simplicity - all software needed for experiment creation
876        is in softdir.
877        """
878        sw = set()
879        for e in topo.elements:
880            for s in getattr(e, 'software', []):
881                sw.add(s.location)
882        for s in sw:
883            self.log.debug("Retrieving %s" % s)
884            try:
885                get_url(s, certfile, softdir)
886            except:
887                t, v, st = sys.exc_info()
888                raise service_error(service_error.internal,
889                        "Error retrieving %s: %s" % (s, v))
890
891        # Copy local federation and portal node software to the tempdir
892        for s in (self.federation_software, self.portal_software):
893            for l, f in s:
894                base = os.path.basename(f)
895                copy_file(f, "%s/%s" % (softdir, base))
896
897
898    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
899        """
900        Gather common configuration files, retrieve or create an experiment
901        name and project name, and return the ssh_key filenames.  Create an
902        allocation log bound to the state log variable as well.
903        """
904        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
905        ename = None
906        pubkey_base = None
907        secretkey_base = None
908        proj = None
909        user = None
910        alloc_log = None
911
912        for a in attrs:
913            if a['attribute'] in configs:
914                try:
915                    self.log.debug("Retrieving %s from %s" % \
916                            (a['attribute'], a['value']))
917                    get_url(a['value'], certfile, tmpdir)
918                except:
919                    t, v, st = sys.exc_info()
920                    raise service_error(service_error.internal,
921                            "Error retrieving %s: %s" % (a.get('value', ""), v))
922            if a['attribute'] == 'ssh_pubkey':
923                pubkey_base = a['value'].rpartition('/')[2]
924            if a['attribute'] == 'ssh_secretkey':
925                secretkey_base = a['value'].rpartition('/')[2]
926            if a['attribute'] == 'experiment_name':
927                ename = a['value']
928
929        if not ename:
930            ename = ""
931            for i in range(0,5):
932                ename += random.choice(string.ascii_letters)
933            self.log.warn("No experiment name: picked one randomly: %s" \
934                    % ename)
935
936        if not pubkey_base:
937            raise service_error(service_error.req, 
938                    "No public key attribute")
939
940        if not secretkey_base:
941            raise service_error(service_error.req, 
942                    "No secret key attribute")
943
944        self.state_lock.acquire()
945        if aid in self.allocation:
946            proj = self.allocation[aid].get('project', None)
947            if not proj: 
948                proj = self.allocation[aid].get('sproject', None)
949            user = self.allocation[aid].get('user', None)
950            self.allocation[aid]['experiment'] = ename
951            self.allocation[aid]['log'] = [ ]
952            # Create a logger that logs to the experiment's state object as
953            # well as to the main log file.
954            alloc_log = logging.getLogger('fedd.access.%s' % ename)
955            h = logging.StreamHandler(
956                    list_log.list_log(self.allocation[aid]['log']))
957            # XXX: there should be a global one of these rather than
958            # repeating the code.
959            h.setFormatter(logging.Formatter(
960                "%(asctime)s %(name)s %(message)s",
961                        '%d %b %y %H:%M:%S'))
962            alloc_log.addHandler(h)
963            self.write_state()
964        self.state_lock.release()
965
966        if not proj:
967            raise service_error(service_error.internal, 
968                    "Can't find project for %s" %aid)
969
970        if not user:
971            raise service_error(service_error.internal, 
972                    "Can't find creation user for %s" %aid)
973
974        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
975
976    def finalize_experiment(self, starter, topo, aid, alloc_id):
977        """
978        Store key bits of experiment state in the global repository, including
979        the response that may need to be replayed, and return the response.
980        """
981        # Copy the assigned names into the return topology
982        embedding = [ ]
983        for n in starter.node:
984            embedding.append({ 
985                'toponame': n,
986                'physname': ["%s%s" %  (starter.node[n], self.domain)],
987                })
988        # Grab the log (this is some anal locking, but better safe than
989        # sorry)
990        self.state_lock.acquire()
991        logv = "".join(self.allocation[aid]['log'])
992        # It's possible that the StartSegment call gets retried (!).
993        # if the 'started' key is in the allocation, we'll return it rather
994        # than redo the setup.
995        self.allocation[aid]['started'] = { 
996                'allocID': alloc_id,
997                'allocationLog': logv,
998                'segmentdescription': { 
999                    'topdldescription': topo.clone().to_dict()
1000                    },
1001                'embedding': embedding
1002                }
1003        retval = copy.copy(self.allocation[aid]['started'])
1004        self.write_state()
1005        self.state_lock.release()
1006        return retval
1007
1008    def remove_dirs(self, dir):
1009        """
1010        Remove the directory tree and all files rooted at dir.  Log any errors,
1011        but continue.
1012        """
1013        self.log.debug("[removedirs]: removing %s" % dir)
1014        try:
1015            for path, dirs, files in os.walk(dir, topdown=False):
1016                for f in files:
1017                    os.remove(os.path.join(path, f))
1018                for d in dirs:
1019                    os.rmdir(os.path.join(path, d))
1020            os.rmdir(dir)
1021        except EnvironmentError, e:
1022            self.log.error("Error deleting directory tree in %s" % e);
1023   
1024    # End of StartSegment support routines
1025
1026    def StartSegment(self, req, fid):
1027        err = None  # Any service_error generated after tmpdir is created
1028        rv = None   # Return value from segment creation
1029
1030        try:
1031            req = req['StartSegmentRequestBody']
1032            auth_attr = req['allocID']['fedid']
1033            topref = req['segmentdescription']['topdldescription']
1034        except KeyError:
1035            raise service_error(server_error.req, "Badly formed request")
1036
1037        connInfo = req.get('connection', [])
1038        services = req.get('service', [])
1039        aid = "%s" % auth_attr
1040        attrs = req.get('fedAttr', [])
1041        if not self.auth.check_attribute(fid, auth_attr):
1042            raise service_error(service_error.access, "Access denied")
1043        else:
1044            # See if this is a replay of an earlier succeeded StartSegment -
1045            # sometimes SSL kills 'em.  If so, replay the response rather than
1046            # redoing the allocation.
1047            self.state_lock.acquire()
1048            retval = self.allocation[aid].get('started', None)
1049            self.state_lock.release()
1050            if retval:
1051                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1052                        "replaying response")
1053                return retval
1054
1055        # A new request.  Do it.
1056
1057        if topref: topo = topdl.Topology(**topref)
1058        else:
1059            raise service_error(service_error.req, 
1060                    "Request missing segmentdescription'")
1061       
1062        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1063        try:
1064            tmpdir = tempfile.mkdtemp(prefix="access-")
1065            softdir = "%s/software" % tmpdir
1066            os.mkdir(softdir)
1067        except EnvironmentError:
1068            raise service_error(service_error.internal, "Cannot create tmp dir")
1069
1070        # Try block alllows us to clean up temporary files.
1071        try:
1072            self.retrieve_software(topo, certfile, softdir)
1073            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1074                    self.initialize_experiment_info(attrs, aid, 
1075                            certfile, tmpdir)
1076
1077            # Set up userconf and seer if needed
1078            self.configure_userconf(services)
1079            self.configure_seer_services(services, topo, softdir)
1080            # Get and send synch store variables
1081            self.export_store_info(certfile, proj, ename, connInfo)
1082            self.import_store_info(certfile, connInfo)
1083
1084            expfile = "%s/experiment.tcl" % tmpdir
1085
1086            self.generate_portal_configs(topo, pubkey_base, 
1087                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1088            self.generate_ns2(topo, expfile, 
1089                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1090
1091            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1092                    debug=self.create_debug, log=alloc_log)
1093            rv = starter(self, ename, proj, user, expfile, tmpdir)
1094        except service_error, e:
1095            err = e
1096        except:
1097            t, v, st = sys.exc_info()
1098            err = service_error(service_error.internal, "%s: %s" % \
1099                    (v, traceback.extract_tb(st)))
1100
1101        # Walk up tmpdir, deleting as we go
1102        if self.cleanup: self.remove_dirs(tmpdir)
1103        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1104
1105        if rv:
1106            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1107        elif err:
1108            raise service_error(service_error.federant,
1109                    "Swapin failed: %s" % err)
1110        else:
1111            raise service_error(service_error.federant, "Swapin failed")
1112
1113    def TerminateSegment(self, req, fid):
1114        try:
1115            req = req['TerminateSegmentRequestBody']
1116        except KeyError:
1117            raise service_error(server_error.req, "Badly formed request")
1118
1119        auth_attr = req['allocID']['fedid']
1120        aid = "%s" % auth_attr
1121        attrs = req.get('fedAttr', [])
1122        if not self.auth.check_attribute(fid, auth_attr):
1123            raise service_error(service_error.access, "Access denied")
1124
1125        self.state_lock.acquire()
1126        if aid in self.allocation:
1127            proj = self.allocation[aid].get('project', None)
1128            if not proj: 
1129                proj = self.allocation[aid].get('sproject', None)
1130            user = self.allocation[aid].get('user', None)
1131            ename = self.allocation[aid].get('experiment', None)
1132        else:
1133            proj = None
1134            user = None
1135            ename = None
1136        self.state_lock.release()
1137
1138        if not proj:
1139            raise service_error(service_error.internal, 
1140                    "Can't find project for %s" % aid)
1141
1142        if not user:
1143            raise service_error(service_error.internal, 
1144                    "Can't find creation user for %s" % aid)
1145        if not ename:
1146            raise service_error(service_error.internal, 
1147                    "Can't find experiment name for %s" % aid)
1148        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1149                debug=self.create_debug)
1150        stopper(self, user, proj, ename)
1151        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.