source: fedd/federation/emulab_access.py @ 5bf359d

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5bf359d was 06cc65b, checked in by Ted Faber <faber@…>, 14 years ago

more refactoring - beaking code into smaller chunks for digestibility

  • Property mode set to 100644
File size: 36.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.boss = config.get("access", "boss")
62        self.ops = config.get("access", "ops")
63        self.domain = config.get("access", "domain")
64        self.fileserver = config.get("access", "fileserver")
65        self.eventserver = config.get("access", "eventserver")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        if config.has_option("access", "accessdb"):
108            self.read_access(config.get("access", "accessdb"))
109
110        if not self.local_seer_image or not self.local_seer_software:
111            self.exports.discard('local_seer_control')
112            self.exports.discard('seer_master')
113
114        if not self.local_seer_start:
115            self.exports.discard('local_seer_control')
116
117        if not self.seer_master_start:
118            self.exports.discard('seer_master')
119
120        tb = config.get('access', 'testbed')
121        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
122        else: self.testbed = [ ]
123
124        if config.has_option("access", "accessdb"):
125            self.read_access(config.get("access", "accessdb"), 
126                    self.make_access_project)
127
128        # read_state in the base_class
129        self.state_lock.acquire()
130        for a  in ('allocation', 'projects', 'keys', 'types'):
131            if a not in self.state:
132                self.state[a] = { }
133        self.allocation = self.state['allocation']
134        self.projects = self.state['projects']
135        self.keys = self.state['keys']
136        self.types = self.state['types']
137        # Add the ownership attributes to the authorizer.  Note that the
138        # indices of the allocation dict are strings, but the attributes are
139        # fedids, so there is a conversion.
140        for k in self.allocation.keys():
141            for o in self.allocation[k].get('owners', []):
142                self.auth.set_attribute(o, fedid(hexstr=k))
143            if self.allocation[k].has_key('userconfig'):
144                sfid = self.allocation[k]['userconfig']
145                fid = fedid(hexstr=sfid)
146                self.auth.set_attribute(fid, "/%s" % sfid)
147        self.state_lock.release()
148
149
150        self.soap_services = {\
151            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
152            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
153            'StartSegment': soap_handler("StartSegment", self.StartSegment),
154            'TerminateSegment': soap_handler("TerminateSegment",
155                self.TerminateSegment),
156            }
157        self.xmlrpc_services =  {\
158            'RequestAccess': xmlrpc_handler('RequestAccess',
159                self.RequestAccess),
160            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
161                self.ReleaseAccess),
162            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': xmlrpc_handler('TerminateSegment',
164                self.TerminateSegment),
165            }
166
167        self.call_SetValue = service_caller('SetValue')
168        self.call_GetValue = service_caller('GetValue', log=self.log)
169
170        if not config.has_option("allocate", "uri"):
171            self.allocate_project = \
172                allocate_project_local(config, auth)
173        else:
174            self.allocate_project = \
175                allocate_project_remote(config, auth)
176
177
178        # If the project allocator exports services, put them in this object's
179        # maps so that classes that instantiate this can call the services.
180        self.soap_services.update(self.allocate_project.soap_services)
181        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
182
183    @staticmethod
184    def make_access_project(str):
185        """
186        Convert a string of the form (id[:resources:resouces], id, id) into an
187        access_project.  This is called by read_access to convert to local
188        attributes.  It returns a tuple of the form (project, user, user) where
189        users may be names or fedids.
190        """
191        def parse_name(n):
192            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
193            else: return n
194
195        str = str.strip()
196        if str.startswith('(') and str.endswith(')'):
197            str = str[1:-1]
198            names = [ s.strip() for s in str.split(",")]
199            if len(names) > 3:
200                raise self.parse_error("More than three fields in name")
201            first = names[0].split(":")
202            if first == 'fedid:':
203                del first[0]
204                first[0] = fedid(hexstr=first[0])
205            names[0] = access_project(first[0], first[1:])
206
207            for i in range(1,2):
208                names[i] = parse_name(names[i])
209
210            return tuple(names)
211        else:
212            raise self.parse_error('Bad mapping (unbalanced parens)')
213
214
215    # RequestAccess support routines
216
217    def lookup_access(self, req, fid):
218        """
219        Look up the local access control information mapped to this fedid and
220        credentials.  In this case it is a (project, create_user, access_user)
221        triple, and a triple of three booleans indicating which, if any will
222        need to be dynamically created.  Finally a list of owners for that
223        allocation is returned.
224
225        lookup_access_base pulls the first triple out, and it is parsed by this
226        routine into the boolean map.  Owners is always the controlling fedid.
227        """
228        # Return values
229        rp = access_project(None, ())
230        ru = None
231        # This maps a valid user to the Emulab projects and users to use
232        found, match = self.lookup_access_base(req, fid)
233        tb, project, user = match
234       
235        if found == None:
236            raise service_error(service_error.access,
237                    "Access denied - cannot map access")
238
239        # resolve <dynamic> and <same> in found
240        dyn_proj = False
241        dyn_create_user = False
242        dyn_service_user = False
243
244        if found[0].name == "<same>":
245            if project != None:
246                rp.name = project
247            else : 
248                raise service_error(\
249                        service_error.server_config,
250                        "Project matched <same> when no project given")
251        elif found[0].name == "<dynamic>":
252            rp.name = None
253            dyn_proj = True
254        else:
255            rp.name = found[0].name
256        rp.node_types = found[0].node_types;
257
258        if found[1] == "<same>":
259            if user_match == "<any>":
260                if user != None: rcu = user[0]
261                else: raise service_error(\
262                        service_error.server_config,
263                        "Matched <same> on anonymous request")
264            else:
265                rcu = user_match
266        elif found[1] == "<dynamic>":
267            rcu = None
268            dyn_create_user = True
269        else:
270            rcu = found[1]
271       
272        if found[2] == "<same>":
273            if user_match == "<any>":
274                if user != None: rsu = user[0]
275                else: raise service_error(\
276                        service_error.server_config,
277                        "Matched <same> on anonymous request")
278            else:
279                rsu = user_match
280        elif found[2] == "<dynamic>":
281            rsu = None
282            dyn_service_user = True
283        else:
284            rsu = found[2]
285
286        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
287                [ fid ]
288
289    def do_project_allocation(self, dyn, project, user):
290        """
291        Call the project allocation routines and return the info.
292        """
293        if dyn: 
294            # Compose the dynamic project request
295            # (only dynamic, dynamic currently allowed)
296            preq = { 'AllocateProjectRequestBody': \
297                        { 'project' : {\
298                            'user': [ \
299                            { \
300                                'access': [ { 
301                                    'sshPubkey': self.ssh_pubkey_file } ],
302                                 'role': "serviceAccess",\
303                            }, \
304                            { \
305                                'access': [ { 
306                                    'sshPubkey': self.ssh_pubkey_file } ],
307                                 'role': "experimentCreation",\
308                            }, \
309                            ], \
310                            }\
311                        }\
312                    }
313            return self.allocate_project.dynamic_project(preq)
314        else:
315            preq = {'StaticProjectRequestBody' : \
316                    { 'project': \
317                        { 'name' : { 'localname' : project },\
318                          'user' : [ \
319                            {\
320                                'userID': { 'localname' : user }, \
321                                'access': [ { 
322                                    'sshPubkey': self.ssh_pubkey_file } ],
323                                'role': 'experimentCreation'\
324                            },\
325                            {\
326                                'userID': { 'localname' : user}, \
327                                'access': [ { 
328                                    'sshPubkey': self.ssh_pubkey_file } ],
329                                'role': 'serviceAccess'\
330                            },\
331                        ]}\
332                    }\
333            }
334            return self.allocate_project.static_project(preq)
335
336    def save_project_state(self, aid, ap, dyn, owners):
337        """
338        Parse out and save the information relevant to the project created for
339        this experiment.  That info is largely in ap and owners.  dyn indicates
340        that the project was created dynamically.  Return the user and project
341        names.
342        """
343        self.state_lock.acquire()
344        self.allocation[aid] = { }
345        try:
346            pname = ap['project']['name']['localname']
347        except KeyError:
348            pname = None
349
350        if dyn:
351            if not pname:
352                self.state_lock.release()
353                raise service_error(service_error.internal,
354                        "Misformed allocation response?")
355            if pname in self.projects: self.projects[pname] += 1
356            else: self.projects[pname] = 1
357            self.allocation[aid]['project'] = pname
358        else:
359            # sproject is a static project associated with this allocation.
360            self.allocation[aid]['sproject'] = pname
361
362        self.allocation[aid]['keys'] = [ ]
363
364        try:
365            for u in ap['project']['user']:
366                uname = u['userID']['localname']
367                if u['role'] == 'experimentCreation':
368                    self.allocation[aid]['user'] = uname
369                for k in [ k['sshPubkey'] for k in u['access'] \
370                        if k.has_key('sshPubkey') ]:
371                    kv = "%s:%s" % (uname, k)
372                    if self.keys.has_key(kv): self.keys[kv] += 1
373                    else: self.keys[kv] = 1
374                    self.allocation[aid]['keys'].append((uname, k))
375        except KeyError:
376            self.state_lock.release()
377            raise service_error(service_error.internal,
378                    "Misformed allocation response?")
379
380        self.allocation[aid]['owners'] = owners
381        self.write_state()
382        self.state_lock.release()
383        return (pname, uname)
384
385    # End of RequestAccess support routines
386
387    def RequestAccess(self, req, fid):
388        """
389        Handle the access request.  Proxy if not for us.
390
391        Parse out the fields and make the allocations or rejections if for us,
392        otherwise, assuming we're willing to proxy, proxy the request out.
393        """
394
395        def gateway_hardware(h):
396            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
397            else: return h
398
399        def get_export_project(svcs):
400            """
401            if the service requests includes one to export a project, return
402            that project.
403            """
404            rv = None
405            for s in svcs:
406                if s.get('name', '') == 'project_export' and \
407                        s.get('visibility', '') == 'export':
408                    if not rv: 
409                        for a in s.get('feddAttr', []):
410                            if a.get('attribute', '') == 'project' \
411                                    and 'value' in a:
412                                rv = a['value']
413                    else:
414                        raise service_error(service_error, access, 
415                                'Requesting multiple project exports is ' + \
416                                        'not supported');
417            return rv
418
419        # The dance to get into the request body
420        if req.has_key('RequestAccessRequestBody'):
421            req = req['RequestAccessRequestBody']
422        else:
423            raise service_error(service_error.req, "No request!?")
424
425
426        found, dyn, owners = self.lookup_access(req, fid)
427        ap = None
428
429        # if this includes a project export request and the exported
430        # project is not the access project, access denied.
431        if 'service' in req:
432            ep = get_export_project(req['service'])
433            if ep and ep != found[0].name:
434                raise service_error(service_error.access,
435                        "Cannot export %s" % ep)
436
437        if self.ssh_pubkey_file:
438            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
439        else:
440            raise service_error(service_error.internal, 
441                    "SSH access parameters required")
442        # keep track of what's been added
443        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
444        aid = unicode(allocID)
445
446        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
447
448        services, svc_state = self.export_services(req.get('service',[]),
449                pname, uname)
450        self.state_lock.acquire()
451        # Store services state in global state
452        for k, v in svc_state.items():
453            self.allocation[aid][k] = v
454        self.write_state()
455        self.state_lock.release()
456        # Give the owners the right to change this allocation
457        for o in owners:
458            self.auth.set_attribute(o, allocID)
459        try:
460            f = open("%s/%s.pem" % (self.certdir, aid), "w")
461            print >>f, alloc_cert
462            f.close()
463        except EnvironmentError, e:
464            raise service_error(service_error.internal, 
465                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
466        resp = self.build_access_response({ 'fedid': allocID } ,
467                ap, services)
468        return resp
469
470    def do_release_project(self, del_project, del_users, del_types):
471        """
472        If a project and users has to be deleted, make the call.
473        """
474        msg = { 'project': { }}
475        if del_project:
476            msg['project']['name']= {'localname': del_project}
477        users = [ ]
478        for u in del_users.keys():
479            users.append({ 'userID': { 'localname': u },\
480                'access' :  \
481                        [ {'sshPubkey' : s } for s in del_users[u]]\
482            })
483        if users: 
484            msg['project']['user'] = users
485        if len(del_types) > 0:
486            msg['resources'] = { 'node': \
487                    [ {'hardware': [ h ] } for h in del_types ]\
488                }
489        if self.allocate_project.release_project:
490            msg = { 'ReleaseProjectRequestBody' : msg}
491            self.allocate_project.release_project(msg)
492
493    def ReleaseAccess(self, req, fid):
494        # The dance to get into the request body
495        if req.has_key('ReleaseAccessRequestBody'):
496            req = req['ReleaseAccessRequestBody']
497        else:
498            raise service_error(service_error.req, "No request!?")
499
500        try:
501            if req['allocID'].has_key('localname'):
502                auth_attr = aid = req['allocID']['localname']
503            elif req['allocID'].has_key('fedid'):
504                aid = unicode(req['allocID']['fedid'])
505                auth_attr = req['allocID']['fedid']
506            else:
507                raise service_error(service_error.req,
508                        "Only localnames and fedids are understood")
509        except KeyError:
510            raise service_error(service_error.req, "Badly formed request")
511
512        self.log.debug("[access] deallocation requested for %s", aid)
513        if not self.auth.check_attribute(fid, auth_attr):
514            self.log.debug("[access] deallocation denied for %s", aid)
515            raise service_error(service_error.access, "Access Denied")
516
517        # If we know this allocation, reduce the reference counts and
518        # remove the local allocations.  Otherwise report an error.  If
519        # there is an allocation to delete, del_users will be a dictonary
520        # of sets where the key is the user that owns the keys in the set.
521        # We use a set to avoid duplicates.  del_project is just the name
522        # of any dynamic project to delete.  We're somewhat lazy about
523        # deleting authorization attributes.  Having access to something
524        # that doesn't exist isn't harmful.
525        del_users = { }
526        del_project = None
527        del_types = set()
528
529        self.state_lock.acquire()
530        if aid in self.allocation:
531            self.log.debug("Found allocation for %s" %aid)
532            for k in self.allocation[aid]['keys']:
533                kk = "%s:%s" % k
534                self.keys[kk] -= 1
535                if self.keys[kk] == 0:
536                    if not del_users.has_key(k[0]):
537                        del_users[k[0]] = set()
538                    del_users[k[0]].add(k[1])
539                    del self.keys[kk]
540
541            if 'project' in self.allocation[aid]:
542                pname = self.allocation[aid]['project']
543                self.projects[pname] -= 1
544                if self.projects[pname] == 0:
545                    del_project = pname
546                    del self.projects[pname]
547
548            if 'types' in self.allocation[aid]:
549                for t in self.allocation[aid]['types']:
550                    self.types[t] -= 1
551                    if self.types[t] == 0:
552                        if not del_project: del_project = t[0]
553                        del_types.add(t[1])
554                        del self.types[t]
555
556            del self.allocation[aid]
557            self.write_state()
558            self.state_lock.release()
559            # If we actually have resources to deallocate, prepare the call.
560            if del_project or del_users:
561                self.do_release_project(del_project, del_users, del_types)
562            # And remove the access cert
563            cf = "%s/%s.pem" % (self.certdir, aid)
564            self.log.debug("Removing %s" % cf)
565            os.remove(cf)
566            return { 'allocID': req['allocID'] } 
567        else:
568            self.state_lock.release()
569            raise service_error(service_error.req, "No such allocation")
570
571    # These are subroutines for StartSegment
572    def generate_ns2(self, topo, expfn, softdir, connInfo):
573        """
574        Convert topo into an ns2 file, decorated with appropriate commands for
575        the particular testbed setup.  Convert all requests for software, etc
576        to point at the staged copies on this testbed and add the federation
577        startcommands.
578        """
579        class dragon_commands:
580            """
581            Functor to spit out approrpiate dragon commands for nodes listed in
582            the connectivity description.  The constructor makes a dict mapping
583            dragon nodes to their parameters and the __call__ checks each
584            element in turn for membership.
585            """
586            def __init__(self, map):
587                self.node_info = map
588
589            def __call__(self, e):
590                s = ""
591                if isinstance(e, topdl.Computer):
592                    if self.node_info.has_key(e.name):
593                        info = self.node_info[e.name]
594                        for ifname, vlan, type in info:
595                            for i in e.interface:
596                                if i.name == ifname:
597                                    addr = i.get_attribute('ip4_address')
598                                    subs = i.substrate[0]
599                                    break
600                            else:
601                                raise service_error(service_error.internal,
602                                        "No interface %s on element %s" % \
603                                                (ifname, e.name))
604                            # XXX: do netmask right
605                            if type =='link':
606                                s = ("tb-allow-external ${%s} " + \
607                                        "dragonportal ip %s vlan %s " + \
608                                        "netmask 255.255.255.0\n") % \
609                                        (e.name, addr, vlan)
610                            elif type =='lan':
611                                s = ("tb-allow-external ${%s} " + \
612                                        "dragonportal " + \
613                                        "ip %s vlan %s usurp %s\n") % \
614                                        (e.name, addr, vlan, subs)
615                            else:
616                                raise service_error(service_error_internal,
617                                        "Unknown DRAGON type %s" % type)
618                return s
619
620        class not_dragon:
621            """
622            Return true if a node is in the given map of dragon nodes.
623            """
624            def __init__(self, map):
625                self.nodes = set(map.keys())
626
627            def __call__(self, e):
628                return e.name not in self.nodes
629
630        # Main line of generate_ns2
631        t = topo.clone()
632
633        # Create the map of nodes that need direct connections (dragon
634        # connections) from the connInfo
635        dragon_map = { }
636        for i in [ i for i in connInfo if i['type'] == 'transit']:
637            for a in i.get('fedAttr', []):
638                if a['attribute'] == 'vlan_id':
639                    vlan = a['value']
640                    break
641            else:
642                raise service_error(service_error.internal, "No vlan tag")
643            members = i.get('member', [])
644            if len(members) > 1: type = 'lan'
645            else: type = 'link'
646
647            try:
648                for m in members:
649                    if m['element'] in dragon_map:
650                        dragon_map[m['element']].append(( m['interface'], 
651                            vlan, type))
652                    else:
653                        dragon_map[m['element']] = [( m['interface'], 
654                            vlan, type),]
655            except KeyError:
656                raise service_error(service_error.req,
657                        "Missing connectivity info")
658
659        # Weed out the things we aren't going to instantiate: Segments, portal
660        # substrates, and portal interfaces.  (The copy in the for loop allows
661        # us to delete from e.elements in side the for loop).  While we're
662        # touching all the elements, we also adjust paths from the original
663        # testbed to local testbed paths and put the federation commands into
664        # the start commands
665        for e in [e for e in t.elements]:
666            if isinstance(e, topdl.Segment):
667                t.elements.remove(e)
668            if isinstance(e, topdl.Computer):
669                self.add_kit(e, self.federation_software)
670                if e.get_attribute('portal') and self.portal_startcommand:
671                    # Add local portal support software
672                    self.add_kit(e, self.portal_software)
673                    # Portals never have a user-specified start command
674                    e.set_attribute('startup', self.portal_startcommand)
675                elif self.node_startcommand:
676                    if e.get_attribute('startup'):
677                        e.set_attribute('startup', "%s \\$USER '%s'" % \
678                                (self.node_startcommand, 
679                                    e.get_attribute('startup')))
680                    else:
681                        e.set_attribute('startup', self.node_startcommand)
682
683                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
684                # Remove portal interfaces that do not connect to DRAGON
685                e.interface = [i for i in e.interface \
686                        if not i.get_attribute('portal') or i.name in dinf ]
687            # Fix software paths
688            for s in getattr(e, 'software', []):
689                s.location = re.sub("^.*/", softdir, s.location)
690
691        t.substrates = [ s.clone() for s in t.substrates ]
692        t.incorporate_elements()
693
694        # Customize the ns2 output for local portal commands and images
695        filters = []
696
697        if self.dragon_endpoint:
698            add_filter = not_dragon(dragon_map)
699            filters.append(dragon_commands(dragon_map))
700        else:
701            add_filter = None
702
703        if self.portal_command:
704            filters.append(topdl.generate_portal_command_filter(
705                self.portal_command, add_filter=add_filter))
706
707        if self.portal_image:
708            filters.append(topdl.generate_portal_image_filter(
709                self.portal_image))
710
711        if self.portal_type:
712            filters.append(topdl.generate_portal_hardware_filter(
713                self.portal_type))
714
715        # Convert to ns and write it out
716        expfile = topdl.topology_to_ns2(t, filters)
717        try:
718            f = open(expfn, "w")
719            print >>f, expfile
720            f.close()
721        except EnvironmentError:
722            raise service_error(service_error.internal,
723                    "Cannot write experiment file %s: %s" % (expfn,e))
724
725    def configure_userconf(self, services):
726        """
727        If the userconf service was imported, collect the configuration data.
728        """
729        for s in services:
730            s_name = s.get('name', '')
731            s_vis = s.get('visibility','')
732            if s_name  == 'userconfig' and s_vis == 'import':
733                # Collect ther server and certificate info.
734                u = s.get('server', None)
735                for a in s.get('fedAttr', []):
736                    if a.get('attribute',"") == 'cert':
737                        cert = a.get('value', None)
738                        break
739                else:
740                    cert = None
741
742                if cert:
743                    # Make a temporary certificate file for get_url.  The
744                    # finally clause removes it whether something goes
745                    # wrong (including an exception from get_url) or not.
746                    try:
747                        tfos, tn = tempfile.mkstemp(suffix=".pem")
748                        tf = os.fdopen(tfos, 'w')
749                        print >>tf, cert
750                        tf.close()
751                        self.log.debug("Getting userconf info: %s" % u)
752                        get_url(u, tn, tmpdir, "userconf")
753                        self.log.debug("Got userconf info: %s" % u)
754                    except EnvironmentError, e:
755                        raise service_error(service.error.internal, 
756                                "Cannot create temp file for " + 
757                                "userconfig certificates: %s e")
758                    except:
759                        t, v, st = sys.exc_info()
760                        raise service_error(service_error.internal,
761                                "Error retrieving %s: %s" % (s, v))
762                    finally:
763                        if tn: os.remove(tn)
764                else:
765                    raise service_error(service_error.req,
766                            "No certificate for retreiving userconfig")
767                break
768
769    def add_seer_node(self, topo, name, startup):
770        """
771        Add a seer node to the given topology, with the startup command passed
772        in.  Used by configure seer_services.
773        """
774        c_node = topdl.Computer(
775                name=name, 
776                os= topdl.OperatingSystem(
777                    attribute=[
778                    { 'attribute': 'osid', 
779                        'value': self.local_seer_image },
780                    ]),
781                attribute=[
782                    { 'attribute': 'startup', 'value': startup },
783                    ]
784                )
785        self.add_kit(c_node, self.local_seer_software)
786        topo.elements.append(c_node)
787
788    def configure_seer_services(self, services, topo, softdir):
789        """
790        Make changes to the topology required for the seer requests being made.
791        Specifically, add any control or master nodes required and set up the
792        start commands on the nodes to interconnect them.
793        """
794        local_seer = False      # True if we need to add a control node
795        collect_seer = False    # True if there is a seer-master node
796        seer_master= False      # True if we need to add the seer-master
797        for s in services:
798            s_name = s.get('name', '')
799            s_vis = s.get('visibility','')
800
801            if s_name  == 'local_seer_control' and s_vis == 'export':
802                local_seer = True
803            elif s_name == 'seer_master':
804                if s_vis == 'import':
805                    collect_seer = True
806                elif s_vis == 'export':
807                    seer_master = True
808       
809        # We've got the whole picture now, so add nodes if needed and configure
810        # them to interconnect properly.
811        if local_seer or seer_master:
812            # Copy local seer control node software to the tempdir
813            for l, f in self.local_seer_software:
814                base = os.path.basename(f)
815                copy_file(f, "%s/%s" % (softdir, base))
816        # If we're collecting seers somewhere the controllers need to talk to
817        # the master.  In testbeds that export the service, that will be a
818        # local node that we'll add below.  Elsewhere it will be the control
819        # portal that will port forward to the exporting master.
820        if local_seer:
821            if collect_seer:
822                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
823            else:
824                startup = self.local_seer_start
825            self.add_seer_node(topo, 'control', startup)
826        # If this is the seer master, add that node, too.
827        if seer_master:
828            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
829
830    def retrieve_software(self, topo, certfile, softdir):
831        """
832        Collect the software that nodes in the topology need loaded and stage
833        it locally.  This implies retrieving it from the experiment_controller
834        and placing it into softdir.  Certfile is used to prove that this node
835        has access to that data (it's the allocation/segment fedid).  Finally
836        local portal and federation software is also copied to the same staging
837        directory for simplicity - all software needed for experiment creation
838        is in softdir.
839        """
840        sw = set()
841        for e in topo.elements:
842            for s in getattr(e, 'software', []):
843                sw.add(s.location)
844        for s in sw:
845            self.log.debug("Retrieving %s" % s)
846            try:
847                get_url(s, certfile, softdir)
848            except:
849                t, v, st = sys.exc_info()
850                raise service_error(service_error.internal,
851                        "Error retrieving %s: %s" % (s, v))
852
853        # Copy local federation and portal node software to the tempdir
854        for s in (self.federation_software, self.portal_software):
855            for l, f in s:
856                base = os.path.basename(f)
857                copy_file(f, "%s/%s" % (softdir, base))
858
859
860    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
861        """
862        Gather common configuration files, retrieve or create an experiment
863        name and project name, and return the ssh_key filenames.  Create an
864        allocation log bound to the state log variable as well.
865        """
866        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
867        ename = None
868        pubkey_base = None
869        secretkey_base = None
870        proj = None
871        user = None
872        alloc_log = None
873
874        for a in attrs:
875            if a['attribute'] in configs:
876                try:
877                    self.log.debug("Retrieving %s from %s" % \
878                            (a['attribute'], a['value']))
879                    get_url(a['value'], certfile, tmpdir)
880                except:
881                    t, v, st = sys.exc_info()
882                    raise service_error(service_error.internal,
883                            "Error retrieving %s: %s" % (a.get('value', ""), v))
884            if a['attribute'] == 'ssh_pubkey':
885                pubkey_base = a['value'].rpartition('/')[2]
886            if a['attribute'] == 'ssh_secretkey':
887                secretkey_base = a['value'].rpartition('/')[2]
888            if a['attribute'] == 'experiment_name':
889                ename = a['value']
890
891        if not ename:
892            ename = ""
893            for i in range(0,5):
894                ename += random.choice(string.ascii_letters)
895            self.log.warn("No experiment name: picked one randomly: %s" \
896                    % ename)
897
898        if not pubkey_base:
899            raise service_error(service_error.req, 
900                    "No public key attribute")
901
902        if not secretkey_base:
903            raise service_error(service_error.req, 
904                    "No secret key attribute")
905
906        self.state_lock.acquire()
907        if aid in self.allocation:
908            proj = self.allocation[aid].get('project', None)
909            if not proj: 
910                proj = self.allocation[aid].get('sproject', None)
911            user = self.allocation[aid].get('user', None)
912            self.allocation[aid]['experiment'] = ename
913            self.allocation[aid]['log'] = [ ]
914            # Create a logger that logs to the experiment's state object as
915            # well as to the main log file.
916            alloc_log = logging.getLogger('fedd.access.%s' % ename)
917            h = logging.StreamHandler(
918                    list_log.list_log(self.allocation[aid]['log']))
919            # XXX: there should be a global one of these rather than
920            # repeating the code.
921            h.setFormatter(logging.Formatter(
922                "%(asctime)s %(name)s %(message)s",
923                        '%d %b %y %H:%M:%S'))
924            alloc_log.addHandler(h)
925            self.write_state()
926        self.state_lock.release()
927
928        if not proj:
929            raise service_error(service_error.internal, 
930                    "Can't find project for %s" %aid)
931
932        if not user:
933            raise service_error(service_error.internal, 
934                    "Can't find creation user for %s" %aid)
935
936        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
937
938    def finalize_experiment(self, starter, topo, aid, alloc_id):
939        """
940        Store key bits of experiment state in the global repository, including
941        the response that may need to be replayed, and return the response.
942        """
943        # Copy the assigned names into the return topology
944        embedding = [ ]
945        for n in starter.node:
946            embedding.append({ 
947                'toponame': n,
948                'physname': ["%s%s" %  (starter.node[n], self.domain)],
949                })
950        # Grab the log (this is some anal locking, but better safe than
951        # sorry)
952        self.state_lock.acquire()
953        logv = "".join(self.allocation[aid]['log'])
954        # It's possible that the StartSegment call gets retried (!).
955        # if the 'started' key is in the allocation, we'll return it rather
956        # than redo the setup.
957        self.allocation[aid]['started'] = { 
958                'allocID': alloc_id,
959                'allocationLog': logv,
960                'segmentdescription': { 
961                    'topdldescription': topo.clone().to_dict()
962                    },
963                'embedding': embedding
964                }
965        retval = copy.copy(self.allocation[aid]['started'])
966        self.write_state()
967        self.state_lock.release()
968        return retval
969
970    def remove_dirs(self, dir):
971        """
972        Remove the directory tree and all files rooted at dir.  Log any errors,
973        but continue.
974        """
975        self.log.debug("[removedirs]: removing %s" % dir)
976        try:
977            for path, dirs, files in os.walk(dir, topdown=False):
978                for f in files:
979                    os.remove(os.path.join(path, f))
980                for d in dirs:
981                    os.rmdir(os.path.join(path, d))
982            os.rmdir(dir)
983        except EnvironmentError, e:
984            self.log.error("Error deleting directory tree in %s" % e);
985   
986    # End of StartSegment support routines
987
988    def StartSegment(self, req, fid):
989        err = None  # Any service_error generated after tmpdir is created
990        rv = None   # Return value from segment creation
991
992        try:
993            req = req['StartSegmentRequestBody']
994            auth_attr = req['allocID']['fedid']
995            topref = req['segmentdescription']['topdldescription']
996        except KeyError:
997            raise service_error(server_error.req, "Badly formed request")
998
999        connInfo = req.get('connection', [])
1000        services = req.get('service', [])
1001        aid = "%s" % auth_attr
1002        attrs = req.get('fedAttr', [])
1003        if not self.auth.check_attribute(fid, auth_attr):
1004            raise service_error(service_error.access, "Access denied")
1005        else:
1006            # See if this is a replay of an earlier succeeded StartSegment -
1007            # sometimes SSL kills 'em.  If so, replay the response rather than
1008            # redoing the allocation.
1009            self.state_lock.acquire()
1010            retval = self.allocation[aid].get('started', None)
1011            self.state_lock.release()
1012            if retval:
1013                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1014                        "replaying response")
1015                return retval
1016
1017        # A new request.  Do it.
1018
1019        if topref: topo = topdl.Topology(**topref)
1020        else:
1021            raise service_error(service_error.req, 
1022                    "Request missing segmentdescription'")
1023       
1024        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1025        try:
1026            tmpdir = tempfile.mkdtemp(prefix="access-")
1027            softdir = "%s/software" % tmpdir
1028            os.mkdir(softdir)
1029        except EnvironmentError:
1030            raise service_error(service_error.internal, "Cannot create tmp dir")
1031
1032        # Try block alllows us to clean up temporary files.
1033        try:
1034            self.retrieve_software(topo, certfile, softdir)
1035            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1036                    self.initialize_experiment_info(attrs, aid, 
1037                            certfile, tmpdir)
1038
1039            # Set up userconf and seer if needed
1040            self.configure_userconf(services)
1041            self.configure_seer_services(services, topo, softdir)
1042            # Get and send synch store variables
1043            self.export_store_info(certfile, proj, ename, connInfo)
1044            self.import_store_info(certfile, connInfo)
1045
1046            expfile = "%s/experiment.tcl" % tmpdir
1047
1048            self.generate_portal_configs(topo, pubkey_base, 
1049                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1050            self.generate_ns2(topo, expfile, 
1051                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1052
1053            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1054                    debug=self.create_debug, log=alloc_log)
1055            rv = starter(self, ename, proj, user, expfile, tmpdir)
1056        except service_error, e:
1057            err = e
1058        except:
1059            t, v, st = sys.exc_info()
1060            err = service_error(service_error.internal, "%s: %s" % \
1061                    (v, traceback.extract_tb(st)))
1062
1063        # Walk up tmpdir, deleting as we go
1064        if self.cleanup: self.remove_dirs(tmpdir)
1065        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1066
1067        if rv:
1068            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1069        elif err:
1070            raise service_error(service_error.federant,
1071                    "Swapin failed: %s" % err)
1072        else:
1073            raise service_error(service_error.federant, "Swapin failed")
1074
1075    def TerminateSegment(self, req, fid):
1076        try:
1077            req = req['TerminateSegmentRequestBody']
1078        except KeyError:
1079            raise service_error(server_error.req, "Badly formed request")
1080
1081        auth_attr = req['allocID']['fedid']
1082        aid = "%s" % auth_attr
1083        attrs = req.get('fedAttr', [])
1084        if not self.auth.check_attribute(fid, auth_attr):
1085            raise service_error(service_error.access, "Access denied")
1086
1087        self.state_lock.acquire()
1088        if aid in self.allocation:
1089            proj = self.allocation[aid].get('project', None)
1090            if not proj: 
1091                proj = self.allocation[aid].get('sproject', None)
1092            user = self.allocation[aid].get('user', None)
1093            ename = self.allocation[aid].get('experiment', None)
1094        else:
1095            proj = None
1096            user = None
1097            ename = None
1098        self.state_lock.release()
1099
1100        if not proj:
1101            raise service_error(service_error.internal, 
1102                    "Can't find project for %s" % aid)
1103
1104        if not user:
1105            raise service_error(service_error.internal, 
1106                    "Can't find creation user for %s" % aid)
1107        if not ename:
1108            raise service_error(service_error.internal, 
1109                    "Can't find experiment name for %s" % aid)
1110        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1111                debug=self.create_debug)
1112        stopper(self, user, proj, ename)
1113        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.