source: fedd/federation/emulab_access.py @ 60961f5

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 60961f5 was 60961f5, checked in by Ted Faber <faber@…>, 14 years ago

dead code

  • Property mode set to 100644
File size: 35.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74
75        self.dragon_endpoint = config.get("access", "dragon")
76        self.dragon_vlans = config.get("access", "dragon_vlans")
77        self.deter_internal = config.get("access", "deter_internal")
78
79        self.tunnel_config = config.getboolean("access", "tunnel_config")
80        self.portal_command = config.get("access", "portal_command")
81        self.portal_image = config.get("access", "portal_image")
82        self.portal_type = config.get("access", "portal_type") or "pc"
83        self.portal_startcommand = config.get("access", "portal_startcommand")
84        self.node_startcommand = config.get("access", "node_startcommand")
85
86        self.federation_software = self.software_list(self.federation_software)
87        self.portal_software = self.software_list(self.portal_software)
88        self.local_seer_software = self.software_list(self.local_seer_software)
89
90        self.access_type = self.access_type.lower()
91        if self.access_type == 'remote_emulab':
92            self.start_segment = proxy_emulab_segment.start_segment
93            self.stop_segment = proxy_emulab_segment.stop_segment
94        elif self.access_type == 'local_emulab':
95            self.start_segment = local_emulab_segment.start_segment
96            self.stop_segment = local_emulab_segment.stop_segment
97        else:
98            self.start_segment = None
99            self.stop_segment = None
100
101        self.restricted = [ ]
102        self.access = { }
103        if config.has_option("access", "accessdb"):
104            self.read_access(config.get("access", "accessdb"))
105        tb = config.get('access', 'testbed')
106        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
107        else: self.testbed = [ ]
108
109        if config.has_option("access", "accessdb"):
110            self.read_access(config.get("access", "accessdb"), 
111                    self.make_access_project)
112
113        # read_state in the base_class
114        self.state_lock.acquire()
115        for a  in ('allocation', 'projects', 'keys', 'types'):
116            if a not in self.state:
117                self.state[a] = { }
118        self.allocation = self.state['allocation']
119        self.projects = self.state['projects']
120        self.keys = self.state['keys']
121        self.types = self.state['types']
122        # Add the ownership attributes to the authorizer.  Note that the
123        # indices of the allocation dict are strings, but the attributes are
124        # fedids, so there is a conversion.
125        for k in self.allocation.keys():
126            for o in self.allocation[k].get('owners', []):
127                self.auth.set_attribute(o, fedid(hexstr=k))
128            if self.allocation[k].has_key('userconfig'):
129                sfid = self.allocation[k]['userconfig']
130                fid = fedid(hexstr=sfid)
131                self.auth.set_attribute(fid, "/%s" % sfid)
132        self.state_lock.release()
133        self.exports = {
134                'SMB': self.export_SMB,
135                'seer': self.export_seer,
136                'tmcd': self.export_tmcd,
137                'userconfig': self.export_userconfig,
138                'project_export': self.export_project_export,
139                'local_seer_control': self.export_local_seer,
140                'seer_master': self.export_seer_master,
141                'hide_hosts': self.export_hide_hosts,
142                }
143
144        if not self.local_seer_image or not self.local_seer_software or \
145                not self.local_seer_start:
146            if 'local_seer_control' in self.exports:
147                del self.exports['local_seer_control']
148
149        if not self.local_seer_image or not self.local_seer_software or \
150                not self.seer_master_start:
151            if 'seer_master' in self.exports:
152                del self.exports['seer_master']
153
154
155        self.soap_services = {\
156            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
157            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
158            'StartSegment': soap_handler("StartSegment", self.StartSegment),
159            'TerminateSegment': soap_handler("TerminateSegment",
160                self.TerminateSegment),
161            }
162        self.xmlrpc_services =  {\
163            'RequestAccess': xmlrpc_handler('RequestAccess',
164                self.RequestAccess),
165            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
166                self.ReleaseAccess),
167            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
168            'TerminateSegment': xmlrpc_handler('TerminateSegment',
169                self.TerminateSegment),
170            }
171
172        self.call_SetValue = service_caller('SetValue')
173        self.call_GetValue = service_caller('GetValue', log=self.log)
174
175        if not config.has_option("allocate", "uri"):
176            self.allocate_project = \
177                allocate_project_local(config, auth)
178        else:
179            self.allocate_project = \
180                allocate_project_remote(config, auth)
181
182
183        # If the project allocator exports services, put them in this object's
184        # maps so that classes that instantiate this can call the services.
185        self.soap_services.update(self.allocate_project.soap_services)
186        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
187
188    @staticmethod
189    def make_access_project(str):
190        """
191        Convert a string of the form (id[:resources:resouces], id, id) into an
192        access_project.  This is called by read_access to convert to local
193        attributes.  It returns a tuple of the form (project, user, user) where
194        users may be names or fedids.
195        """
196        def parse_name(n):
197            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
198            else: return n
199
200        str = str.strip()
201        if str.startswith('(') and str.endswith(')'):
202            str = str[1:-1]
203            names = [ s.strip() for s in str.split(",")]
204            if len(names) > 3:
205                raise self.parse_error("More than three fields in name")
206            first = names[0].split(":")
207            if first == 'fedid:':
208                del first[0]
209                first[0] = fedid(hexstr=first[0])
210            names[0] = access_project(first[0], first[1:])
211
212            for i in range(1,2):
213                names[i] = parse_name(names[i])
214
215            return tuple(names)
216        else:
217            raise self.parse_error('Bad mapping (unbalanced parens)')
218
219
220    # RequestAccess support routines
221
222    def lookup_access(self, req, fid):
223        """
224        Look up the local access control information mapped to this fedid and
225        credentials.  In this case it is a (project, create_user, access_user)
226        triple, and a triple of three booleans indicating which, if any will
227        need to be dynamically created.  Finally a list of owners for that
228        allocation is returned.
229
230        lookup_access_base pulls the first triple out, and it is parsed by this
231        routine into the boolean map.  Owners is always the controlling fedid.
232        """
233        # Return values
234        rp = access_project(None, ())
235        ru = None
236        # This maps a valid user to the Emulab projects and users to use
237        found, match = self.lookup_access_base(req, fid)
238        tb, project, user = match
239       
240        if found == None:
241            raise service_error(service_error.access,
242                    "Access denied - cannot map access")
243
244        # resolve <dynamic> and <same> in found
245        dyn_proj = False
246        dyn_create_user = False
247        dyn_service_user = False
248
249        if found[0].name == "<same>":
250            if project != None:
251                rp.name = project
252            else : 
253                raise service_error(\
254                        service_error.server_config,
255                        "Project matched <same> when no project given")
256        elif found[0].name == "<dynamic>":
257            rp.name = None
258            dyn_proj = True
259        else:
260            rp.name = found[0].name
261        rp.node_types = found[0].node_types;
262
263        if found[1] == "<same>":
264            if user_match == "<any>":
265                if user != None: rcu = user[0]
266                else: raise service_error(\
267                        service_error.server_config,
268                        "Matched <same> on anonymous request")
269            else:
270                rcu = user_match
271        elif found[1] == "<dynamic>":
272            rcu = None
273            dyn_create_user = True
274        else:
275            rcu = found[1]
276       
277        if found[2] == "<same>":
278            if user_match == "<any>":
279                if user != None: rsu = user[0]
280                else: raise service_error(\
281                        service_error.server_config,
282                        "Matched <same> on anonymous request")
283            else:
284                rsu = user_match
285        elif found[2] == "<dynamic>":
286            rsu = None
287            dyn_service_user = True
288        else:
289            rsu = found[2]
290
291        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
292                [ fid ]
293
294    def do_project_allocation(self, dyn, project, user):
295        """
296        Call the project allocation routines and return the info.
297        """
298        if dyn: 
299            # Compose the dynamic project request
300            # (only dynamic, dynamic currently allowed)
301            preq = { 'AllocateProjectRequestBody': \
302                        { 'project' : {\
303                            'user': [ \
304                            { \
305                                'access': [ { 
306                                    'sshPubkey': self.ssh_pubkey_file } ],
307                                 'role': "serviceAccess",\
308                            }, \
309                            { \
310                                'access': [ { 
311                                    'sshPubkey': self.ssh_pubkey_file } ],
312                                 'role': "experimentCreation",\
313                            }, \
314                            ], \
315                            }\
316                        }\
317                    }
318            return self.allocate_project.dynamic_project(preq)
319        else:
320            preq = {'StaticProjectRequestBody' : \
321                    { 'project': \
322                        { 'name' : { 'localname' : project },\
323                          'user' : [ \
324                            {\
325                                'userID': { 'localname' : user }, \
326                                'access': [ { 
327                                    'sshPubkey': self.ssh_pubkey_file } ],
328                                'role': 'experimentCreation'\
329                            },\
330                            {\
331                                'userID': { 'localname' : user}, \
332                                'access': [ { 
333                                    'sshPubkey': self.ssh_pubkey_file } ],
334                                'role': 'serviceAccess'\
335                            },\
336                        ]}\
337                    }\
338            }
339            return self.allocate_project.static_project(preq)
340
341    def save_project_state(self, aid, ap, dyn, owners):
342        """
343        Parse out and save the information relevant to the project created for
344        this experiment.  That info is largely in ap and owners.  dyn indicates
345        that the project was created dynamically.  Return the user and project
346        names.
347        """
348        self.state_lock.acquire()
349        self.allocation[aid] = { }
350        try:
351            pname = ap['project']['name']['localname']
352        except KeyError:
353            pname = None
354
355        if dyn:
356            if not pname:
357                self.state_lock.release()
358                raise service_error(service_error.internal,
359                        "Misformed allocation response?")
360            if pname in self.projects: self.projects[pname] += 1
361            else: self.projects[pname] = 1
362            self.allocation[aid]['project'] = pname
363        else:
364            # sproject is a static project associated with this allocation.
365            self.allocation[aid]['sproject'] = pname
366
367        self.allocation[aid]['keys'] = [ ]
368
369        try:
370            for u in ap['project']['user']:
371                uname = u['userID']['localname']
372                if u['role'] == 'experimentCreation':
373                    self.allocation[aid]['user'] = uname
374                for k in [ k['sshPubkey'] for k in u['access'] \
375                        if k.has_key('sshPubkey') ]:
376                    kv = "%s:%s" % (uname, k)
377                    if self.keys.has_key(kv): self.keys[kv] += 1
378                    else: self.keys[kv] = 1
379                    self.allocation[aid]['keys'].append((uname, k))
380        except KeyError:
381            self.state_lock.release()
382            raise service_error(service_error.internal,
383                    "Misformed allocation response?")
384
385        self.allocation[aid]['owners'] = owners
386        self.write_state()
387        self.state_lock.release()
388        return (pname, uname)
389
390    # End of RequestAccess support routines
391
392    def RequestAccess(self, req, fid):
393        """
394        Handle the access request.  Proxy if not for us.
395
396        Parse out the fields and make the allocations or rejections if for us,
397        otherwise, assuming we're willing to proxy, proxy the request out.
398        """
399
400        def gateway_hardware(h):
401            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
402            else: return h
403
404        def get_export_project(svcs):
405            """
406            if the service requests includes one to export a project, return
407            that project.
408            """
409            rv = None
410            for s in svcs:
411                if s.get('name', '') == 'project_export' and \
412                        s.get('visibility', '') == 'export':
413                    if not rv: 
414                        for a in s.get('feddAttr', []):
415                            if a.get('attribute', '') == 'project' \
416                                    and 'value' in a:
417                                rv = a['value']
418                    else:
419                        raise service_error(service_error, access, 
420                                'Requesting multiple project exports is ' + \
421                                        'not supported');
422            return rv
423
424        # The dance to get into the request body
425        if req.has_key('RequestAccessRequestBody'):
426            req = req['RequestAccessRequestBody']
427        else:
428            raise service_error(service_error.req, "No request!?")
429
430
431        found, dyn, owners = self.lookup_access(req, fid)
432        ap = None
433
434        # if this includes a project export request and the exported
435        # project is not the access project, access denied.
436        if 'service' in req:
437            ep = get_export_project(req['service'])
438            if ep and ep != found[0].name:
439                raise service_error(service_error.access,
440                        "Cannot export %s" % ep)
441
442        if self.ssh_pubkey_file:
443            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
444        else:
445            raise service_error(service_error.internal, 
446                    "SSH access parameters required")
447        # keep track of what's been added
448        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
449        aid = unicode(allocID)
450
451        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
452
453        services, svc_state = self.export_services(req.get('service',[]),
454                pname, uname)
455        self.state_lock.acquire()
456        # Store services state in global state
457        for k, v in svc_state.items():
458            self.allocation[aid][k] = v
459        self.write_state()
460        self.state_lock.release()
461        # Give the owners the right to change this allocation
462        for o in owners:
463            self.auth.set_attribute(o, allocID)
464        try:
465            f = open("%s/%s.pem" % (self.certdir, aid), "w")
466            print >>f, alloc_cert
467            f.close()
468        except EnvironmentError, e:
469            raise service_error(service_error.internal, 
470                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
471        resp = self.build_access_response({ 'fedid': allocID } ,
472                ap, services)
473        return resp
474
475    def do_release_project(self, del_project, del_users, del_types):
476        """
477        If a project and users has to be deleted, make the call.
478        """
479        msg = { 'project': { }}
480        if del_project:
481            msg['project']['name']= {'localname': del_project}
482        users = [ ]
483        for u in del_users.keys():
484            users.append({ 'userID': { 'localname': u },\
485                'access' :  \
486                        [ {'sshPubkey' : s } for s in del_users[u]]\
487            })
488        if users: 
489            msg['project']['user'] = users
490        if len(del_types) > 0:
491            msg['resources'] = { 'node': \
492                    [ {'hardware': [ h ] } for h in del_types ]\
493                }
494        if self.allocate_project.release_project:
495            msg = { 'ReleaseProjectRequestBody' : msg}
496            self.allocate_project.release_project(msg)
497
498    def ReleaseAccess(self, req, fid):
499        # The dance to get into the request body
500        if req.has_key('ReleaseAccessRequestBody'):
501            req = req['ReleaseAccessRequestBody']
502        else:
503            raise service_error(service_error.req, "No request!?")
504
505        try:
506            if req['allocID'].has_key('localname'):
507                auth_attr = aid = req['allocID']['localname']
508            elif req['allocID'].has_key('fedid'):
509                aid = unicode(req['allocID']['fedid'])
510                auth_attr = req['allocID']['fedid']
511            else:
512                raise service_error(service_error.req,
513                        "Only localnames and fedids are understood")
514        except KeyError:
515            raise service_error(service_error.req, "Badly formed request")
516
517        self.log.debug("[access] deallocation requested for %s", aid)
518        if not self.auth.check_attribute(fid, auth_attr):
519            self.log.debug("[access] deallocation denied for %s", aid)
520            raise service_error(service_error.access, "Access Denied")
521
522        # If we know this allocation, reduce the reference counts and
523        # remove the local allocations.  Otherwise report an error.  If
524        # there is an allocation to delete, del_users will be a dictonary
525        # of sets where the key is the user that owns the keys in the set.
526        # We use a set to avoid duplicates.  del_project is just the name
527        # of any dynamic project to delete.  We're somewhat lazy about
528        # deleting authorization attributes.  Having access to something
529        # that doesn't exist isn't harmful.
530        del_users = { }
531        del_project = None
532        del_types = set()
533
534        self.state_lock.acquire()
535        if aid in self.allocation:
536            self.log.debug("Found allocation for %s" %aid)
537            for k in self.allocation[aid]['keys']:
538                kk = "%s:%s" % k
539                self.keys[kk] -= 1
540                if self.keys[kk] == 0:
541                    if not del_users.has_key(k[0]):
542                        del_users[k[0]] = set()
543                    del_users[k[0]].add(k[1])
544                    del self.keys[kk]
545
546            if 'project' in self.allocation[aid]:
547                pname = self.allocation[aid]['project']
548                self.projects[pname] -= 1
549                if self.projects[pname] == 0:
550                    del_project = pname
551                    del self.projects[pname]
552
553            if 'types' in self.allocation[aid]:
554                for t in self.allocation[aid]['types']:
555                    self.types[t] -= 1
556                    if self.types[t] == 0:
557                        if not del_project: del_project = t[0]
558                        del_types.add(t[1])
559                        del self.types[t]
560
561            del self.allocation[aid]
562            self.write_state()
563            self.state_lock.release()
564            # If we actually have resources to deallocate, prepare the call.
565            if del_project or del_users:
566                self.do_release_project(del_project, del_users, del_types)
567            # And remove the access cert
568            cf = "%s/%s.pem" % (self.certdir, aid)
569            self.log.debug("Removing %s" % cf)
570            os.remove(cf)
571            return { 'allocID': req['allocID'] } 
572        else:
573            self.state_lock.release()
574            raise service_error(service_error.req, "No such allocation")
575
576    # These are subroutines for StartSegment
577    def generate_ns2(self, topo, expfn, softdir, connInfo):
578        """
579        Convert topo into an ns2 file, decorated with appropriate commands for
580        the particular testbed setup.  Convert all requests for software, etc
581        to point at the staged copies on this testbed and add the federation
582        startcommands.
583        """
584        class dragon_commands:
585            """
586            Functor to spit out approrpiate dragon commands for nodes listed in
587            the connectivity description.  The constructor makes a dict mapping
588            dragon nodes to their parameters and the __call__ checks each
589            element in turn for membership.
590            """
591            def __init__(self, map):
592                self.node_info = map
593
594            def __call__(self, e):
595                s = ""
596                if isinstance(e, topdl.Computer):
597                    if self.node_info.has_key(e.name):
598                        info = self.node_info[e.name]
599                        for ifname, vlan, type in info:
600                            for i in e.interface:
601                                if i.name == ifname:
602                                    addr = i.get_attribute('ip4_address')
603                                    subs = i.substrate[0]
604                                    break
605                            else:
606                                raise service_error(service_error.internal,
607                                        "No interface %s on element %s" % \
608                                                (ifname, e.name))
609                            # XXX: do netmask right
610                            if type =='link':
611                                s = ("tb-allow-external ${%s} " + \
612                                        "dragonportal ip %s vlan %s " + \
613                                        "netmask 255.255.255.0\n") % \
614                                        (e.name, addr, vlan)
615                            elif type =='lan':
616                                s = ("tb-allow-external ${%s} " + \
617                                        "dragonportal " + \
618                                        "ip %s vlan %s usurp %s\n") % \
619                                        (e.name, addr, vlan, subs)
620                            else:
621                                raise service_error(service_error_internal,
622                                        "Unknown DRAGON type %s" % type)
623                return s
624
625        class not_dragon:
626            """
627            Return true if a node is in the given map of dragon nodes.
628            """
629            def __init__(self, map):
630                self.nodes = set(map.keys())
631
632            def __call__(self, e):
633                return e.name not in self.nodes
634
635        # Main line of generate_ns2
636        t = topo.clone()
637
638        # Create the map of nodes that need direct connections (dragon
639        # connections) from the connInfo
640        dragon_map = { }
641        for i in [ i for i in connInfo if i['type'] == 'transit']:
642            for a in i.get('fedAttr', []):
643                if a['attribute'] == 'vlan_id':
644                    vlan = a['value']
645                    break
646            else:
647                raise service_error(service_error.internal, "No vlan tag")
648            members = i.get('member', [])
649            if len(members) > 1: type = 'lan'
650            else: type = 'link'
651
652            try:
653                for m in members:
654                    if m['element'] in dragon_map:
655                        dragon_map[m['element']].append(( m['interface'], 
656                            vlan, type))
657                    else:
658                        dragon_map[m['element']] = [( m['interface'], 
659                            vlan, type),]
660            except KeyError:
661                raise service_error(service_error.req,
662                        "Missing connectivity info")
663
664        # Weed out the things we aren't going to instantiate: Segments, portal
665        # substrates, and portal interfaces.  (The copy in the for loop allows
666        # us to delete from e.elements in side the for loop).  While we're
667        # touching all the elements, we also adjust paths from the original
668        # testbed to local testbed paths and put the federation commands into
669        # the start commands
670        for e in [e for e in t.elements]:
671            if isinstance(e, topdl.Segment):
672                t.elements.remove(e)
673            if isinstance(e, topdl.Computer):
674                self.add_kit(e, self.federation_software)
675                if e.get_attribute('portal') and self.portal_startcommand:
676                    # Add local portal support software
677                    self.add_kit(e, self.portal_software)
678                    # Portals never have a user-specified start command
679                    e.set_attribute('startup', self.portal_startcommand)
680                elif self.node_startcommand:
681                    if e.get_attribute('startup'):
682                        e.set_attribute('startup', "%s \\$USER '%s'" % \
683                                (self.node_startcommand, 
684                                    e.get_attribute('startup')))
685                    else:
686                        e.set_attribute('startup', self.node_startcommand)
687
688                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
689                # Remove portal interfaces that do not connect to DRAGON
690                e.interface = [i for i in e.interface \
691                        if not i.get_attribute('portal') or i.name in dinf ]
692            # Fix software paths
693            for s in getattr(e, 'software', []):
694                s.location = re.sub("^.*/", softdir, s.location)
695
696        t.substrates = [ s.clone() for s in t.substrates ]
697        t.incorporate_elements()
698
699        # Customize the ns2 output for local portal commands and images
700        filters = []
701
702        if self.dragon_endpoint:
703            add_filter = not_dragon(dragon_map)
704            filters.append(dragon_commands(dragon_map))
705        else:
706            add_filter = None
707
708        if self.portal_command:
709            filters.append(topdl.generate_portal_command_filter(
710                self.portal_command, add_filter=add_filter))
711
712        if self.portal_image:
713            filters.append(topdl.generate_portal_image_filter(
714                self.portal_image))
715
716        if self.portal_type:
717            filters.append(topdl.generate_portal_hardware_filter(
718                self.portal_type))
719
720        # Convert to ns and write it out
721        expfile = topdl.topology_to_ns2(t, filters)
722        try:
723            f = open(expfn, "w")
724            print >>f, expfile
725            f.close()
726        except EnvironmentError:
727            raise service_error(service_error.internal,
728                    "Cannot write experiment file %s: %s" % (expfn,e))
729
730    def export_store_info(self, cf, proj, ename, connInfo):
731        """
732        For the export requests in the connection info, install the peer names
733        at the experiment controller via SetValue calls.
734        """
735
736        for c in connInfo:
737            for p in [ p for p in c.get('parameter', []) \
738                    if p.get('type', '') == 'output']:
739
740                if p.get('name', '') == 'peer':
741                    k = p.get('key', None)
742                    surl = p.get('store', None)
743                    if surl and k and k.index('/') != -1:
744                        value = "%s.%s.%s%s" % \
745                                (k[k.index('/')+1:], ename, proj, self.domain)
746                        req = { 'name': k, 'value': value }
747                        self.log.debug("Setting %s to %s on %s" % \
748                                (k, value, surl))
749                        self.call_SetValue(surl, req, cf)
750                    else:
751                        self.log.error("Bad export request: %s" % p)
752                elif p.get('name', '') == 'ssh_port':
753                    k = p.get('key', None)
754                    surl = p.get('store', None)
755                    if surl and k:
756                        req = { 'name': k, 'value': self.ssh_port }
757                        self.log.debug("Setting %s to %s on %s" % \
758                                (k, self.ssh_port, surl))
759                        self.call_SetValue(surl, req, cf)
760                    else:
761                        self.log.error("Bad export request: %s" % p)
762                else:
763                    self.log.error("Unknown export parameter: %s" % \
764                            p.get('name'))
765                    continue
766
767    def add_seer_node(self, topo, name, startup):
768        """
769        Add a seer node to the given topology, with the startup command passed
770        in.  Used by configure seer_services.
771        """
772        c_node = topdl.Computer(
773                name=name, 
774                os= topdl.OperatingSystem(
775                    attribute=[
776                    { 'attribute': 'osid', 
777                        'value': self.local_seer_image },
778                    ]),
779                attribute=[
780                    { 'attribute': 'startup', 'value': startup },
781                    ]
782                )
783        self.add_kit(c_node, self.local_seer_software)
784        topo.elements.append(c_node)
785
786    def configure_seer_services(self, services, topo, softdir):
787        """
788        Make changes to the topology required for the seer requests being made.
789        Specifically, add any control or master nodes required and set up the
790        start commands on the nodes to interconnect them.
791        """
792        local_seer = False      # True if we need to add a control node
793        collect_seer = False    # True if there is a seer-master node
794        seer_master= False      # True if we need to add the seer-master
795        for s in services:
796            s_name = s.get('name', '')
797            s_vis = s.get('visibility','')
798
799            if s_name  == 'local_seer_control' and s_vis == 'export':
800                local_seer = True
801            elif s_name == 'seer_master':
802                if s_vis == 'import':
803                    collect_seer = True
804                elif s_vis == 'export':
805                    seer_master = True
806       
807        # We've got the whole picture now, so add nodes if needed and configure
808        # them to interconnect properly.
809        if local_seer or seer_master:
810            # Copy local seer control node software to the tempdir
811            for l, f in self.local_seer_software:
812                base = os.path.basename(f)
813                copy_file(f, "%s/%s" % (softdir, base))
814        # If we're collecting seers somewhere the controllers need to talk to
815        # the master.  In testbeds that export the service, that will be a
816        # local node that we'll add below.  Elsewhere it will be the control
817        # portal that will port forward to the exporting master.
818        if local_seer:
819            if collect_seer:
820                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
821            else:
822                startup = self.local_seer_start
823            self.add_seer_node(topo, 'control', startup)
824        # If this is the seer master, add that node, too.
825        if seer_master:
826            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
827
828    def retrieve_software(self, topo, certfile, softdir):
829        """
830        Collect the software that nodes in the topology need loaded and stage
831        it locally.  This implies retrieving it from the experiment_controller
832        and placing it into softdir.  Certfile is used to prove that this node
833        has access to that data (it's the allocation/segment fedid).  Finally
834        local portal and federation software is also copied to the same staging
835        directory for simplicity - all software needed for experiment creation
836        is in softdir.
837        """
838        sw = set()
839        for e in topo.elements:
840            for s in getattr(e, 'software', []):
841                sw.add(s.location)
842        for s in sw:
843            self.log.debug("Retrieving %s" % s)
844            try:
845                get_url(s, certfile, softdir)
846            except:
847                t, v, st = sys.exc_info()
848                raise service_error(service_error.internal,
849                        "Error retrieving %s: %s" % (s, v))
850
851        # Copy local federation and portal node software to the tempdir
852        for s in (self.federation_software, self.portal_software):
853            for l, f in s:
854                base = os.path.basename(f)
855                copy_file(f, "%s/%s" % (softdir, base))
856
857
858    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
859        """
860        Gather common configuration files, retrieve or create an experiment
861        name and project name, and return the ssh_key filenames.  Create an
862        allocation log bound to the state log variable as well.
863        """
864        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
865        ename = None
866        pubkey_base = None
867        secretkey_base = None
868        proj = None
869        user = None
870        alloc_log = None
871
872        for a in attrs:
873            if a['attribute'] in configs:
874                try:
875                    self.log.debug("Retrieving %s from %s" % \
876                            (a['attribute'], a['value']))
877                    get_url(a['value'], certfile, tmpdir)
878                except:
879                    t, v, st = sys.exc_info()
880                    raise service_error(service_error.internal,
881                            "Error retrieving %s: %s" % (a.get('value', ""), v))
882            if a['attribute'] == 'ssh_pubkey':
883                pubkey_base = a['value'].rpartition('/')[2]
884            if a['attribute'] == 'ssh_secretkey':
885                secretkey_base = a['value'].rpartition('/')[2]
886            if a['attribute'] == 'experiment_name':
887                ename = a['value']
888
889        if not ename:
890            ename = ""
891            for i in range(0,5):
892                ename += random.choice(string.ascii_letters)
893            self.log.warn("No experiment name: picked one randomly: %s" \
894                    % ename)
895
896        if not pubkey_base:
897            raise service_error(service_error.req, 
898                    "No public key attribute")
899
900        if not secretkey_base:
901            raise service_error(service_error.req, 
902                    "No secret key attribute")
903
904        self.state_lock.acquire()
905        if aid in self.allocation:
906            proj = self.allocation[aid].get('project', None)
907            if not proj: 
908                proj = self.allocation[aid].get('sproject', None)
909            user = self.allocation[aid].get('user', None)
910            self.allocation[aid]['experiment'] = ename
911            self.allocation[aid]['log'] = [ ]
912            # Create a logger that logs to the experiment's state object as
913            # well as to the main log file.
914            alloc_log = logging.getLogger('fedd.access.%s' % ename)
915            h = logging.StreamHandler(
916                    list_log.list_log(self.allocation[aid]['log']))
917            # XXX: there should be a global one of these rather than
918            # repeating the code.
919            h.setFormatter(logging.Formatter(
920                "%(asctime)s %(name)s %(message)s",
921                        '%d %b %y %H:%M:%S'))
922            alloc_log.addHandler(h)
923            self.write_state()
924        self.state_lock.release()
925
926        if not proj:
927            raise service_error(service_error.internal, 
928                    "Can't find project for %s" %aid)
929
930        if not user:
931            raise service_error(service_error.internal, 
932                    "Can't find creation user for %s" %aid)
933
934        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
935
936    def finalize_experiment(self, starter, topo, aid, alloc_id):
937        """
938        Store key bits of experiment state in the global repository, including
939        the response that may need to be replayed, and return the response.
940        """
941        # Copy the assigned names into the return topology
942        embedding = [ ]
943        for n in starter.node:
944            embedding.append({ 
945                'toponame': n,
946                'physname': ["%s%s" %  (starter.node[n], self.domain)],
947                })
948        # Grab the log (this is some anal locking, but better safe than
949        # sorry)
950        self.state_lock.acquire()
951        logv = "".join(self.allocation[aid]['log'])
952        # It's possible that the StartSegment call gets retried (!).
953        # if the 'started' key is in the allocation, we'll return it rather
954        # than redo the setup.
955        self.allocation[aid]['started'] = { 
956                'allocID': alloc_id,
957                'allocationLog': logv,
958                'segmentdescription': { 
959                    'topdldescription': topo.clone().to_dict()
960                    },
961                'embedding': embedding
962                }
963        retval = copy.copy(self.allocation[aid]['started'])
964        self.write_state()
965        self.state_lock.release()
966        return retval
967   
968    # End of StartSegment support routines
969
970    def StartSegment(self, req, fid):
971        err = None  # Any service_error generated after tmpdir is created
972        rv = None   # Return value from segment creation
973
974        try:
975            req = req['StartSegmentRequestBody']
976            auth_attr = req['allocID']['fedid']
977            topref = req['segmentdescription']['topdldescription']
978        except KeyError:
979            raise service_error(server_error.req, "Badly formed request")
980
981        connInfo = req.get('connection', [])
982        services = req.get('service', [])
983        aid = "%s" % auth_attr
984        attrs = req.get('fedAttr', [])
985        if not self.auth.check_attribute(fid, auth_attr):
986            raise service_error(service_error.access, "Access denied")
987        else:
988            # See if this is a replay of an earlier succeeded StartSegment -
989            # sometimes SSL kills 'em.  If so, replay the response rather than
990            # redoing the allocation.
991            self.state_lock.acquire()
992            retval = self.allocation[aid].get('started', None)
993            self.state_lock.release()
994            if retval:
995                self.log.warning("Duplicate StartSegment for %s: " % aid + \
996                        "replaying response")
997                return retval
998
999        # A new request.  Do it.
1000
1001        if topref: topo = topdl.Topology(**topref)
1002        else:
1003            raise service_error(service_error.req, 
1004                    "Request missing segmentdescription'")
1005       
1006        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1007        try:
1008            tmpdir = tempfile.mkdtemp(prefix="access-")
1009            softdir = "%s/software" % tmpdir
1010            os.mkdir(softdir)
1011        except EnvironmentError:
1012            raise service_error(service_error.internal, "Cannot create tmp dir")
1013
1014        # Try block alllows us to clean up temporary files.
1015        try:
1016            self.retrieve_software(topo, certfile, softdir)
1017            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1018                    self.initialize_experiment_info(attrs, aid, 
1019                            certfile, tmpdir)
1020
1021            # Set up userconf and seer if needed
1022            self.configure_userconf(services, tmpdir)
1023            self.configure_seer_services(services, topo, softdir)
1024            # Get and send synch store variables
1025            self.export_store_info(certfile, proj, ename, connInfo)
1026            self.import_store_info(certfile, connInfo)
1027
1028            expfile = "%s/experiment.tcl" % tmpdir
1029
1030            self.generate_portal_configs(topo, pubkey_base, 
1031                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1032            self.generate_ns2(topo, expfile, 
1033                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1034
1035            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1036                    debug=self.create_debug, log=alloc_log)
1037            rv = starter(self, ename, proj, user, expfile, tmpdir)
1038        except service_error, e:
1039            err = e
1040        except:
1041            t, v, st = sys.exc_info()
1042            err = service_error(service_error.internal, "%s: %s" % \
1043                    (v, traceback.extract_tb(st)))
1044
1045        # Walk up tmpdir, deleting as we go
1046        if self.cleanup: self.remove_dirs(tmpdir)
1047        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1048
1049        if rv:
1050            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1051        elif err:
1052            raise service_error(service_error.federant,
1053                    "Swapin failed: %s" % err)
1054        else:
1055            raise service_error(service_error.federant, "Swapin failed")
1056
1057    def TerminateSegment(self, req, fid):
1058        try:
1059            req = req['TerminateSegmentRequestBody']
1060        except KeyError:
1061            raise service_error(server_error.req, "Badly formed request")
1062
1063        auth_attr = req['allocID']['fedid']
1064        aid = "%s" % auth_attr
1065        attrs = req.get('fedAttr', [])
1066        if not self.auth.check_attribute(fid, auth_attr):
1067            raise service_error(service_error.access, "Access denied")
1068
1069        self.state_lock.acquire()
1070        if aid in self.allocation:
1071            proj = self.allocation[aid].get('project', None)
1072            if not proj: 
1073                proj = self.allocation[aid].get('sproject', None)
1074            user = self.allocation[aid].get('user', None)
1075            ename = self.allocation[aid].get('experiment', None)
1076        else:
1077            proj = None
1078            user = None
1079            ename = None
1080        self.state_lock.release()
1081
1082        if not proj:
1083            raise service_error(service_error.internal, 
1084                    "Can't find project for %s" % aid)
1085
1086        if not user:
1087            raise service_error(service_error.internal, 
1088                    "Can't find creation user for %s" % aid)
1089        if not ename:
1090            raise service_error(service_error.internal, 
1091                    "Can't find experiment name for %s" % aid)
1092        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1093                debug=self.create_debug)
1094        stopper(self, user, proj, ename)
1095        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.