source: fedd/federation/emulab_access.py @ a20a20f

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since a20a20f was a20a20f, checked in by Ted Faber <faber@…>, 14 years ago

Pluggable exports and add directory removal to the base class

  • Property mode set to 100644
File size: 36.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.boss = config.get("access", "boss")
62        self.ops = config.get("access", "ops")
63        self.domain = config.get("access", "domain")
64        self.fileserver = config.get("access", "fileserver")
65        self.eventserver = config.get("access", "eventserver")
66        self.userconfdir = config.get("access","userconfdir")
67        self.userconfcmd = config.get("access","userconfcmd")
68        self.userconfurl = config.get("access","userconfurl")
69        self.federation_software = config.get("access", "federation_software")
70        self.portal_software = config.get("access", "portal_software")
71        self.local_seer_software = config.get("access", "local_seer_software")
72        self.local_seer_image = config.get("access", "local_seer_image")
73        self.local_seer_start = config.get("access", "local_seer_start")
74        self.seer_master_start = config.get("access", "seer_master_start")
75        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
76        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
77        self.ssh_port = config.get("access","ssh_port") or "22"
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        if config.has_option("access", "accessdb"):
108            self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        if config.has_option("access", "accessdb"):
114            self.read_access(config.get("access", "accessdb"), 
115                    self.make_access_project)
116
117        # read_state in the base_class
118        self.state_lock.acquire()
119        for a  in ('allocation', 'projects', 'keys', 'types'):
120            if a not in self.state:
121                self.state[a] = { }
122        self.allocation = self.state['allocation']
123        self.projects = self.state['projects']
124        self.keys = self.state['keys']
125        self.types = self.state['types']
126        # Add the ownership attributes to the authorizer.  Note that the
127        # indices of the allocation dict are strings, but the attributes are
128        # fedids, so there is a conversion.
129        for k in self.allocation.keys():
130            for o in self.allocation[k].get('owners', []):
131                self.auth.set_attribute(o, fedid(hexstr=k))
132            if self.allocation[k].has_key('userconfig'):
133                sfid = self.allocation[k]['userconfig']
134                fid = fedid(hexstr=sfid)
135                self.auth.set_attribute(fid, "/%s" % sfid)
136        self.state_lock.release()
137        self.exports = {
138                'SMB': self.export_SMB,
139                'seer': self.export_seer,
140                'tmcd': self.export_tmcd,
141                'userconfig': self.export_userconfig,
142                'project_export': self.export_project_export,
143                'local_seer_control': self.export_local_seer,
144                'seer_master': self.export_seer_master,
145                'hide_hosts': self.export_hide_hosts,
146                }
147
148        if not self.local_seer_image or not self.local_seer_software or \
149                not self.local_seer_start:
150            if 'local_seer_control' in self.exports:
151                del self.exports['local_seer_control']
152
153        if not self.local_seer_image or not self.local_seer_software or \
154                not self.seer_master_start:
155            if 'seer_master' in self.exports:
156                del self.exports['seer_master']
157
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment",
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176        self.call_SetValue = service_caller('SetValue')
177        self.call_GetValue = service_caller('GetValue', log=self.log)
178
179        if not config.has_option("allocate", "uri"):
180            self.allocate_project = \
181                allocate_project_local(config, auth)
182        else:
183            self.allocate_project = \
184                allocate_project_remote(config, auth)
185
186
187        # If the project allocator exports services, put them in this object's
188        # maps so that classes that instantiate this can call the services.
189        self.soap_services.update(self.allocate_project.soap_services)
190        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
191
192    @staticmethod
193    def make_access_project(str):
194        """
195        Convert a string of the form (id[:resources:resouces], id, id) into an
196        access_project.  This is called by read_access to convert to local
197        attributes.  It returns a tuple of the form (project, user, user) where
198        users may be names or fedids.
199        """
200        def parse_name(n):
201            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
202            else: return n
203
204        str = str.strip()
205        if str.startswith('(') and str.endswith(')'):
206            str = str[1:-1]
207            names = [ s.strip() for s in str.split(",")]
208            if len(names) > 3:
209                raise self.parse_error("More than three fields in name")
210            first = names[0].split(":")
211            if first == 'fedid:':
212                del first[0]
213                first[0] = fedid(hexstr=first[0])
214            names[0] = access_project(first[0], first[1:])
215
216            for i in range(1,2):
217                names[i] = parse_name(names[i])
218
219            return tuple(names)
220        else:
221            raise self.parse_error('Bad mapping (unbalanced parens)')
222
223
224    # RequestAccess support routines
225
226    def lookup_access(self, req, fid):
227        """
228        Look up the local access control information mapped to this fedid and
229        credentials.  In this case it is a (project, create_user, access_user)
230        triple, and a triple of three booleans indicating which, if any will
231        need to be dynamically created.  Finally a list of owners for that
232        allocation is returned.
233
234        lookup_access_base pulls the first triple out, and it is parsed by this
235        routine into the boolean map.  Owners is always the controlling fedid.
236        """
237        # Return values
238        rp = access_project(None, ())
239        ru = None
240        # This maps a valid user to the Emulab projects and users to use
241        found, match = self.lookup_access_base(req, fid)
242        tb, project, user = match
243       
244        if found == None:
245            raise service_error(service_error.access,
246                    "Access denied - cannot map access")
247
248        # resolve <dynamic> and <same> in found
249        dyn_proj = False
250        dyn_create_user = False
251        dyn_service_user = False
252
253        if found[0].name == "<same>":
254            if project != None:
255                rp.name = project
256            else : 
257                raise service_error(\
258                        service_error.server_config,
259                        "Project matched <same> when no project given")
260        elif found[0].name == "<dynamic>":
261            rp.name = None
262            dyn_proj = True
263        else:
264            rp.name = found[0].name
265        rp.node_types = found[0].node_types;
266
267        if found[1] == "<same>":
268            if user_match == "<any>":
269                if user != None: rcu = user[0]
270                else: raise service_error(\
271                        service_error.server_config,
272                        "Matched <same> on anonymous request")
273            else:
274                rcu = user_match
275        elif found[1] == "<dynamic>":
276            rcu = None
277            dyn_create_user = True
278        else:
279            rcu = found[1]
280       
281        if found[2] == "<same>":
282            if user_match == "<any>":
283                if user != None: rsu = user[0]
284                else: raise service_error(\
285                        service_error.server_config,
286                        "Matched <same> on anonymous request")
287            else:
288                rsu = user_match
289        elif found[2] == "<dynamic>":
290            rsu = None
291            dyn_service_user = True
292        else:
293            rsu = found[2]
294
295        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
296                [ fid ]
297
298    def do_project_allocation(self, dyn, project, user):
299        """
300        Call the project allocation routines and return the info.
301        """
302        if dyn: 
303            # Compose the dynamic project request
304            # (only dynamic, dynamic currently allowed)
305            preq = { 'AllocateProjectRequestBody': \
306                        { 'project' : {\
307                            'user': [ \
308                            { \
309                                'access': [ { 
310                                    'sshPubkey': self.ssh_pubkey_file } ],
311                                 'role': "serviceAccess",\
312                            }, \
313                            { \
314                                'access': [ { 
315                                    'sshPubkey': self.ssh_pubkey_file } ],
316                                 'role': "experimentCreation",\
317                            }, \
318                            ], \
319                            }\
320                        }\
321                    }
322            return self.allocate_project.dynamic_project(preq)
323        else:
324            preq = {'StaticProjectRequestBody' : \
325                    { 'project': \
326                        { 'name' : { 'localname' : project },\
327                          'user' : [ \
328                            {\
329                                'userID': { 'localname' : user }, \
330                                'access': [ { 
331                                    'sshPubkey': self.ssh_pubkey_file } ],
332                                'role': 'experimentCreation'\
333                            },\
334                            {\
335                                'userID': { 'localname' : user}, \
336                                'access': [ { 
337                                    'sshPubkey': self.ssh_pubkey_file } ],
338                                'role': 'serviceAccess'\
339                            },\
340                        ]}\
341                    }\
342            }
343            return self.allocate_project.static_project(preq)
344
345    def save_project_state(self, aid, ap, dyn, owners):
346        """
347        Parse out and save the information relevant to the project created for
348        this experiment.  That info is largely in ap and owners.  dyn indicates
349        that the project was created dynamically.  Return the user and project
350        names.
351        """
352        self.state_lock.acquire()
353        self.allocation[aid] = { }
354        try:
355            pname = ap['project']['name']['localname']
356        except KeyError:
357            pname = None
358
359        if dyn:
360            if not pname:
361                self.state_lock.release()
362                raise service_error(service_error.internal,
363                        "Misformed allocation response?")
364            if pname in self.projects: self.projects[pname] += 1
365            else: self.projects[pname] = 1
366            self.allocation[aid]['project'] = pname
367        else:
368            # sproject is a static project associated with this allocation.
369            self.allocation[aid]['sproject'] = pname
370
371        self.allocation[aid]['keys'] = [ ]
372
373        try:
374            for u in ap['project']['user']:
375                uname = u['userID']['localname']
376                if u['role'] == 'experimentCreation':
377                    self.allocation[aid]['user'] = uname
378                for k in [ k['sshPubkey'] for k in u['access'] \
379                        if k.has_key('sshPubkey') ]:
380                    kv = "%s:%s" % (uname, k)
381                    if self.keys.has_key(kv): self.keys[kv] += 1
382                    else: self.keys[kv] = 1
383                    self.allocation[aid]['keys'].append((uname, k))
384        except KeyError:
385            self.state_lock.release()
386            raise service_error(service_error.internal,
387                    "Misformed allocation response?")
388
389        self.allocation[aid]['owners'] = owners
390        self.write_state()
391        self.state_lock.release()
392        return (pname, uname)
393
394    # End of RequestAccess support routines
395
396    def RequestAccess(self, req, fid):
397        """
398        Handle the access request.  Proxy if not for us.
399
400        Parse out the fields and make the allocations or rejections if for us,
401        otherwise, assuming we're willing to proxy, proxy the request out.
402        """
403
404        def gateway_hardware(h):
405            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
406            else: return h
407
408        def get_export_project(svcs):
409            """
410            if the service requests includes one to export a project, return
411            that project.
412            """
413            rv = None
414            for s in svcs:
415                if s.get('name', '') == 'project_export' and \
416                        s.get('visibility', '') == 'export':
417                    if not rv: 
418                        for a in s.get('feddAttr', []):
419                            if a.get('attribute', '') == 'project' \
420                                    and 'value' in a:
421                                rv = a['value']
422                    else:
423                        raise service_error(service_error, access, 
424                                'Requesting multiple project exports is ' + \
425                                        'not supported');
426            return rv
427
428        # The dance to get into the request body
429        if req.has_key('RequestAccessRequestBody'):
430            req = req['RequestAccessRequestBody']
431        else:
432            raise service_error(service_error.req, "No request!?")
433
434
435        found, dyn, owners = self.lookup_access(req, fid)
436        ap = None
437
438        # if this includes a project export request and the exported
439        # project is not the access project, access denied.
440        if 'service' in req:
441            ep = get_export_project(req['service'])
442            if ep and ep != found[0].name:
443                raise service_error(service_error.access,
444                        "Cannot export %s" % ep)
445
446        if self.ssh_pubkey_file:
447            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
448        else:
449            raise service_error(service_error.internal, 
450                    "SSH access parameters required")
451        # keep track of what's been added
452        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
453        aid = unicode(allocID)
454
455        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
456
457        services, svc_state = self.export_services(req.get('service',[]),
458                pname, uname)
459        self.state_lock.acquire()
460        # Store services state in global state
461        for k, v in svc_state.items():
462            self.allocation[aid][k] = v
463        self.write_state()
464        self.state_lock.release()
465        # Give the owners the right to change this allocation
466        for o in owners:
467            self.auth.set_attribute(o, allocID)
468        try:
469            f = open("%s/%s.pem" % (self.certdir, aid), "w")
470            print >>f, alloc_cert
471            f.close()
472        except EnvironmentError, e:
473            raise service_error(service_error.internal, 
474                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
475        resp = self.build_access_response({ 'fedid': allocID } ,
476                ap, services)
477        return resp
478
479    def do_release_project(self, del_project, del_users, del_types):
480        """
481        If a project and users has to be deleted, make the call.
482        """
483        msg = { 'project': { }}
484        if del_project:
485            msg['project']['name']= {'localname': del_project}
486        users = [ ]
487        for u in del_users.keys():
488            users.append({ 'userID': { 'localname': u },\
489                'access' :  \
490                        [ {'sshPubkey' : s } for s in del_users[u]]\
491            })
492        if users: 
493            msg['project']['user'] = users
494        if len(del_types) > 0:
495            msg['resources'] = { 'node': \
496                    [ {'hardware': [ h ] } for h in del_types ]\
497                }
498        if self.allocate_project.release_project:
499            msg = { 'ReleaseProjectRequestBody' : msg}
500            self.allocate_project.release_project(msg)
501
502    def ReleaseAccess(self, req, fid):
503        # The dance to get into the request body
504        if req.has_key('ReleaseAccessRequestBody'):
505            req = req['ReleaseAccessRequestBody']
506        else:
507            raise service_error(service_error.req, "No request!?")
508
509        try:
510            if req['allocID'].has_key('localname'):
511                auth_attr = aid = req['allocID']['localname']
512            elif req['allocID'].has_key('fedid'):
513                aid = unicode(req['allocID']['fedid'])
514                auth_attr = req['allocID']['fedid']
515            else:
516                raise service_error(service_error.req,
517                        "Only localnames and fedids are understood")
518        except KeyError:
519            raise service_error(service_error.req, "Badly formed request")
520
521        self.log.debug("[access] deallocation requested for %s", aid)
522        if not self.auth.check_attribute(fid, auth_attr):
523            self.log.debug("[access] deallocation denied for %s", aid)
524            raise service_error(service_error.access, "Access Denied")
525
526        # If we know this allocation, reduce the reference counts and
527        # remove the local allocations.  Otherwise report an error.  If
528        # there is an allocation to delete, del_users will be a dictonary
529        # of sets where the key is the user that owns the keys in the set.
530        # We use a set to avoid duplicates.  del_project is just the name
531        # of any dynamic project to delete.  We're somewhat lazy about
532        # deleting authorization attributes.  Having access to something
533        # that doesn't exist isn't harmful.
534        del_users = { }
535        del_project = None
536        del_types = set()
537
538        self.state_lock.acquire()
539        if aid in self.allocation:
540            self.log.debug("Found allocation for %s" %aid)
541            for k in self.allocation[aid]['keys']:
542                kk = "%s:%s" % k
543                self.keys[kk] -= 1
544                if self.keys[kk] == 0:
545                    if not del_users.has_key(k[0]):
546                        del_users[k[0]] = set()
547                    del_users[k[0]].add(k[1])
548                    del self.keys[kk]
549
550            if 'project' in self.allocation[aid]:
551                pname = self.allocation[aid]['project']
552                self.projects[pname] -= 1
553                if self.projects[pname] == 0:
554                    del_project = pname
555                    del self.projects[pname]
556
557            if 'types' in self.allocation[aid]:
558                for t in self.allocation[aid]['types']:
559                    self.types[t] -= 1
560                    if self.types[t] == 0:
561                        if not del_project: del_project = t[0]
562                        del_types.add(t[1])
563                        del self.types[t]
564
565            del self.allocation[aid]
566            self.write_state()
567            self.state_lock.release()
568            # If we actually have resources to deallocate, prepare the call.
569            if del_project or del_users:
570                self.do_release_project(del_project, del_users, del_types)
571            # And remove the access cert
572            cf = "%s/%s.pem" % (self.certdir, aid)
573            self.log.debug("Removing %s" % cf)
574            os.remove(cf)
575            return { 'allocID': req['allocID'] } 
576        else:
577            self.state_lock.release()
578            raise service_error(service_error.req, "No such allocation")
579
580    # These are subroutines for StartSegment
581    def generate_ns2(self, topo, expfn, softdir, connInfo):
582        """
583        Convert topo into an ns2 file, decorated with appropriate commands for
584        the particular testbed setup.  Convert all requests for software, etc
585        to point at the staged copies on this testbed and add the federation
586        startcommands.
587        """
588        class dragon_commands:
589            """
590            Functor to spit out approrpiate dragon commands for nodes listed in
591            the connectivity description.  The constructor makes a dict mapping
592            dragon nodes to their parameters and the __call__ checks each
593            element in turn for membership.
594            """
595            def __init__(self, map):
596                self.node_info = map
597
598            def __call__(self, e):
599                s = ""
600                if isinstance(e, topdl.Computer):
601                    if self.node_info.has_key(e.name):
602                        info = self.node_info[e.name]
603                        for ifname, vlan, type in info:
604                            for i in e.interface:
605                                if i.name == ifname:
606                                    addr = i.get_attribute('ip4_address')
607                                    subs = i.substrate[0]
608                                    break
609                            else:
610                                raise service_error(service_error.internal,
611                                        "No interface %s on element %s" % \
612                                                (ifname, e.name))
613                            # XXX: do netmask right
614                            if type =='link':
615                                s = ("tb-allow-external ${%s} " + \
616                                        "dragonportal ip %s vlan %s " + \
617                                        "netmask 255.255.255.0\n") % \
618                                        (e.name, addr, vlan)
619                            elif type =='lan':
620                                s = ("tb-allow-external ${%s} " + \
621                                        "dragonportal " + \
622                                        "ip %s vlan %s usurp %s\n") % \
623                                        (e.name, addr, vlan, subs)
624                            else:
625                                raise service_error(service_error_internal,
626                                        "Unknown DRAGON type %s" % type)
627                return s
628
629        class not_dragon:
630            """
631            Return true if a node is in the given map of dragon nodes.
632            """
633            def __init__(self, map):
634                self.nodes = set(map.keys())
635
636            def __call__(self, e):
637                return e.name not in self.nodes
638
639        # Main line of generate_ns2
640        t = topo.clone()
641
642        # Create the map of nodes that need direct connections (dragon
643        # connections) from the connInfo
644        dragon_map = { }
645        for i in [ i for i in connInfo if i['type'] == 'transit']:
646            for a in i.get('fedAttr', []):
647                if a['attribute'] == 'vlan_id':
648                    vlan = a['value']
649                    break
650            else:
651                raise service_error(service_error.internal, "No vlan tag")
652            members = i.get('member', [])
653            if len(members) > 1: type = 'lan'
654            else: type = 'link'
655
656            try:
657                for m in members:
658                    if m['element'] in dragon_map:
659                        dragon_map[m['element']].append(( m['interface'], 
660                            vlan, type))
661                    else:
662                        dragon_map[m['element']] = [( m['interface'], 
663                            vlan, type),]
664            except KeyError:
665                raise service_error(service_error.req,
666                        "Missing connectivity info")
667
668        # Weed out the things we aren't going to instantiate: Segments, portal
669        # substrates, and portal interfaces.  (The copy in the for loop allows
670        # us to delete from e.elements in side the for loop).  While we're
671        # touching all the elements, we also adjust paths from the original
672        # testbed to local testbed paths and put the federation commands into
673        # the start commands
674        for e in [e for e in t.elements]:
675            if isinstance(e, topdl.Segment):
676                t.elements.remove(e)
677            if isinstance(e, topdl.Computer):
678                self.add_kit(e, self.federation_software)
679                if e.get_attribute('portal') and self.portal_startcommand:
680                    # Add local portal support software
681                    self.add_kit(e, self.portal_software)
682                    # Portals never have a user-specified start command
683                    e.set_attribute('startup', self.portal_startcommand)
684                elif self.node_startcommand:
685                    if e.get_attribute('startup'):
686                        e.set_attribute('startup', "%s \\$USER '%s'" % \
687                                (self.node_startcommand, 
688                                    e.get_attribute('startup')))
689                    else:
690                        e.set_attribute('startup', self.node_startcommand)
691
692                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
693                # Remove portal interfaces that do not connect to DRAGON
694                e.interface = [i for i in e.interface \
695                        if not i.get_attribute('portal') or i.name in dinf ]
696            # Fix software paths
697            for s in getattr(e, 'software', []):
698                s.location = re.sub("^.*/", softdir, s.location)
699
700        t.substrates = [ s.clone() for s in t.substrates ]
701        t.incorporate_elements()
702
703        # Customize the ns2 output for local portal commands and images
704        filters = []
705
706        if self.dragon_endpoint:
707            add_filter = not_dragon(dragon_map)
708            filters.append(dragon_commands(dragon_map))
709        else:
710            add_filter = None
711
712        if self.portal_command:
713            filters.append(topdl.generate_portal_command_filter(
714                self.portal_command, add_filter=add_filter))
715
716        if self.portal_image:
717            filters.append(topdl.generate_portal_image_filter(
718                self.portal_image))
719
720        if self.portal_type:
721            filters.append(topdl.generate_portal_hardware_filter(
722                self.portal_type))
723
724        # Convert to ns and write it out
725        expfile = topdl.topology_to_ns2(t, filters)
726        try:
727            f = open(expfn, "w")
728            print >>f, expfile
729            f.close()
730        except EnvironmentError:
731            raise service_error(service_error.internal,
732                    "Cannot write experiment file %s: %s" % (expfn,e))
733
734    def export_store_info(self, cf, proj, ename, connInfo):
735        """
736        For the export requests in the connection info, install the peer names
737        at the experiment controller via SetValue calls.
738        """
739
740        for c in connInfo:
741            for p in [ p for p in c.get('parameter', []) \
742                    if p.get('type', '') == 'output']:
743
744                if p.get('name', '') == 'peer':
745                    k = p.get('key', None)
746                    surl = p.get('store', None)
747                    if surl and k and k.index('/') != -1:
748                        value = "%s.%s.%s%s" % \
749                                (k[k.index('/')+1:], ename, proj, self.domain)
750                        req = { 'name': k, 'value': value }
751                        self.log.debug("Setting %s to %s on %s" % \
752                                (k, value, surl))
753                        self.call_SetValue(surl, req, cf)
754                    else:
755                        self.log.error("Bad export request: %s" % p)
756                elif p.get('name', '') == 'ssh_port':
757                    k = p.get('key', None)
758                    surl = p.get('store', None)
759                    if surl and k:
760                        req = { 'name': k, 'value': self.ssh_port }
761                        self.log.debug("Setting %s to %s on %s" % \
762                                (k, self.ssh_port, surl))
763                        self.call_SetValue(surl, req, cf)
764                    else:
765                        self.log.error("Bad export request: %s" % p)
766                else:
767                    self.log.error("Unknown export parameter: %s" % \
768                            p.get('name'))
769                    continue
770
771    def add_seer_node(self, topo, name, startup):
772        """
773        Add a seer node to the given topology, with the startup command passed
774        in.  Used by configure seer_services.
775        """
776        c_node = topdl.Computer(
777                name=name, 
778                os= topdl.OperatingSystem(
779                    attribute=[
780                    { 'attribute': 'osid', 
781                        'value': self.local_seer_image },
782                    ]),
783                attribute=[
784                    { 'attribute': 'startup', 'value': startup },
785                    ]
786                )
787        self.add_kit(c_node, self.local_seer_software)
788        topo.elements.append(c_node)
789
790    def configure_seer_services(self, services, topo, softdir):
791        """
792        Make changes to the topology required for the seer requests being made.
793        Specifically, add any control or master nodes required and set up the
794        start commands on the nodes to interconnect them.
795        """
796        local_seer = False      # True if we need to add a control node
797        collect_seer = False    # True if there is a seer-master node
798        seer_master= False      # True if we need to add the seer-master
799        for s in services:
800            s_name = s.get('name', '')
801            s_vis = s.get('visibility','')
802
803            if s_name  == 'local_seer_control' and s_vis == 'export':
804                local_seer = True
805            elif s_name == 'seer_master':
806                if s_vis == 'import':
807                    collect_seer = True
808                elif s_vis == 'export':
809                    seer_master = True
810       
811        # We've got the whole picture now, so add nodes if needed and configure
812        # them to interconnect properly.
813        if local_seer or seer_master:
814            # Copy local seer control node software to the tempdir
815            for l, f in self.local_seer_software:
816                base = os.path.basename(f)
817                copy_file(f, "%s/%s" % (softdir, base))
818        # If we're collecting seers somewhere the controllers need to talk to
819        # the master.  In testbeds that export the service, that will be a
820        # local node that we'll add below.  Elsewhere it will be the control
821        # portal that will port forward to the exporting master.
822        if local_seer:
823            if collect_seer:
824                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
825            else:
826                startup = self.local_seer_start
827            self.add_seer_node(topo, 'control', startup)
828        # If this is the seer master, add that node, too.
829        if seer_master:
830            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
831
832    def retrieve_software(self, topo, certfile, softdir):
833        """
834        Collect the software that nodes in the topology need loaded and stage
835        it locally.  This implies retrieving it from the experiment_controller
836        and placing it into softdir.  Certfile is used to prove that this node
837        has access to that data (it's the allocation/segment fedid).  Finally
838        local portal and federation software is also copied to the same staging
839        directory for simplicity - all software needed for experiment creation
840        is in softdir.
841        """
842        sw = set()
843        for e in topo.elements:
844            for s in getattr(e, 'software', []):
845                sw.add(s.location)
846        for s in sw:
847            self.log.debug("Retrieving %s" % s)
848            try:
849                get_url(s, certfile, softdir)
850            except:
851                t, v, st = sys.exc_info()
852                raise service_error(service_error.internal,
853                        "Error retrieving %s: %s" % (s, v))
854
855        # Copy local federation and portal node software to the tempdir
856        for s in (self.federation_software, self.portal_software):
857            for l, f in s:
858                base = os.path.basename(f)
859                copy_file(f, "%s/%s" % (softdir, base))
860
861
862    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
863        """
864        Gather common configuration files, retrieve or create an experiment
865        name and project name, and return the ssh_key filenames.  Create an
866        allocation log bound to the state log variable as well.
867        """
868        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
869        ename = None
870        pubkey_base = None
871        secretkey_base = None
872        proj = None
873        user = None
874        alloc_log = None
875
876        for a in attrs:
877            if a['attribute'] in configs:
878                try:
879                    self.log.debug("Retrieving %s from %s" % \
880                            (a['attribute'], a['value']))
881                    get_url(a['value'], certfile, tmpdir)
882                except:
883                    t, v, st = sys.exc_info()
884                    raise service_error(service_error.internal,
885                            "Error retrieving %s: %s" % (a.get('value', ""), v))
886            if a['attribute'] == 'ssh_pubkey':
887                pubkey_base = a['value'].rpartition('/')[2]
888            if a['attribute'] == 'ssh_secretkey':
889                secretkey_base = a['value'].rpartition('/')[2]
890            if a['attribute'] == 'experiment_name':
891                ename = a['value']
892
893        if not ename:
894            ename = ""
895            for i in range(0,5):
896                ename += random.choice(string.ascii_letters)
897            self.log.warn("No experiment name: picked one randomly: %s" \
898                    % ename)
899
900        if not pubkey_base:
901            raise service_error(service_error.req, 
902                    "No public key attribute")
903
904        if not secretkey_base:
905            raise service_error(service_error.req, 
906                    "No secret key attribute")
907
908        self.state_lock.acquire()
909        if aid in self.allocation:
910            proj = self.allocation[aid].get('project', None)
911            if not proj: 
912                proj = self.allocation[aid].get('sproject', None)
913            user = self.allocation[aid].get('user', None)
914            self.allocation[aid]['experiment'] = ename
915            self.allocation[aid]['log'] = [ ]
916            # Create a logger that logs to the experiment's state object as
917            # well as to the main log file.
918            alloc_log = logging.getLogger('fedd.access.%s' % ename)
919            h = logging.StreamHandler(
920                    list_log.list_log(self.allocation[aid]['log']))
921            # XXX: there should be a global one of these rather than
922            # repeating the code.
923            h.setFormatter(logging.Formatter(
924                "%(asctime)s %(name)s %(message)s",
925                        '%d %b %y %H:%M:%S'))
926            alloc_log.addHandler(h)
927            self.write_state()
928        self.state_lock.release()
929
930        if not proj:
931            raise service_error(service_error.internal, 
932                    "Can't find project for %s" %aid)
933
934        if not user:
935            raise service_error(service_error.internal, 
936                    "Can't find creation user for %s" %aid)
937
938        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
939
940    def finalize_experiment(self, starter, topo, aid, alloc_id):
941        """
942        Store key bits of experiment state in the global repository, including
943        the response that may need to be replayed, and return the response.
944        """
945        # Copy the assigned names into the return topology
946        embedding = [ ]
947        for n in starter.node:
948            embedding.append({ 
949                'toponame': n,
950                'physname': ["%s%s" %  (starter.node[n], self.domain)],
951                })
952        # Grab the log (this is some anal locking, but better safe than
953        # sorry)
954        self.state_lock.acquire()
955        logv = "".join(self.allocation[aid]['log'])
956        # It's possible that the StartSegment call gets retried (!).
957        # if the 'started' key is in the allocation, we'll return it rather
958        # than redo the setup.
959        self.allocation[aid]['started'] = { 
960                'allocID': alloc_id,
961                'allocationLog': logv,
962                'segmentdescription': { 
963                    'topdldescription': topo.clone().to_dict()
964                    },
965                'embedding': embedding
966                }
967        retval = copy.copy(self.allocation[aid]['started'])
968        self.write_state()
969        self.state_lock.release()
970        return retval
971   
972    # End of StartSegment support routines
973
974    def StartSegment(self, req, fid):
975        err = None  # Any service_error generated after tmpdir is created
976        rv = None   # Return value from segment creation
977
978        try:
979            req = req['StartSegmentRequestBody']
980            auth_attr = req['allocID']['fedid']
981            topref = req['segmentdescription']['topdldescription']
982        except KeyError:
983            raise service_error(server_error.req, "Badly formed request")
984
985        connInfo = req.get('connection', [])
986        services = req.get('service', [])
987        aid = "%s" % auth_attr
988        attrs = req.get('fedAttr', [])
989        if not self.auth.check_attribute(fid, auth_attr):
990            raise service_error(service_error.access, "Access denied")
991        else:
992            # See if this is a replay of an earlier succeeded StartSegment -
993            # sometimes SSL kills 'em.  If so, replay the response rather than
994            # redoing the allocation.
995            self.state_lock.acquire()
996            retval = self.allocation[aid].get('started', None)
997            self.state_lock.release()
998            if retval:
999                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1000                        "replaying response")
1001                return retval
1002
1003        # A new request.  Do it.
1004
1005        if topref: topo = topdl.Topology(**topref)
1006        else:
1007            raise service_error(service_error.req, 
1008                    "Request missing segmentdescription'")
1009       
1010        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1011        try:
1012            tmpdir = tempfile.mkdtemp(prefix="access-")
1013            softdir = "%s/software" % tmpdir
1014            os.mkdir(softdir)
1015        except EnvironmentError:
1016            raise service_error(service_error.internal, "Cannot create tmp dir")
1017
1018        # Try block alllows us to clean up temporary files.
1019        try:
1020            self.retrieve_software(topo, certfile, softdir)
1021            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1022                    self.initialize_experiment_info(attrs, aid, 
1023                            certfile, tmpdir)
1024
1025            # Set up userconf and seer if needed
1026            self.configure_userconf(services, tmpdir)
1027            self.configure_seer_services(services, topo, softdir)
1028            # Get and send synch store variables
1029            self.export_store_info(certfile, proj, ename, connInfo)
1030            self.import_store_info(certfile, connInfo)
1031
1032            expfile = "%s/experiment.tcl" % tmpdir
1033
1034            self.generate_portal_configs(topo, pubkey_base, 
1035                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1036            self.generate_ns2(topo, expfile, 
1037                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1038
1039            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1040                    debug=self.create_debug, log=alloc_log)
1041            rv = starter(self, ename, proj, user, expfile, tmpdir)
1042        except service_error, e:
1043            err = e
1044        except:
1045            t, v, st = sys.exc_info()
1046            err = service_error(service_error.internal, "%s: %s" % \
1047                    (v, traceback.extract_tb(st)))
1048
1049        # Walk up tmpdir, deleting as we go
1050        if self.cleanup: self.remove_dirs(tmpdir)
1051        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1052
1053        if rv:
1054            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1055        elif err:
1056            raise service_error(service_error.federant,
1057                    "Swapin failed: %s" % err)
1058        else:
1059            raise service_error(service_error.federant, "Swapin failed")
1060
1061    def TerminateSegment(self, req, fid):
1062        try:
1063            req = req['TerminateSegmentRequestBody']
1064        except KeyError:
1065            raise service_error(server_error.req, "Badly formed request")
1066
1067        auth_attr = req['allocID']['fedid']
1068        aid = "%s" % auth_attr
1069        attrs = req.get('fedAttr', [])
1070        if not self.auth.check_attribute(fid, auth_attr):
1071            raise service_error(service_error.access, "Access denied")
1072
1073        self.state_lock.acquire()
1074        if aid in self.allocation:
1075            proj = self.allocation[aid].get('project', None)
1076            if not proj: 
1077                proj = self.allocation[aid].get('sproject', None)
1078            user = self.allocation[aid].get('user', None)
1079            ename = self.allocation[aid].get('experiment', None)
1080        else:
1081            proj = None
1082            user = None
1083            ename = None
1084        self.state_lock.release()
1085
1086        if not proj:
1087            raise service_error(service_error.internal, 
1088                    "Can't find project for %s" % aid)
1089
1090        if not user:
1091            raise service_error(service_error.internal, 
1092                    "Can't find creation user for %s" % aid)
1093        if not ename:
1094            raise service_error(service_error.internal, 
1095                    "Can't find experiment name for %s" % aid)
1096        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1097                debug=self.create_debug)
1098        stopper(self, user, proj, ename)
1099        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.