source: fedd/federation/emulab_access.py @ 05e8da8

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 05e8da8 was e07c8f3, checked in by Ted Faber <faber@…>, 14 years ago

Remove software, etc when experiment swaps out

Fix a couple small bugs in access controller that I believed were fixed before.

  • Property mode set to 100644
File size: 36.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74
75        self.dragon_endpoint = config.get("access", "dragon")
76        self.dragon_vlans = config.get("access", "dragon_vlans")
77        self.deter_internal = config.get("access", "deter_internal")
78
79        self.tunnel_config = config.getboolean("access", "tunnel_config")
80        self.portal_command = config.get("access", "portal_command")
81        self.portal_image = config.get("access", "portal_image")
82        self.portal_type = config.get("access", "portal_type") or "pc"
83        self.portal_startcommand = config.get("access", "portal_startcommand")
84        self.node_startcommand = config.get("access", "node_startcommand")
85
86        self.federation_software = self.software_list(self.federation_software)
87        self.portal_software = self.software_list(self.portal_software)
88        self.local_seer_software = self.software_list(self.local_seer_software)
89
90        self.access_type = self.access_type.lower()
91        if self.access_type == 'remote_emulab':
92            self.start_segment = proxy_emulab_segment.start_segment
93            self.stop_segment = proxy_emulab_segment.stop_segment
94        elif self.access_type == 'local_emulab':
95            self.start_segment = local_emulab_segment.start_segment
96            self.stop_segment = local_emulab_segment.stop_segment
97        else:
98            self.start_segment = None
99            self.stop_segment = None
100
101        self.restricted = [ ]
102        self.access = { }
103        if config.has_option("access", "accessdb"):
104            self.read_access(config.get("access", "accessdb"))
105        tb = config.get('access', 'testbed')
106        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
107        else: self.testbed = [ ]
108
109        if config.has_option("access", "accessdb"):
110            self.read_access(config.get("access", "accessdb"), 
111                    self.make_access_project)
112
113        # read_state in the base_class
114        self.state_lock.acquire()
115        for a  in ('allocation', 'projects', 'keys', 'types'):
116            if a not in self.state:
117                self.state[a] = { }
118        self.allocation = self.state['allocation']
119        self.projects = self.state['projects']
120        self.keys = self.state['keys']
121        self.types = self.state['types']
122        # Add the ownership attributes to the authorizer.  Note that the
123        # indices of the allocation dict are strings, but the attributes are
124        # fedids, so there is a conversion.
125        for k in self.allocation.keys():
126            for o in self.allocation[k].get('owners', []):
127                self.auth.set_attribute(o, fedid(hexstr=k))
128            if self.allocation[k].has_key('userconfig'):
129                sfid = self.allocation[k]['userconfig']
130                fid = fedid(hexstr=sfid)
131                self.auth.set_attribute(fid, "/%s" % sfid)
132        self.state_lock.release()
133        self.exports = {
134                'SMB': self.export_SMB,
135                'seer': self.export_seer,
136                'tmcd': self.export_tmcd,
137                'userconfig': self.export_userconfig,
138                'project_export': self.export_project_export,
139                'local_seer_control': self.export_local_seer,
140                'seer_master': self.export_seer_master,
141                'hide_hosts': self.export_hide_hosts,
142                }
143
144        if not self.local_seer_image or not self.local_seer_software or \
145                not self.local_seer_start:
146            if 'local_seer_control' in self.exports:
147                del self.exports['local_seer_control']
148
149        if not self.local_seer_image or not self.local_seer_software or \
150                not self.seer_master_start:
151            if 'seer_master' in self.exports:
152                del self.exports['seer_master']
153
154
155        self.soap_services = {\
156            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
157            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
158            'StartSegment': soap_handler("StartSegment", self.StartSegment),
159            'TerminateSegment': soap_handler("TerminateSegment",
160                self.TerminateSegment),
161            }
162        self.xmlrpc_services =  {\
163            'RequestAccess': xmlrpc_handler('RequestAccess',
164                self.RequestAccess),
165            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
166                self.ReleaseAccess),
167            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
168            'TerminateSegment': xmlrpc_handler('TerminateSegment',
169                self.TerminateSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175        if not config.has_option("allocate", "uri"):
176            self.allocate_project = \
177                allocate_project_local(config, auth)
178        else:
179            self.allocate_project = \
180                allocate_project_remote(config, auth)
181
182
183        # If the project allocator exports services, put them in this object's
184        # maps so that classes that instantiate this can call the services.
185        self.soap_services.update(self.allocate_project.soap_services)
186        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
187
188    @staticmethod
189    def make_access_project(str):
190        """
191        Convert a string of the form (id[:resources:resouces], id, id) into an
192        access_project.  This is called by read_access to convert to local
193        attributes.  It returns a tuple of the form (project, user, user) where
194        users may be names or fedids.
195        """
196        def parse_name(n):
197            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
198            else: return n
199
200        str = str.strip()
201        if str.startswith('(') and str.endswith(')'):
202            str = str[1:-1]
203            names = [ s.strip() for s in str.split(",")]
204            if len(names) > 3:
205                raise self.parse_error("More than three fields in name")
206            first = names[0].split(":")
207            if first == 'fedid:':
208                del first[0]
209                first[0] = fedid(hexstr=first[0])
210            names[0] = access_project(first[0], first[1:])
211
212            for i in range(1,2):
213                names[i] = parse_name(names[i])
214
215            return tuple(names)
216        else:
217            raise self.parse_error('Bad mapping (unbalanced parens)')
218
219
220    # RequestAccess support routines
221
222    def lookup_access(self, req, fid):
223        """
224        Look up the local access control information mapped to this fedid and
225        credentials.  In this case it is a (project, create_user, access_user)
226        triple, and a triple of three booleans indicating which, if any will
227        need to be dynamically created.  Finally a list of owners for that
228        allocation is returned.
229
230        lookup_access_base pulls the first triple out, and it is parsed by this
231        routine into the boolean map.  Owners is always the controlling fedid.
232        """
233        # Return values
234        rp = access_project(None, ())
235        ru = None
236        # This maps a valid user to the Emulab projects and users to use
237        found, match = self.lookup_access_base(req, fid)
238        tb, project, user = match
239       
240        if found == None:
241            raise service_error(service_error.access,
242                    "Access denied - cannot map access")
243
244        # resolve <dynamic> and <same> in found
245        dyn_proj = False
246        dyn_create_user = False
247        dyn_service_user = False
248
249        if found[0].name == "<same>":
250            if project != None:
251                rp.name = project
252            else : 
253                raise service_error(\
254                        service_error.server_config,
255                        "Project matched <same> when no project given")
256        elif found[0].name == "<dynamic>":
257            rp.name = None
258            dyn_proj = True
259        else:
260            rp.name = found[0].name
261        rp.node_types = found[0].node_types;
262
263        if found[1] == "<same>":
264            if user_match == "<any>":
265                if user != None: rcu = user[0]
266                else: raise service_error(\
267                        service_error.server_config,
268                        "Matched <same> on anonymous request")
269            else:
270                rcu = user_match
271        elif found[1] == "<dynamic>":
272            rcu = None
273            dyn_create_user = True
274        else:
275            rcu = found[1]
276       
277        if found[2] == "<same>":
278            if user_match == "<any>":
279                if user != None: rsu = user[0]
280                else: raise service_error(\
281                        service_error.server_config,
282                        "Matched <same> on anonymous request")
283            else:
284                rsu = user_match
285        elif found[2] == "<dynamic>":
286            rsu = None
287            dyn_service_user = True
288        else:
289            rsu = found[2]
290
291        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
292                [ fid ]
293
294    def do_project_allocation(self, dyn, project, user):
295        """
296        Call the project allocation routines and return the info.
297        """
298        if dyn: 
299            # Compose the dynamic project request
300            # (only dynamic, dynamic currently allowed)
301            preq = { 'AllocateProjectRequestBody': \
302                        { 'project' : {\
303                            'user': [ \
304                            { \
305                                'access': [ { 
306                                    'sshPubkey': self.ssh_pubkey_file } ],
307                                 'role': "serviceAccess",\
308                            }, \
309                            { \
310                                'access': [ { 
311                                    'sshPubkey': self.ssh_pubkey_file } ],
312                                 'role': "experimentCreation",\
313                            }, \
314                            ], \
315                            }\
316                        }\
317                    }
318            return self.allocate_project.dynamic_project(preq)
319        else:
320            preq = {'StaticProjectRequestBody' : \
321                    { 'project': \
322                        { 'name' : { 'localname' : project },\
323                          'user' : [ \
324                            {\
325                                'userID': { 'localname' : user }, \
326                                'access': [ { 
327                                    'sshPubkey': self.ssh_pubkey_file } ],
328                                'role': 'experimentCreation'\
329                            },\
330                            {\
331                                'userID': { 'localname' : user}, \
332                                'access': [ { 
333                                    'sshPubkey': self.ssh_pubkey_file } ],
334                                'role': 'serviceAccess'\
335                            },\
336                        ]}\
337                    }\
338            }
339            return self.allocate_project.static_project(preq)
340
341    def save_project_state(self, aid, ap, dyn, owners):
342        """
343        Parse out and save the information relevant to the project created for
344        this experiment.  That info is largely in ap and owners.  dyn indicates
345        that the project was created dynamically.  Return the user and project
346        names.
347        """
348        self.state_lock.acquire()
349        self.allocation[aid] = { }
350        try:
351            pname = ap['project']['name']['localname']
352        except KeyError:
353            pname = None
354
355        if dyn:
356            if not pname:
357                self.state_lock.release()
358                raise service_error(service_error.internal,
359                        "Misformed allocation response?")
360            if pname in self.projects: self.projects[pname] += 1
361            else: self.projects[pname] = 1
362            self.allocation[aid]['project'] = pname
363        else:
364            # sproject is a static project associated with this allocation.
365            self.allocation[aid]['sproject'] = pname
366
367        self.allocation[aid]['keys'] = [ ]
368
369        try:
370            for u in ap['project']['user']:
371                uname = u['userID']['localname']
372                if u['role'] == 'experimentCreation':
373                    self.allocation[aid]['user'] = uname
374                for k in [ k['sshPubkey'] for k in u['access'] \
375                        if k.has_key('sshPubkey') ]:
376                    kv = "%s:%s" % (uname, k)
377                    if self.keys.has_key(kv): self.keys[kv] += 1
378                    else: self.keys[kv] = 1
379                    self.allocation[aid]['keys'].append((uname, k))
380        except KeyError:
381            self.state_lock.release()
382            raise service_error(service_error.internal,
383                    "Misformed allocation response?")
384
385        self.allocation[aid]['owners'] = owners
386        self.write_state()
387        self.state_lock.release()
388        return (pname, uname)
389
390    # End of RequestAccess support routines
391
392    def RequestAccess(self, req, fid):
393        """
394        Handle the access request.  Proxy if not for us.
395
396        Parse out the fields and make the allocations or rejections if for us,
397        otherwise, assuming we're willing to proxy, proxy the request out.
398        """
399
400        def gateway_hardware(h):
401            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
402            else: return h
403
404        def get_export_project(svcs):
405            """
406            if the service requests includes one to export a project, return
407            that project.
408            """
409            rv = None
410            for s in svcs:
411                if s.get('name', '') == 'project_export' and \
412                        s.get('visibility', '') == 'export':
413                    if not rv: 
414                        for a in s.get('feddAttr', []):
415                            if a.get('attribute', '') == 'project' \
416                                    and 'value' in a:
417                                rv = a['value']
418                    else:
419                        raise service_error(service_error, access, 
420                                'Requesting multiple project exports is ' + \
421                                        'not supported');
422            return rv
423
424        # The dance to get into the request body
425        if req.has_key('RequestAccessRequestBody'):
426            req = req['RequestAccessRequestBody']
427        else:
428            raise service_error(service_error.req, "No request!?")
429
430
431        found, dyn, owners = self.lookup_access(req, fid)
432        ap = None
433
434        # if this includes a project export request and the exported
435        # project is not the access project, access denied.
436        if 'service' in req:
437            ep = get_export_project(req['service'])
438            if ep and ep != found[0].name:
439                raise service_error(service_error.access,
440                        "Cannot export %s" % ep)
441
442        if self.ssh_pubkey_file:
443            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
444        else:
445            raise service_error(service_error.internal, 
446                    "SSH access parameters required")
447        # keep track of what's been added
448        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
449        aid = unicode(allocID)
450
451        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
452
453        services, svc_state = self.export_services(req.get('service',[]),
454                pname, uname)
455        self.state_lock.acquire()
456        # Store services state in global state
457        for k, v in svc_state.items():
458            self.allocation[aid][k] = v
459        self.write_state()
460        self.state_lock.release()
461        # Give the owners the right to change this allocation
462        for o in owners:
463            self.auth.set_attribute(o, allocID)
464        try:
465            f = open("%s/%s.pem" % (self.certdir, aid), "w")
466            print >>f, alloc_cert
467            f.close()
468        except EnvironmentError, e:
469            raise service_error(service_error.internal, 
470                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
471        resp = self.build_access_response({ 'fedid': allocID } ,
472                ap, services)
473        return resp
474
475    def do_release_project(self, del_project, del_users, del_types):
476        """
477        If a project and users has to be deleted, make the call.
478        """
479        msg = { 'project': { }}
480        if del_project:
481            msg['project']['name']= {'localname': del_project}
482        users = [ ]
483        for u in del_users.keys():
484            users.append({ 'userID': { 'localname': u },\
485                'access' :  \
486                        [ {'sshPubkey' : s } for s in del_users[u]]\
487            })
488        if users: 
489            msg['project']['user'] = users
490        if len(del_types) > 0:
491            msg['resources'] = { 'node': \
492                    [ {'hardware': [ h ] } for h in del_types ]\
493                }
494        if self.allocate_project.release_project:
495            msg = { 'ReleaseProjectRequestBody' : msg}
496            self.allocate_project.release_project(msg)
497
498    def ReleaseAccess(self, req, fid):
499        # The dance to get into the request body
500        if req.has_key('ReleaseAccessRequestBody'):
501            req = req['ReleaseAccessRequestBody']
502        else:
503            raise service_error(service_error.req, "No request!?")
504
505        try:
506            if req['allocID'].has_key('localname'):
507                auth_attr = aid = req['allocID']['localname']
508            elif req['allocID'].has_key('fedid'):
509                aid = unicode(req['allocID']['fedid'])
510                auth_attr = req['allocID']['fedid']
511            else:
512                raise service_error(service_error.req,
513                        "Only localnames and fedids are understood")
514        except KeyError:
515            raise service_error(service_error.req, "Badly formed request")
516
517        self.log.debug("[access] deallocation requested for %s", aid)
518        if not self.auth.check_attribute(fid, auth_attr):
519            self.log.debug("[access] deallocation denied for %s", aid)
520            raise service_error(service_error.access, "Access Denied")
521
522        # If we know this allocation, reduce the reference counts and
523        # remove the local allocations.  Otherwise report an error.  If
524        # there is an allocation to delete, del_users will be a dictonary
525        # of sets where the key is the user that owns the keys in the set.
526        # We use a set to avoid duplicates.  del_project is just the name
527        # of any dynamic project to delete.  We're somewhat lazy about
528        # deleting authorization attributes.  Having access to something
529        # that doesn't exist isn't harmful.
530        del_users = { }
531        del_project = None
532        del_types = set()
533
534        self.state_lock.acquire()
535        if aid in self.allocation:
536            self.log.debug("Found allocation for %s" %aid)
537            for k in self.allocation[aid]['keys']:
538                kk = "%s:%s" % k
539                self.keys[kk] -= 1
540                if self.keys[kk] == 0:
541                    if not del_users.has_key(k[0]):
542                        del_users[k[0]] = set()
543                    del_users[k[0]].add(k[1])
544                    del self.keys[kk]
545
546            if 'project' in self.allocation[aid]:
547                pname = self.allocation[aid]['project']
548                self.projects[pname] -= 1
549                if self.projects[pname] == 0:
550                    del_project = pname
551                    del self.projects[pname]
552
553            if 'types' in self.allocation[aid]:
554                for t in self.allocation[aid]['types']:
555                    self.types[t] -= 1
556                    if self.types[t] == 0:
557                        if not del_project: del_project = t[0]
558                        del_types.add(t[1])
559                        del self.types[t]
560
561            del self.allocation[aid]
562            self.write_state()
563            self.state_lock.release()
564            # If we actually have resources to deallocate, prepare the call.
565            if del_project or del_users:
566                self.do_release_project(del_project, del_users, del_types)
567            # And remove the access cert
568            cf = "%s/%s.pem" % (self.certdir, aid)
569            self.log.debug("Removing %s" % cf)
570            os.remove(cf)
571            return { 'allocID': req['allocID'] } 
572        else:
573            self.state_lock.release()
574            raise service_error(service_error.req, "No such allocation")
575
576    # These are subroutines for StartSegment
577    def generate_ns2(self, topo, expfn, softdir, connInfo):
578        """
579        Convert topo into an ns2 file, decorated with appropriate commands for
580        the particular testbed setup.  Convert all requests for software, etc
581        to point at the staged copies on this testbed and add the federation
582        startcommands.
583        """
584        class dragon_commands:
585            """
586            Functor to spit out approrpiate dragon commands for nodes listed in
587            the connectivity description.  The constructor makes a dict mapping
588            dragon nodes to their parameters and the __call__ checks each
589            element in turn for membership.
590            """
591            def __init__(self, map):
592                self.node_info = map
593
594            def __call__(self, e):
595                s = ""
596                if isinstance(e, topdl.Computer):
597                    if self.node_info.has_key(e.name):
598                        info = self.node_info[e.name]
599                        for ifname, vlan, type in info:
600                            for i in e.interface:
601                                if i.name == ifname:
602                                    addr = i.get_attribute('ip4_address')
603                                    subs = i.substrate[0]
604                                    break
605                            else:
606                                raise service_error(service_error.internal,
607                                        "No interface %s on element %s" % \
608                                                (ifname, e.name))
609                            # XXX: do netmask right
610                            if type =='link':
611                                s = ("tb-allow-external ${%s} " + \
612                                        "dragonportal ip %s vlan %s " + \
613                                        "netmask 255.255.255.0\n") % \
614                                        (topdl.to_tcl_name(e.name), addr, vlan)
615                            elif type =='lan':
616                                s = ("tb-allow-external ${%s} " + \
617                                        "dragonportal " + \
618                                        "ip %s vlan %s usurp %s\n") % \
619                                        (topdl.to_tcl_name(e.name), addr, 
620                                                vlan, subs)
621                            else:
622                                raise service_error(service_error_internal,
623                                        "Unknown DRAGON type %s" % type)
624                return s
625
626        class not_dragon:
627            """
628            Return true if a node is in the given map of dragon nodes.
629            """
630            def __init__(self, map):
631                self.nodes = set(map.keys())
632
633            def __call__(self, e):
634                return e.name not in self.nodes
635
636        # Main line of generate_ns2
637        t = topo.clone()
638
639        # Create the map of nodes that need direct connections (dragon
640        # connections) from the connInfo
641        dragon_map = { }
642        for i in [ i for i in connInfo if i['type'] == 'transit']:
643            for a in i.get('fedAttr', []):
644                if a['attribute'] == 'vlan_id':
645                    vlan = a['value']
646                    break
647            else:
648                raise service_error(service_error.internal, "No vlan tag")
649            members = i.get('member', [])
650            if len(members) > 1: type = 'lan'
651            else: type = 'link'
652
653            try:
654                for m in members:
655                    if m['element'] in dragon_map:
656                        dragon_map[m['element']].append(( m['interface'], 
657                            vlan, type))
658                    else:
659                        dragon_map[m['element']] = [( m['interface'], 
660                            vlan, type),]
661            except KeyError:
662                raise service_error(service_error.req,
663                        "Missing connectivity info")
664
665        # Weed out the things we aren't going to instantiate: Segments, portal
666        # substrates, and portal interfaces.  (The copy in the for loop allows
667        # us to delete from e.elements in side the for loop).  While we're
668        # touching all the elements, we also adjust paths from the original
669        # testbed to local testbed paths and put the federation commands into
670        # the start commands
671        for e in [e for e in t.elements]:
672            if isinstance(e, topdl.Segment):
673                t.elements.remove(e)
674            if isinstance(e, topdl.Computer):
675                self.add_kit(e, self.federation_software)
676                if e.get_attribute('portal') and self.portal_startcommand:
677                    # Add local portal support software
678                    self.add_kit(e, self.portal_software)
679                    # Portals never have a user-specified start command
680                    e.set_attribute('startup', self.portal_startcommand)
681                elif self.node_startcommand:
682                    if e.get_attribute('startup'):
683                        e.set_attribute('startup', "%s \\$USER '%s'" % \
684                                (self.node_startcommand, 
685                                    e.get_attribute('startup')))
686                    else:
687                        e.set_attribute('startup', self.node_startcommand)
688
689                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
690                # Remove portal interfaces that do not connect to DRAGON
691                e.interface = [i for i in e.interface \
692                        if not i.get_attribute('portal') or i.name in dinf ]
693            # Fix software paths
694            for s in getattr(e, 'software', []):
695                s.location = re.sub("^.*/", softdir, s.location)
696
697        t.substrates = [ s.clone() for s in t.substrates ]
698        t.incorporate_elements()
699
700        # Customize the ns2 output for local portal commands and images
701        filters = []
702
703        if self.dragon_endpoint:
704            add_filter = not_dragon(dragon_map)
705            filters.append(dragon_commands(dragon_map))
706        else:
707            add_filter = None
708
709        if self.portal_command:
710            filters.append(topdl.generate_portal_command_filter(
711                self.portal_command, add_filter=add_filter))
712
713        if self.portal_image:
714            filters.append(topdl.generate_portal_image_filter(
715                self.portal_image))
716
717        if self.portal_type:
718            filters.append(topdl.generate_portal_hardware_filter(
719                self.portal_type))
720
721        # Convert to ns and write it out
722        expfile = topdl.topology_to_ns2(t, filters)
723        try:
724            f = open(expfn, "w")
725            print >>f, expfile
726            f.close()
727        except EnvironmentError:
728            raise service_error(service_error.internal,
729                    "Cannot write experiment file %s: %s" % (expfn,e))
730
731    def export_store_info(self, cf, proj, ename, connInfo):
732        """
733        For the export requests in the connection info, install the peer names
734        at the experiment controller via SetValue calls.
735        """
736
737        for c in connInfo:
738            for p in [ p for p in c.get('parameter', []) \
739                    if p.get('type', '') == 'output']:
740
741                if p.get('name', '') == 'peer':
742                    k = p.get('key', None)
743                    surl = p.get('store', None)
744                    if surl and k and k.index('/') != -1:
745                        value = "%s.%s.%s%s" % \
746                                (k[k.index('/')+1:], ename, proj, self.domain)
747                        req = { 'name': k, 'value': value }
748                        self.log.debug("Setting %s to %s on %s" % \
749                                (k, value, surl))
750                        self.call_SetValue(surl, req, cf)
751                    else:
752                        self.log.error("Bad export request: %s" % p)
753                elif p.get('name', '') == 'ssh_port':
754                    k = p.get('key', None)
755                    surl = p.get('store', None)
756                    if surl and k:
757                        req = { 'name': k, 'value': self.ssh_port }
758                        self.log.debug("Setting %s to %s on %s" % \
759                                (k, self.ssh_port, surl))
760                        self.call_SetValue(surl, req, cf)
761                    else:
762                        self.log.error("Bad export request: %s" % p)
763                else:
764                    self.log.error("Unknown export parameter: %s" % \
765                            p.get('name'))
766                    continue
767
768    def add_seer_node(self, topo, name, startup):
769        """
770        Add a seer node to the given topology, with the startup command passed
771        in.  Used by configure seer_services.
772        """
773        c_node = topdl.Computer(
774                name=name, 
775                os= topdl.OperatingSystem(
776                    attribute=[
777                    { 'attribute': 'osid', 
778                        'value': self.local_seer_image },
779                    ]),
780                attribute=[
781                    { 'attribute': 'startup', 'value': startup },
782                    ]
783                )
784        self.add_kit(c_node, self.local_seer_software)
785        topo.elements.append(c_node)
786
787    def configure_seer_services(self, services, topo, softdir):
788        """
789        Make changes to the topology required for the seer requests being made.
790        Specifically, add any control or master nodes required and set up the
791        start commands on the nodes to interconnect them.
792        """
793        local_seer = False      # True if we need to add a control node
794        collect_seer = False    # True if there is a seer-master node
795        seer_master= False      # True if we need to add the seer-master
796        for s in services:
797            s_name = s.get('name', '')
798            s_vis = s.get('visibility','')
799
800            if s_name  == 'local_seer_control' and s_vis == 'export':
801                local_seer = True
802            elif s_name == 'seer_master':
803                if s_vis == 'import':
804                    collect_seer = True
805                elif s_vis == 'export':
806                    seer_master = True
807       
808        # We've got the whole picture now, so add nodes if needed and configure
809        # them to interconnect properly.
810        if local_seer or seer_master:
811            # Copy local seer control node software to the tempdir
812            for l, f in self.local_seer_software:
813                base = os.path.basename(f)
814                copy_file(f, "%s/%s" % (softdir, base))
815        # If we're collecting seers somewhere the controllers need to talk to
816        # the master.  In testbeds that export the service, that will be a
817        # local node that we'll add below.  Elsewhere it will be the control
818        # portal that will port forward to the exporting master.
819        if local_seer:
820            if collect_seer:
821                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
822            else:
823                startup = self.local_seer_start
824            self.add_seer_node(topo, 'control', startup)
825        # If this is the seer master, add that node, too.
826        if seer_master:
827            self.add_seer_node(topo, 'seer-master', 
828                    "%s -R -n -R seer-master -R -A -R sink" % \
829                            self.seer_master_start)
830
831    def retrieve_software(self, topo, certfile, softdir):
832        """
833        Collect the software that nodes in the topology need loaded and stage
834        it locally.  This implies retrieving it from the experiment_controller
835        and placing it into softdir.  Certfile is used to prove that this node
836        has access to that data (it's the allocation/segment fedid).  Finally
837        local portal and federation software is also copied to the same staging
838        directory for simplicity - all software needed for experiment creation
839        is in softdir.
840        """
841        sw = set()
842        for e in topo.elements:
843            for s in getattr(e, 'software', []):
844                sw.add(s.location)
845        for s in sw:
846            self.log.debug("Retrieving %s" % s)
847            try:
848                get_url(s, certfile, softdir)
849            except:
850                t, v, st = sys.exc_info()
851                raise service_error(service_error.internal,
852                        "Error retrieving %s: %s" % (s, v))
853
854        # Copy local federation and portal node software to the tempdir
855        for s in (self.federation_software, self.portal_software):
856            for l, f in s:
857                base = os.path.basename(f)
858                copy_file(f, "%s/%s" % (softdir, base))
859
860
861    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
862        """
863        Gather common configuration files, retrieve or create an experiment
864        name and project name, and return the ssh_key filenames.  Create an
865        allocation log bound to the state log variable as well.
866        """
867        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
868        ename = None
869        pubkey_base = None
870        secretkey_base = None
871        proj = None
872        user = None
873        alloc_log = None
874
875        for a in attrs:
876            if a['attribute'] in configs:
877                try:
878                    self.log.debug("Retrieving %s from %s" % \
879                            (a['attribute'], a['value']))
880                    get_url(a['value'], certfile, tmpdir)
881                except:
882                    t, v, st = sys.exc_info()
883                    raise service_error(service_error.internal,
884                            "Error retrieving %s: %s" % (a.get('value', ""), v))
885            if a['attribute'] == 'ssh_pubkey':
886                pubkey_base = a['value'].rpartition('/')[2]
887            if a['attribute'] == 'ssh_secretkey':
888                secretkey_base = a['value'].rpartition('/')[2]
889            if a['attribute'] == 'experiment_name':
890                ename = a['value']
891
892        if not ename:
893            ename = ""
894            for i in range(0,5):
895                ename += random.choice(string.ascii_letters)
896            self.log.warn("No experiment name: picked one randomly: %s" \
897                    % ename)
898
899        if not pubkey_base:
900            raise service_error(service_error.req, 
901                    "No public key attribute")
902
903        if not secretkey_base:
904            raise service_error(service_error.req, 
905                    "No secret key attribute")
906
907        self.state_lock.acquire()
908        if aid in self.allocation:
909            proj = self.allocation[aid].get('project', None)
910            if not proj: 
911                proj = self.allocation[aid].get('sproject', None)
912            user = self.allocation[aid].get('user', None)
913            self.allocation[aid]['experiment'] = ename
914            self.allocation[aid]['log'] = [ ]
915            # Create a logger that logs to the experiment's state object as
916            # well as to the main log file.
917            alloc_log = logging.getLogger('fedd.access.%s' % ename)
918            h = logging.StreamHandler(
919                    list_log.list_log(self.allocation[aid]['log']))
920            # XXX: there should be a global one of these rather than
921            # repeating the code.
922            h.setFormatter(logging.Formatter(
923                "%(asctime)s %(name)s %(message)s",
924                        '%d %b %y %H:%M:%S'))
925            alloc_log.addHandler(h)
926            self.write_state()
927        self.state_lock.release()
928
929        if not proj:
930            raise service_error(service_error.internal, 
931                    "Can't find project for %s" %aid)
932
933        if not user:
934            raise service_error(service_error.internal, 
935                    "Can't find creation user for %s" %aid)
936
937        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
938
939    def finalize_experiment(self, starter, topo, aid, alloc_id):
940        """
941        Store key bits of experiment state in the global repository, including
942        the response that may need to be replayed, and return the response.
943        """
944        # Copy the assigned names into the return topology
945        embedding = [ ]
946        for n in starter.node:
947            embedding.append({ 
948                'toponame': n,
949                'physname': ["%s%s" %  (starter.node[n], self.domain)],
950                })
951        # Grab the log (this is some anal locking, but better safe than
952        # sorry)
953        self.state_lock.acquire()
954        logv = "".join(self.allocation[aid]['log'])
955        # It's possible that the StartSegment call gets retried (!).
956        # if the 'started' key is in the allocation, we'll return it rather
957        # than redo the setup.
958        self.allocation[aid]['started'] = { 
959                'allocID': alloc_id,
960                'allocationLog': logv,
961                'segmentdescription': { 
962                    'topdldescription': topo.clone().to_dict()
963                    },
964                'embedding': embedding
965                }
966        retval = copy.copy(self.allocation[aid]['started'])
967        self.write_state()
968        self.state_lock.release()
969        return retval
970   
971    # End of StartSegment support routines
972
973    def StartSegment(self, req, fid):
974        err = None  # Any service_error generated after tmpdir is created
975        rv = None   # Return value from segment creation
976
977        try:
978            req = req['StartSegmentRequestBody']
979            auth_attr = req['allocID']['fedid']
980            topref = req['segmentdescription']['topdldescription']
981        except KeyError:
982            raise service_error(server_error.req, "Badly formed request")
983
984        connInfo = req.get('connection', [])
985        services = req.get('service', [])
986        aid = "%s" % auth_attr
987        attrs = req.get('fedAttr', [])
988        if not self.auth.check_attribute(fid, auth_attr):
989            raise service_error(service_error.access, "Access denied")
990        else:
991            # See if this is a replay of an earlier succeeded StartSegment -
992            # sometimes SSL kills 'em.  If so, replay the response rather than
993            # redoing the allocation.
994            self.state_lock.acquire()
995            retval = self.allocation[aid].get('started', None)
996            self.state_lock.release()
997            if retval:
998                self.log.warning("Duplicate StartSegment for %s: " % aid + \
999                        "replaying response")
1000                return retval
1001
1002        # A new request.  Do it.
1003
1004        if topref: topo = topdl.Topology(**topref)
1005        else:
1006            raise service_error(service_error.req, 
1007                    "Request missing segmentdescription'")
1008       
1009        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1010        try:
1011            tmpdir = tempfile.mkdtemp(prefix="access-")
1012            softdir = "%s/software" % tmpdir
1013            os.mkdir(softdir)
1014        except EnvironmentError:
1015            raise service_error(service_error.internal, "Cannot create tmp dir")
1016
1017        # Try block alllows us to clean up temporary files.
1018        try:
1019            self.retrieve_software(topo, certfile, softdir)
1020            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1021                    self.initialize_experiment_info(attrs, aid, 
1022                            certfile, tmpdir)
1023
1024            # Set up userconf and seer if needed
1025            self.configure_userconf(services, tmpdir)
1026            self.configure_seer_services(services, topo, softdir)
1027            # Get and send synch store variables
1028            self.export_store_info(certfile, proj, ename, connInfo)
1029            self.import_store_info(certfile, connInfo)
1030
1031            expfile = "%s/experiment.tcl" % tmpdir
1032
1033            self.generate_portal_configs(topo, pubkey_base, 
1034                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1035            self.generate_ns2(topo, expfile, 
1036                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1037
1038            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1039                    debug=self.create_debug, log=alloc_log)
1040            rv = starter(self, ename, proj, user, expfile, tmpdir)
1041        except service_error, e:
1042            err = e
1043        except:
1044            t, v, st = sys.exc_info()
1045            err = service_error(service_error.internal, "%s: %s" % \
1046                    (v, traceback.extract_tb(st)))
1047
1048        # Walk up tmpdir, deleting as we go
1049        if self.cleanup: self.remove_dirs(tmpdir)
1050        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1051
1052        if rv:
1053            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1054        elif err:
1055            raise service_error(service_error.federant,
1056                    "Swapin failed: %s" % err)
1057        else:
1058            raise service_error(service_error.federant, "Swapin failed")
1059
1060    def TerminateSegment(self, req, fid):
1061        try:
1062            req = req['TerminateSegmentRequestBody']
1063        except KeyError:
1064            raise service_error(server_error.req, "Badly formed request")
1065
1066        auth_attr = req['allocID']['fedid']
1067        aid = "%s" % auth_attr
1068        attrs = req.get('fedAttr', [])
1069        if not self.auth.check_attribute(fid, auth_attr):
1070            raise service_error(service_error.access, "Access denied")
1071
1072        self.state_lock.acquire()
1073        if aid in self.allocation:
1074            proj = self.allocation[aid].get('project', None)
1075            if not proj: 
1076                proj = self.allocation[aid].get('sproject', None)
1077            user = self.allocation[aid].get('user', None)
1078            ename = self.allocation[aid].get('experiment', None)
1079        else:
1080            proj = None
1081            user = None
1082            ename = None
1083        self.state_lock.release()
1084
1085        if not proj:
1086            raise service_error(service_error.internal, 
1087                    "Can't find project for %s" % aid)
1088
1089        if not user:
1090            raise service_error(service_error.internal, 
1091                    "Can't find creation user for %s" % aid)
1092        if not ename:
1093            raise service_error(service_error.internal, 
1094                    "Can't find experiment name for %s" % aid)
1095        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1096                debug=self.create_debug)
1097        stopper(self, user, proj, ename)
1098        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.