source: fedd/federation/emulab_access.py @ 814b5e5

axis_examplecompt_changesinfo-ops
Last change on this file since 814b5e5 was 814b5e5, checked in by Ted Faber <faber@…>, 13 years ago

Merge fixes from stable branch

  • Property mode set to 100644
File size: 36.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.ops = config.get("access", "ops")
76        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
77        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
78
79        self.dragon_endpoint = config.get("access", "dragon")
80        self.dragon_vlans = config.get("access", "dragon_vlans")
81        self.deter_internal = config.get("access", "deter_internal")
82
83        self.tunnel_config = config.getboolean("access", "tunnel_config")
84        self.portal_command = config.get("access", "portal_command")
85        self.portal_image = config.get("access", "portal_image")
86        self.portal_type = config.get("access", "portal_type") or "pc"
87        self.portal_startcommand = config.get("access", "portal_startcommand")
88        self.node_startcommand = config.get("access", "node_startcommand")
89
90        self.federation_software = self.software_list(self.federation_software)
91        self.portal_software = self.software_list(self.portal_software)
92        self.local_seer_software = self.software_list(self.local_seer_software)
93
94        self.access_type = self.access_type.lower()
95        if self.access_type == 'remote_emulab':
96            self.start_segment = proxy_emulab_segment.start_segment
97            self.stop_segment = proxy_emulab_segment.stop_segment
98        elif self.access_type == 'local_emulab':
99            self.start_segment = local_emulab_segment.start_segment
100            self.stop_segment = local_emulab_segment.stop_segment
101        else:
102            self.start_segment = None
103            self.stop_segment = None
104
105        self.restricted = [ ]
106        self.access = { }
107        if config.has_option("access", "accessdb"):
108            self.read_access(config.get("access", "accessdb"))
109        tb = config.get('access', 'testbed')
110        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
111        else: self.testbed = [ ]
112
113        if config.has_option("access", "accessdb"):
114            self.read_access(config.get("access", "accessdb"), 
115                    self.make_access_project)
116
117        # read_state in the base_class
118        self.state_lock.acquire()
119        for a  in ('allocation', 'projects', 'keys', 'types'):
120            if a not in self.state:
121                self.state[a] = { }
122        self.allocation = self.state['allocation']
123        self.projects = self.state['projects']
124        self.keys = self.state['keys']
125        self.types = self.state['types']
126        # Add the ownership attributes to the authorizer.  Note that the
127        # indices of the allocation dict are strings, but the attributes are
128        # fedids, so there is a conversion.
129        for k in self.allocation.keys():
130            for o in self.allocation[k].get('owners', []):
131                self.auth.set_attribute(o, fedid(hexstr=k))
132            if self.allocation[k].has_key('userconfig'):
133                sfid = self.allocation[k]['userconfig']
134                fid = fedid(hexstr=sfid)
135                self.auth.set_attribute(fid, "/%s" % sfid)
136        self.state_lock.release()
137        self.exports = {
138                'SMB': self.export_SMB,
139                'seer': self.export_seer,
140                'tmcd': self.export_tmcd,
141                'userconfig': self.export_userconfig,
142                'project_export': self.export_project_export,
143                'local_seer_control': self.export_local_seer,
144                'seer_master': self.export_seer_master,
145                'hide_hosts': self.export_hide_hosts,
146                }
147
148        if not self.local_seer_image or not self.local_seer_software or \
149                not self.local_seer_start:
150            if 'local_seer_control' in self.exports:
151                del self.exports['local_seer_control']
152
153        if not self.local_seer_image or not self.local_seer_software or \
154                not self.seer_master_start:
155            if 'seer_master' in self.exports:
156                del self.exports['seer_master']
157
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment",
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176        self.call_SetValue = service_caller('SetValue')
177        self.call_GetValue = service_caller('GetValue', log=self.log)
178
179        if not config.has_option("allocate", "uri"):
180            self.allocate_project = \
181                allocate_project_local(config, auth)
182        else:
183            self.allocate_project = \
184                allocate_project_remote(config, auth)
185
186
187        # If the project allocator exports services, put them in this object's
188        # maps so that classes that instantiate this can call the services.
189        self.soap_services.update(self.allocate_project.soap_services)
190        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
191
192    @staticmethod
193    def make_access_project(str):
194        """
195        Convert a string of the form (id[:resources:resouces], id, id) into an
196        access_project.  This is called by read_access to convert to local
197        attributes.  It returns a tuple of the form (project, user, user) where
198        users may be names or fedids.
199        """
200        def parse_name(n):
201            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
202            else: return n
203
204        str = str.strip()
205        if str.startswith('(') and str.endswith(')'):
206            str = str[1:-1]
207            names = [ s.strip() for s in str.split(",")]
208            if len(names) > 3:
209                raise self.parse_error("More than three fields in name")
210            first = names[0].split(":")
211            if first == 'fedid:':
212                del first[0]
213                first[0] = fedid(hexstr=first[0])
214            names[0] = access_project(first[0], first[1:])
215
216            for i in range(1,2):
217                names[i] = parse_name(names[i])
218
219            return tuple(names)
220        else:
221            raise self.parse_error('Bad mapping (unbalanced parens)')
222
223
224    # RequestAccess support routines
225
226    def lookup_access(self, req, fid):
227        """
228        Look up the local access control information mapped to this fedid and
229        credentials.  In this case it is a (project, create_user, access_user)
230        triple, and a triple of three booleans indicating which, if any will
231        need to be dynamically created.  Finally a list of owners for that
232        allocation is returned.
233
234        lookup_access_base pulls the first triple out, and it is parsed by this
235        routine into the boolean map.  Owners is always the controlling fedid.
236        """
237        # Return values
238        rp = access_project(None, ())
239        ru = None
240        # This maps a valid user to the Emulab projects and users to use
241        found, match = self.lookup_access_base(req, fid)
242        tb, project, user = match
243       
244        if found == None:
245            raise service_error(service_error.access,
246                    "Access denied - cannot map access")
247
248        # resolve <dynamic> and <same> in found
249        dyn_proj = False
250        dyn_create_user = False
251        dyn_service_user = False
252
253        if found[0].name == "<same>":
254            if project != None:
255                rp.name = project
256            else : 
257                raise service_error(\
258                        service_error.server_config,
259                        "Project matched <same> when no project given")
260        elif found[0].name == "<dynamic>":
261            rp.name = None
262            dyn_proj = True
263        else:
264            rp.name = found[0].name
265        rp.node_types = found[0].node_types;
266
267        if found[1] == "<same>":
268            if user_match == "<any>":
269                if user != None: rcu = user[0]
270                else: raise service_error(\
271                        service_error.server_config,
272                        "Matched <same> on anonymous request")
273            else:
274                rcu = user_match
275        elif found[1] == "<dynamic>":
276            rcu = None
277            dyn_create_user = True
278        else:
279            rcu = found[1]
280       
281        if found[2] == "<same>":
282            if user_match == "<any>":
283                if user != None: rsu = user[0]
284                else: raise service_error(\
285                        service_error.server_config,
286                        "Matched <same> on anonymous request")
287            else:
288                rsu = user_match
289        elif found[2] == "<dynamic>":
290            rsu = None
291            dyn_service_user = True
292        else:
293            rsu = found[2]
294
295        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
296                [ fid ]
297
298    def do_project_allocation(self, dyn, project, user):
299        """
300        Call the project allocation routines and return the info.
301        """
302        if dyn: 
303            # Compose the dynamic project request
304            # (only dynamic, dynamic currently allowed)
305            preq = { 'AllocateProjectRequestBody': \
306                        { 'project' : {\
307                            'user': [ \
308                            { \
309                                'access': [ { 
310                                    'sshPubkey': self.ssh_pubkey_file } ],
311                                 'role': "serviceAccess",\
312                            }, \
313                            { \
314                                'access': [ { 
315                                    'sshPubkey': self.ssh_pubkey_file } ],
316                                 'role': "experimentCreation",\
317                            }, \
318                            ], \
319                            }\
320                        }\
321                    }
322            return self.allocate_project.dynamic_project(preq)
323        else:
324            preq = {'StaticProjectRequestBody' : \
325                    { 'project': \
326                        { 'name' : { 'localname' : project },\
327                          'user' : [ \
328                            {\
329                                'userID': { 'localname' : user }, \
330                                'access': [ { 
331                                    'sshPubkey': self.ssh_pubkey_file } ],
332                                'role': 'experimentCreation'\
333                            },\
334                            {\
335                                'userID': { 'localname' : user}, \
336                                'access': [ { 
337                                    'sshPubkey': self.ssh_pubkey_file } ],
338                                'role': 'serviceAccess'\
339                            },\
340                        ]}\
341                    }\
342            }
343            return self.allocate_project.static_project(preq)
344
345    def save_project_state(self, aid, ap, dyn, owners):
346        """
347        Parse out and save the information relevant to the project created for
348        this experiment.  That info is largely in ap and owners.  dyn indicates
349        that the project was created dynamically.  Return the user and project
350        names.
351        """
352        self.state_lock.acquire()
353        self.allocation[aid] = { }
354        try:
355            pname = ap['project']['name']['localname']
356        except KeyError:
357            pname = None
358
359        if dyn:
360            if not pname:
361                self.state_lock.release()
362                raise service_error(service_error.internal,
363                        "Misformed allocation response?")
364            if pname in self.projects: self.projects[pname] += 1
365            else: self.projects[pname] = 1
366            self.allocation[aid]['project'] = pname
367        else:
368            # sproject is a static project associated with this allocation.
369            self.allocation[aid]['sproject'] = pname
370
371        self.allocation[aid]['keys'] = [ ]
372
373        try:
374            for u in ap['project']['user']:
375                uname = u['userID']['localname']
376                if u['role'] == 'experimentCreation':
377                    self.allocation[aid]['user'] = uname
378                for k in [ k['sshPubkey'] for k in u['access'] \
379                        if k.has_key('sshPubkey') ]:
380                    kv = "%s:%s" % (uname, k)
381                    if self.keys.has_key(kv): self.keys[kv] += 1
382                    else: self.keys[kv] = 1
383                    self.allocation[aid]['keys'].append((uname, k))
384        except KeyError:
385            self.state_lock.release()
386            raise service_error(service_error.internal,
387                    "Misformed allocation response?")
388
389        self.allocation[aid]['owners'] = owners
390        self.write_state()
391        self.state_lock.release()
392        return (pname, uname)
393
394    # End of RequestAccess support routines
395
396    def RequestAccess(self, req, fid):
397        """
398        Handle the access request.  Proxy if not for us.
399
400        Parse out the fields and make the allocations or rejections if for us,
401        otherwise, assuming we're willing to proxy, proxy the request out.
402        """
403
404        def gateway_hardware(h):
405            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
406            else: return h
407
408        def get_export_project(svcs):
409            """
410            if the service requests includes one to export a project, return
411            that project.
412            """
413            rv = None
414            for s in svcs:
415                if s.get('name', '') == 'project_export' and \
416                        s.get('visibility', '') == 'export':
417                    if not rv: 
418                        for a in s.get('feddAttr', []):
419                            if a.get('attribute', '') == 'project' \
420                                    and 'value' in a:
421                                rv = a['value']
422                    else:
423                        raise service_error(service_error, access, 
424                                'Requesting multiple project exports is ' + \
425                                        'not supported');
426            return rv
427
428        # The dance to get into the request body
429        if req.has_key('RequestAccessRequestBody'):
430            req = req['RequestAccessRequestBody']
431        else:
432            raise service_error(service_error.req, "No request!?")
433
434
435        found, dyn, owners = self.lookup_access(req, fid)
436        ap = None
437
438        # if this includes a project export request and the exported
439        # project is not the access project, access denied.
440        if 'service' in req:
441            ep = get_export_project(req['service'])
442            if ep and ep != found[0].name:
443                raise service_error(service_error.access,
444                        "Cannot export %s" % ep)
445
446        if self.ssh_pubkey_file:
447            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
448        else:
449            raise service_error(service_error.internal, 
450                    "SSH access parameters required")
451        # keep track of what's been added
452        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
453        aid = unicode(allocID)
454
455        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
456
457        services, svc_state = self.export_services(req.get('service',[]),
458                pname, uname)
459        self.state_lock.acquire()
460        # Store services state in global state
461        for k, v in svc_state.items():
462            self.allocation[aid][k] = v
463        self.write_state()
464        self.state_lock.release()
465        # Give the owners the right to change this allocation
466        for o in owners:
467            self.auth.set_attribute(o, allocID)
468        try:
469            f = open("%s/%s.pem" % (self.certdir, aid), "w")
470            print >>f, alloc_cert
471            f.close()
472        except EnvironmentError, e:
473            raise service_error(service_error.internal, 
474                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
475        resp = self.build_access_response({ 'fedid': allocID } ,
476                ap, services)
477        return resp
478
479    def do_release_project(self, del_project, del_users, del_types):
480        """
481        If a project and users has to be deleted, make the call.
482        """
483        msg = { 'project': { }}
484        if del_project:
485            msg['project']['name']= {'localname': del_project}
486        users = [ ]
487        for u in del_users.keys():
488            users.append({ 'userID': { 'localname': u },\
489                'access' :  \
490                        [ {'sshPubkey' : s } for s in del_users[u]]\
491            })
492        if users: 
493            msg['project']['user'] = users
494        if len(del_types) > 0:
495            msg['resources'] = { 'node': \
496                    [ {'hardware': [ h ] } for h in del_types ]\
497                }
498        if self.allocate_project.release_project:
499            msg = { 'ReleaseProjectRequestBody' : msg}
500            self.allocate_project.release_project(msg)
501
502    def ReleaseAccess(self, req, fid):
503        # The dance to get into the request body
504        if req.has_key('ReleaseAccessRequestBody'):
505            req = req['ReleaseAccessRequestBody']
506        else:
507            raise service_error(service_error.req, "No request!?")
508
509        try:
510            if req['allocID'].has_key('localname'):
511                auth_attr = aid = req['allocID']['localname']
512            elif req['allocID'].has_key('fedid'):
513                aid = unicode(req['allocID']['fedid'])
514                auth_attr = req['allocID']['fedid']
515            else:
516                raise service_error(service_error.req,
517                        "Only localnames and fedids are understood")
518        except KeyError:
519            raise service_error(service_error.req, "Badly formed request")
520
521        self.log.debug("[access] deallocation requested for %s", aid)
522        if not self.auth.check_attribute(fid, auth_attr):
523            self.log.debug("[access] deallocation denied for %s", aid)
524            raise service_error(service_error.access, "Access Denied")
525
526        # If we know this allocation, reduce the reference counts and
527        # remove the local allocations.  Otherwise report an error.  If
528        # there is an allocation to delete, del_users will be a dictonary
529        # of sets where the key is the user that owns the keys in the set.
530        # We use a set to avoid duplicates.  del_project is just the name
531        # of any dynamic project to delete.  We're somewhat lazy about
532        # deleting authorization attributes.  Having access to something
533        # that doesn't exist isn't harmful.
534        del_users = { }
535        del_project = None
536        del_types = set()
537
538        self.state_lock.acquire()
539        if aid in self.allocation:
540            self.log.debug("Found allocation for %s" %aid)
541            for k in self.allocation[aid]['keys']:
542                kk = "%s:%s" % k
543                self.keys[kk] -= 1
544                if self.keys[kk] == 0:
545                    if not del_users.has_key(k[0]):
546                        del_users[k[0]] = set()
547                    del_users[k[0]].add(k[1])
548                    del self.keys[kk]
549
550            if 'project' in self.allocation[aid]:
551                pname = self.allocation[aid]['project']
552                self.projects[pname] -= 1
553                if self.projects[pname] == 0:
554                    del_project = pname
555                    del self.projects[pname]
556
557            if 'types' in self.allocation[aid]:
558                for t in self.allocation[aid]['types']:
559                    self.types[t] -= 1
560                    if self.types[t] == 0:
561                        if not del_project: del_project = t[0]
562                        del_types.add(t[1])
563                        del self.types[t]
564
565            del self.allocation[aid]
566            self.write_state()
567            self.state_lock.release()
568            # If we actually have resources to deallocate, prepare the call.
569            if del_project or del_users:
570                self.do_release_project(del_project, del_users, del_types)
571            # And remove the access cert
572            cf = "%s/%s.pem" % (self.certdir, aid)
573            self.log.debug("Removing %s" % cf)
574            os.remove(cf)
575            return { 'allocID': req['allocID'] } 
576        else:
577            self.state_lock.release()
578            raise service_error(service_error.req, "No such allocation")
579
580    # These are subroutines for StartSegment
581    def generate_ns2(self, topo, expfn, softdir, connInfo):
582        """
583        Convert topo into an ns2 file, decorated with appropriate commands for
584        the particular testbed setup.  Convert all requests for software, etc
585        to point at the staged copies on this testbed and add the federation
586        startcommands.
587        """
588        class dragon_commands:
589            """
590            Functor to spit out approrpiate dragon commands for nodes listed in
591            the connectivity description.  The constructor makes a dict mapping
592            dragon nodes to their parameters and the __call__ checks each
593            element in turn for membership.
594            """
595            def __init__(self, map):
596                self.node_info = map
597
598            def __call__(self, e):
599                s = ""
600                if isinstance(e, topdl.Computer):
601                    if self.node_info.has_key(e.name):
602                        info = self.node_info[e.name]
603                        for ifname, vlan, type in info:
604                            for i in e.interface:
605                                if i.name == ifname:
606                                    addr = i.get_attribute('ip4_address')
607                                    subs = i.substrate[0]
608                                    break
609                            else:
610                                raise service_error(service_error.internal,
611                                        "No interface %s on element %s" % \
612                                                (ifname, e.name))
613                            # XXX: do netmask right
614                            if type =='link':
615                                s = ("tb-allow-external ${%s} " + \
616                                        "dragonportal ip %s vlan %s " + \
617                                        "netmask 255.255.255.0\n") % \
618                                        (topdl.to_tcl_name(e.name), addr, vlan)
619                            elif type =='lan':
620                                s = ("tb-allow-external ${%s} " + \
621                                        "dragonportal " + \
622                                        "ip %s vlan %s usurp %s\n") % \
623                                        (topdl.to_tcl_name(e.name), addr, 
624                                                vlan, subs)
625                            else:
626                                raise service_error(service_error_internal,
627                                        "Unknown DRAGON type %s" % type)
628                return s
629
630        class not_dragon:
631            """
632            Return true if a node is in the given map of dragon nodes.
633            """
634            def __init__(self, map):
635                self.nodes = set(map.keys())
636
637            def __call__(self, e):
638                return e.name not in self.nodes
639
640        # Main line of generate_ns2
641        t = topo.clone()
642
643        # Create the map of nodes that need direct connections (dragon
644        # connections) from the connInfo
645        dragon_map = { }
646        for i in [ i for i in connInfo if i['type'] == 'transit']:
647            for a in i.get('fedAttr', []):
648                if a['attribute'] == 'vlan_id':
649                    vlan = a['value']
650                    break
651            else:
652                raise service_error(service_error.internal, "No vlan tag")
653            members = i.get('member', [])
654            if len(members) > 1: type = 'lan'
655            else: type = 'link'
656
657            try:
658                for m in members:
659                    if m['element'] in dragon_map:
660                        dragon_map[m['element']].append(( m['interface'], 
661                            vlan, type))
662                    else:
663                        dragon_map[m['element']] = [( m['interface'], 
664                            vlan, type),]
665            except KeyError:
666                raise service_error(service_error.req,
667                        "Missing connectivity info")
668
669        # Weed out the things we aren't going to instantiate: Segments, portal
670        # substrates, and portal interfaces.  (The copy in the for loop allows
671        # us to delete from e.elements in side the for loop).  While we're
672        # touching all the elements, we also adjust paths from the original
673        # testbed to local testbed paths and put the federation commands into
674        # the start commands
675        for e in [e for e in t.elements]:
676            if isinstance(e, topdl.Segment):
677                t.elements.remove(e)
678            if isinstance(e, topdl.Computer):
679                self.add_kit(e, self.federation_software)
680                if e.get_attribute('portal') and self.portal_startcommand:
681                    # Add local portal support software
682                    self.add_kit(e, self.portal_software)
683                    # Portals never have a user-specified start command
684                    e.set_attribute('startup', self.portal_startcommand)
685                elif self.node_startcommand:
686                    if e.get_attribute('startup'):
687                        e.set_attribute('startup', "%s \\$USER '%s'" % \
688                                (self.node_startcommand, 
689                                    e.get_attribute('startup')))
690                    else:
691                        e.set_attribute('startup', self.node_startcommand)
692
693                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
694                # Remove portal interfaces that do not connect to DRAGON
695                e.interface = [i for i in e.interface \
696                        if not i.get_attribute('portal') or i.name in dinf ]
697            # Fix software paths
698            for s in getattr(e, 'software', []):
699                s.location = re.sub("^.*/", softdir, s.location)
700
701        t.substrates = [ s.clone() for s in t.substrates ]
702        t.incorporate_elements()
703
704        # Customize the ns2 output for local portal commands and images
705        filters = []
706
707        if self.dragon_endpoint:
708            add_filter = not_dragon(dragon_map)
709            filters.append(dragon_commands(dragon_map))
710        else:
711            add_filter = None
712
713        if self.portal_command:
714            filters.append(topdl.generate_portal_command_filter(
715                self.portal_command, add_filter=add_filter))
716
717        if self.portal_image:
718            filters.append(topdl.generate_portal_image_filter(
719                self.portal_image))
720
721        if self.portal_type:
722            filters.append(topdl.generate_portal_hardware_filter(
723                self.portal_type))
724
725        # Convert to ns and write it out
726        expfile = topdl.topology_to_ns2(t, filters)
727        try:
728            f = open(expfn, "w")
729            print >>f, expfile
730            f.close()
731        except EnvironmentError:
732            raise service_error(service_error.internal,
733                    "Cannot write experiment file %s: %s" % (expfn,e))
734
735    def export_store_info(self, cf, proj, ename, connInfo):
736        """
737        For the export requests in the connection info, install the peer names
738        at the experiment controller via SetValue calls.
739        """
740
741        for c in connInfo:
742            for p in [ p for p in c.get('parameter', []) \
743                    if p.get('type', '') == 'output']:
744
745                if p.get('name', '') == 'peer':
746                    k = p.get('key', None)
747                    surl = p.get('store', None)
748                    if surl and k and k.index('/') != -1:
749                        value = "%s.%s.%s%s" % \
750                                (k[k.index('/')+1:], ename, proj, self.domain)
751                        req = { 'name': k, 'value': value }
752                        self.log.debug("Setting %s to %s on %s" % \
753                                (k, value, surl))
754                        self.call_SetValue(surl, req, cf)
755                    else:
756                        self.log.error("Bad export request: %s" % p)
757                elif p.get('name', '') == 'ssh_port':
758                    k = p.get('key', None)
759                    surl = p.get('store', None)
760                    if surl and k:
761                        req = { 'name': k, 'value': self.ssh_port }
762                        self.log.debug("Setting %s to %s on %s" % \
763                                (k, self.ssh_port, surl))
764                        self.call_SetValue(surl, req, cf)
765                    else:
766                        self.log.error("Bad export request: %s" % p)
767                else:
768                    self.log.error("Unknown export parameter: %s" % \
769                            p.get('name'))
770                    continue
771
772    def add_seer_node(self, topo, name, startup):
773        """
774        Add a seer node to the given topology, with the startup command passed
775        in.  Used by configure seer_services.
776        """
777        c_node = topdl.Computer(
778                name=name, 
779                os= topdl.OperatingSystem(
780                    attribute=[
781                    { 'attribute': 'osid', 
782                        'value': self.local_seer_image },
783                    ]),
784                attribute=[
785                    { 'attribute': 'startup', 'value': startup },
786                    ]
787                )
788        self.add_kit(c_node, self.local_seer_software)
789        topo.elements.append(c_node)
790
791    def configure_seer_services(self, services, topo, softdir):
792        """
793        Make changes to the topology required for the seer requests being made.
794        Specifically, add any control or master nodes required and set up the
795        start commands on the nodes to interconnect them.
796        """
797        local_seer = False      # True if we need to add a control node
798        collect_seer = False    # True if there is a seer-master node
799        seer_master= False      # True if we need to add the seer-master
800        for s in services:
801            s_name = s.get('name', '')
802            s_vis = s.get('visibility','')
803
804            if s_name  == 'local_seer_control' and s_vis == 'export':
805                local_seer = True
806            elif s_name == 'seer_master':
807                if s_vis == 'import':
808                    collect_seer = True
809                elif s_vis == 'export':
810                    seer_master = True
811       
812        # We've got the whole picture now, so add nodes if needed and configure
813        # them to interconnect properly.
814        if local_seer or seer_master:
815            # Copy local seer control node software to the tempdir
816            for l, f in self.local_seer_software:
817                base = os.path.basename(f)
818                copy_file(f, "%s/%s" % (softdir, base))
819        # If we're collecting seers somewhere the controllers need to talk to
820        # the master.  In testbeds that export the service, that will be a
821        # local node that we'll add below.  Elsewhere it will be the control
822        # portal that will port forward to the exporting master.
823        if local_seer:
824            if collect_seer:
825                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
826            else:
827                startup = self.local_seer_start
828            self.add_seer_node(topo, 'control', startup)
829        # If this is the seer master, add that node, too.
830        if seer_master:
831            self.add_seer_node(topo, 'seer-master', 
832                    "%s -R -n -R seer-master -R -A -R sink" % \
833                            self.seer_master_start)
834
835    def retrieve_software(self, topo, certfile, softdir):
836        """
837        Collect the software that nodes in the topology need loaded and stage
838        it locally.  This implies retrieving it from the experiment_controller
839        and placing it into softdir.  Certfile is used to prove that this node
840        has access to that data (it's the allocation/segment fedid).  Finally
841        local portal and federation software is also copied to the same staging
842        directory for simplicity - all software needed for experiment creation
843        is in softdir.
844        """
845        sw = set()
846        for e in topo.elements:
847            for s in getattr(e, 'software', []):
848                sw.add(s.location)
849        for s in sw:
850            self.log.debug("Retrieving %s" % s)
851            try:
852                get_url(s, certfile, softdir)
853            except:
854                t, v, st = sys.exc_info()
855                raise service_error(service_error.internal,
856                        "Error retrieving %s: %s" % (s, v))
857
858        # Copy local federation and portal node software to the tempdir
859        for s in (self.federation_software, self.portal_software):
860            for l, f in s:
861                base = os.path.basename(f)
862                copy_file(f, "%s/%s" % (softdir, base))
863
864
865    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
866        """
867        Gather common configuration files, retrieve or create an experiment
868        name and project name, and return the ssh_key filenames.  Create an
869        allocation log bound to the state log variable as well.
870        """
871        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
872        ename = None
873        pubkey_base = None
874        secretkey_base = None
875        proj = None
876        user = None
877        alloc_log = None
878
879        for a in attrs:
880            if a['attribute'] in configs:
881                try:
882                    self.log.debug("Retrieving %s from %s" % \
883                            (a['attribute'], a['value']))
884                    get_url(a['value'], certfile, tmpdir)
885                except:
886                    t, v, st = sys.exc_info()
887                    raise service_error(service_error.internal,
888                            "Error retrieving %s: %s" % (a.get('value', ""), v))
889            if a['attribute'] == 'ssh_pubkey':
890                pubkey_base = a['value'].rpartition('/')[2]
891            if a['attribute'] == 'ssh_secretkey':
892                secretkey_base = a['value'].rpartition('/')[2]
893            if a['attribute'] == 'experiment_name':
894                ename = a['value']
895
896        if not ename:
897            ename = ""
898            for i in range(0,5):
899                ename += random.choice(string.ascii_letters)
900            self.log.warn("No experiment name: picked one randomly: %s" \
901                    % ename)
902
903        if not pubkey_base:
904            raise service_error(service_error.req, 
905                    "No public key attribute")
906
907        if not secretkey_base:
908            raise service_error(service_error.req, 
909                    "No secret key attribute")
910
911        self.state_lock.acquire()
912        if aid in self.allocation:
913            proj = self.allocation[aid].get('project', None)
914            if not proj: 
915                proj = self.allocation[aid].get('sproject', None)
916            user = self.allocation[aid].get('user', None)
917            self.allocation[aid]['experiment'] = ename
918            self.allocation[aid]['log'] = [ ]
919            # Create a logger that logs to the experiment's state object as
920            # well as to the main log file.
921            alloc_log = logging.getLogger('fedd.access.%s' % ename)
922            h = logging.StreamHandler(
923                    list_log.list_log(self.allocation[aid]['log']))
924            # XXX: there should be a global one of these rather than
925            # repeating the code.
926            h.setFormatter(logging.Formatter(
927                "%(asctime)s %(name)s %(message)s",
928                        '%d %b %y %H:%M:%S'))
929            alloc_log.addHandler(h)
930            self.write_state()
931        self.state_lock.release()
932
933        if not proj:
934            raise service_error(service_error.internal, 
935                    "Can't find project for %s" %aid)
936
937        if not user:
938            raise service_error(service_error.internal, 
939                    "Can't find creation user for %s" %aid)
940
941        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
942
943    def finalize_experiment(self, starter, topo, aid, alloc_id):
944        """
945        Store key bits of experiment state in the global repository, including
946        the response that may need to be replayed, and return the response.
947        """
948        # Copy the assigned names into the return topology
949        embedding = [ ]
950        for n in starter.node:
951            embedding.append({ 
952                'toponame': n,
953                'physname': ["%s%s" %  (starter.node[n], self.domain)],
954                })
955        # Grab the log (this is some anal locking, but better safe than
956        # sorry)
957        self.state_lock.acquire()
958        logv = "".join(self.allocation[aid]['log'])
959        # It's possible that the StartSegment call gets retried (!).
960        # if the 'started' key is in the allocation, we'll return it rather
961        # than redo the setup.
962        self.allocation[aid]['started'] = { 
963                'allocID': alloc_id,
964                'allocationLog': logv,
965                'segmentdescription': { 
966                    'topdldescription': topo.clone().to_dict()
967                    },
968                'embedding': embedding
969                }
970        retval = copy.copy(self.allocation[aid]['started'])
971        self.write_state()
972        self.state_lock.release()
973        return retval
974   
975    # End of StartSegment support routines
976
977    def StartSegment(self, req, fid):
978        err = None  # Any service_error generated after tmpdir is created
979        rv = None   # Return value from segment creation
980
981        try:
982            req = req['StartSegmentRequestBody']
983            auth_attr = req['allocID']['fedid']
984            topref = req['segmentdescription']['topdldescription']
985        except KeyError:
986            raise service_error(server_error.req, "Badly formed request")
987
988        connInfo = req.get('connection', [])
989        services = req.get('service', [])
990        aid = "%s" % auth_attr
991        attrs = req.get('fedAttr', [])
992        if not self.auth.check_attribute(fid, auth_attr):
993            raise service_error(service_error.access, "Access denied")
994        else:
995            # See if this is a replay of an earlier succeeded StartSegment -
996            # sometimes SSL kills 'em.  If so, replay the response rather than
997            # redoing the allocation.
998            self.state_lock.acquire()
999            retval = self.allocation[aid].get('started', None)
1000            self.state_lock.release()
1001            if retval:
1002                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1003                        "replaying response")
1004                return retval
1005
1006        # A new request.  Do it.
1007
1008        if topref: topo = topdl.Topology(**topref)
1009        else:
1010            raise service_error(service_error.req, 
1011                    "Request missing segmentdescription'")
1012       
1013        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1014        try:
1015            tmpdir = tempfile.mkdtemp(prefix="access-")
1016            softdir = "%s/software" % tmpdir
1017            os.mkdir(softdir)
1018        except EnvironmentError:
1019            raise service_error(service_error.internal, "Cannot create tmp dir")
1020
1021        # Try block alllows us to clean up temporary files.
1022        try:
1023            self.retrieve_software(topo, certfile, softdir)
1024            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1025                    self.initialize_experiment_info(attrs, aid, 
1026                            certfile, tmpdir)
1027
1028            # Set up userconf and seer if needed
1029            self.configure_userconf(services, tmpdir)
1030            self.configure_seer_services(services, topo, softdir)
1031            # Get and send synch store variables
1032            self.export_store_info(certfile, proj, ename, connInfo)
1033            self.import_store_info(certfile, connInfo)
1034
1035            expfile = "%s/experiment.tcl" % tmpdir
1036
1037            self.generate_portal_configs(topo, pubkey_base, 
1038                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1039            self.generate_ns2(topo, expfile, 
1040                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1041
1042            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1043                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1044                    cert=self.xmlrpc_cert)
1045            rv = starter(self, ename, proj, user, expfile, tmpdir)
1046        except service_error, e:
1047            err = e
1048        except:
1049            t, v, st = sys.exc_info()
1050            err = service_error(service_error.internal, "%s: %s" % \
1051                    (v, traceback.extract_tb(st)))
1052
1053        # Walk up tmpdir, deleting as we go
1054        if self.cleanup: self.remove_dirs(tmpdir)
1055        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1056
1057        if rv:
1058            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1059        elif err:
1060            raise service_error(service_error.federant,
1061                    "Swapin failed: %s" % err)
1062        else:
1063            raise service_error(service_error.federant, "Swapin failed")
1064
1065    def TerminateSegment(self, req, fid):
1066        try:
1067            req = req['TerminateSegmentRequestBody']
1068        except KeyError:
1069            raise service_error(server_error.req, "Badly formed request")
1070
1071        auth_attr = req['allocID']['fedid']
1072        aid = "%s" % auth_attr
1073        attrs = req.get('fedAttr', [])
1074        if not self.auth.check_attribute(fid, auth_attr):
1075            raise service_error(service_error.access, "Access denied")
1076
1077        self.state_lock.acquire()
1078        if aid in self.allocation:
1079            proj = self.allocation[aid].get('project', None)
1080            if not proj: 
1081                proj = self.allocation[aid].get('sproject', None)
1082            user = self.allocation[aid].get('user', None)
1083            ename = self.allocation[aid].get('experiment', None)
1084        else:
1085            proj = None
1086            user = None
1087            ename = None
1088        self.state_lock.release()
1089
1090        if not proj:
1091            raise service_error(service_error.internal, 
1092                    "Can't find project for %s" % aid)
1093
1094        if not user:
1095            raise service_error(service_error.internal, 
1096                    "Can't find creation user for %s" % aid)
1097        if not ename:
1098            raise service_error(service_error.internal, 
1099                    "Can't find experiment name for %s" % aid)
1100        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1101                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1102        stopper(self, user, proj, ename)
1103        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.