source: fedd/federation/emulab_access.py @ 5b19c3a

version-3.01
Last change on this file since 5b19c3a was 5b19c3a, checked in by Ted Faber <faber@…>, 13 years ago

This rolls back the 3.01 branch to the original 3.01 state. I erroneously kept the changes between 3.01 and 3.02 in this branch. With the new branch created, this restores the 3.01 tag's correctness.

  • Property mode set to 100644
File size: 36.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12import traceback
13
14from threading import *
15from M2Crypto.SSL import SSLError
16
17from access import access_base
18
19from util import *
20from allocate_project import allocate_project_local, allocate_project_remote
21from access_project import access_project
22from fedid import fedid, generate_fedid
23from authorizer import authorizer
24from service_error import service_error
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26
27import httplib
28import tempfile
29from urlparse import urlparse
30
31import topdl
32import list_log
33import proxy_emulab_segment
34import local_emulab_segment
35
36
37# Make log messages disappear if noone configures a fedd logger
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.access")
42fl.addHandler(nullHandler())
43
44class access(access_base):
45    """
46    The implementation of access control based on mapping users to projects.
47
48    Users can be mapped to existing projects or have projects created
49    dynamically.  This implements both direct requests and proxies.
50    """
51
52    def __init__(self, config=None, auth=None):
53        """
54        Initializer.  Pulls parameters out of the ConfigParser's access section.
55        """
56
57        access_base.__init__(self, config, auth)
58
59        self.allow_proxy = config.getboolean("access", "allow_proxy")
60
61        self.domain = config.get("access", "domain")
62        self.userconfdir = config.get("access","userconfdir")
63        self.userconfcmd = config.get("access","userconfcmd")
64        self.userconfurl = config.get("access","userconfurl")
65        self.federation_software = config.get("access", "federation_software")
66        self.portal_software = config.get("access", "portal_software")
67        self.local_seer_software = config.get("access", "local_seer_software")
68        self.local_seer_image = config.get("access", "local_seer_image")
69        self.local_seer_start = config.get("access", "local_seer_start")
70        self.seer_master_start = config.get("access", "seer_master_start")
71        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
72        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
73        self.ssh_port = config.get("access","ssh_port") or "22"
74        self.boss = config.get("access", "boss")
75        self.xmlrpc_cert = config.get("access", "xmlrpc_cert")
76        self.xmlrpc_certpw = config.get("access", "xmlrpc_certpw")
77
78        self.dragon_endpoint = config.get("access", "dragon")
79        self.dragon_vlans = config.get("access", "dragon_vlans")
80        self.deter_internal = config.get("access", "deter_internal")
81
82        self.tunnel_config = config.getboolean("access", "tunnel_config")
83        self.portal_command = config.get("access", "portal_command")
84        self.portal_image = config.get("access", "portal_image")
85        self.portal_type = config.get("access", "portal_type") or "pc"
86        self.portal_startcommand = config.get("access", "portal_startcommand")
87        self.node_startcommand = config.get("access", "node_startcommand")
88
89        self.federation_software = self.software_list(self.federation_software)
90        self.portal_software = self.software_list(self.portal_software)
91        self.local_seer_software = self.software_list(self.local_seer_software)
92
93        self.access_type = self.access_type.lower()
94        if self.access_type == 'remote_emulab':
95            self.start_segment = proxy_emulab_segment.start_segment
96            self.stop_segment = proxy_emulab_segment.stop_segment
97        elif self.access_type == 'local_emulab':
98            self.start_segment = local_emulab_segment.start_segment
99            self.stop_segment = local_emulab_segment.stop_segment
100        else:
101            self.start_segment = None
102            self.stop_segment = None
103
104        self.restricted = [ ]
105        self.access = { }
106        if config.has_option("access", "accessdb"):
107            self.read_access(config.get("access", "accessdb"))
108        tb = config.get('access', 'testbed')
109        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
110        else: self.testbed = [ ]
111
112        if config.has_option("access", "accessdb"):
113            self.read_access(config.get("access", "accessdb"), 
114                    self.make_access_project)
115
116        # read_state in the base_class
117        self.state_lock.acquire()
118        for a  in ('allocation', 'projects', 'keys', 'types'):
119            if a not in self.state:
120                self.state[a] = { }
121        self.allocation = self.state['allocation']
122        self.projects = self.state['projects']
123        self.keys = self.state['keys']
124        self.types = self.state['types']
125        # Add the ownership attributes to the authorizer.  Note that the
126        # indices of the allocation dict are strings, but the attributes are
127        # fedids, so there is a conversion.
128        for k in self.allocation.keys():
129            for o in self.allocation[k].get('owners', []):
130                self.auth.set_attribute(o, fedid(hexstr=k))
131            if self.allocation[k].has_key('userconfig'):
132                sfid = self.allocation[k]['userconfig']
133                fid = fedid(hexstr=sfid)
134                self.auth.set_attribute(fid, "/%s" % sfid)
135        self.state_lock.release()
136        self.exports = {
137                'SMB': self.export_SMB,
138                'seer': self.export_seer,
139                'tmcd': self.export_tmcd,
140                'userconfig': self.export_userconfig,
141                'project_export': self.export_project_export,
142                'local_seer_control': self.export_local_seer,
143                'seer_master': self.export_seer_master,
144                'hide_hosts': self.export_hide_hosts,
145                }
146
147        if not self.local_seer_image or not self.local_seer_software or \
148                not self.local_seer_start:
149            if 'local_seer_control' in self.exports:
150                del self.exports['local_seer_control']
151
152        if not self.local_seer_image or not self.local_seer_software or \
153                not self.seer_master_start:
154            if 'seer_master' in self.exports:
155                del self.exports['seer_master']
156
157
158        self.soap_services = {\
159            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
160            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
161            'StartSegment': soap_handler("StartSegment", self.StartSegment),
162            'TerminateSegment': soap_handler("TerminateSegment",
163                self.TerminateSegment),
164            }
165        self.xmlrpc_services =  {\
166            'RequestAccess': xmlrpc_handler('RequestAccess',
167                self.RequestAccess),
168            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
169                self.ReleaseAccess),
170            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
171            'TerminateSegment': xmlrpc_handler('TerminateSegment',
172                self.TerminateSegment),
173            }
174
175        self.call_SetValue = service_caller('SetValue')
176        self.call_GetValue = service_caller('GetValue', log=self.log)
177
178        if not config.has_option("allocate", "uri"):
179            self.allocate_project = \
180                allocate_project_local(config, auth)
181        else:
182            self.allocate_project = \
183                allocate_project_remote(config, auth)
184
185
186        # If the project allocator exports services, put them in this object's
187        # maps so that classes that instantiate this can call the services.
188        self.soap_services.update(self.allocate_project.soap_services)
189        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
190
191    @staticmethod
192    def make_access_project(str):
193        """
194        Convert a string of the form (id[:resources:resouces], id, id) into an
195        access_project.  This is called by read_access to convert to local
196        attributes.  It returns a tuple of the form (project, user, user) where
197        users may be names or fedids.
198        """
199        def parse_name(n):
200            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
201            else: return n
202
203        str = str.strip()
204        if str.startswith('(') and str.endswith(')'):
205            str = str[1:-1]
206            names = [ s.strip() for s in str.split(",")]
207            if len(names) > 3:
208                raise self.parse_error("More than three fields in name")
209            first = names[0].split(":")
210            if first == 'fedid:':
211                del first[0]
212                first[0] = fedid(hexstr=first[0])
213            names[0] = access_project(first[0], first[1:])
214
215            for i in range(1,2):
216                names[i] = parse_name(names[i])
217
218            return tuple(names)
219        else:
220            raise self.parse_error('Bad mapping (unbalanced parens)')
221
222
223    # RequestAccess support routines
224
225    def lookup_access(self, req, fid):
226        """
227        Look up the local access control information mapped to this fedid and
228        credentials.  In this case it is a (project, create_user, access_user)
229        triple, and a triple of three booleans indicating which, if any will
230        need to be dynamically created.  Finally a list of owners for that
231        allocation is returned.
232
233        lookup_access_base pulls the first triple out, and it is parsed by this
234        routine into the boolean map.  Owners is always the controlling fedid.
235        """
236        # Return values
237        rp = access_project(None, ())
238        ru = None
239        # This maps a valid user to the Emulab projects and users to use
240        found, match = self.lookup_access_base(req, fid)
241        tb, project, user = match
242       
243        if found == None:
244            raise service_error(service_error.access,
245                    "Access denied - cannot map access")
246
247        # resolve <dynamic> and <same> in found
248        dyn_proj = False
249        dyn_create_user = False
250        dyn_service_user = False
251
252        if found[0].name == "<same>":
253            if project != None:
254                rp.name = project
255            else : 
256                raise service_error(\
257                        service_error.server_config,
258                        "Project matched <same> when no project given")
259        elif found[0].name == "<dynamic>":
260            rp.name = None
261            dyn_proj = True
262        else:
263            rp.name = found[0].name
264        rp.node_types = found[0].node_types;
265
266        if found[1] == "<same>":
267            if user_match == "<any>":
268                if user != None: rcu = user[0]
269                else: raise service_error(\
270                        service_error.server_config,
271                        "Matched <same> on anonymous request")
272            else:
273                rcu = user_match
274        elif found[1] == "<dynamic>":
275            rcu = None
276            dyn_create_user = True
277        else:
278            rcu = found[1]
279       
280        if found[2] == "<same>":
281            if user_match == "<any>":
282                if user != None: rsu = user[0]
283                else: raise service_error(\
284                        service_error.server_config,
285                        "Matched <same> on anonymous request")
286            else:
287                rsu = user_match
288        elif found[2] == "<dynamic>":
289            rsu = None
290            dyn_service_user = True
291        else:
292            rsu = found[2]
293
294        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
295                [ fid ]
296
297    def do_project_allocation(self, dyn, project, user):
298        """
299        Call the project allocation routines and return the info.
300        """
301        if dyn: 
302            # Compose the dynamic project request
303            # (only dynamic, dynamic currently allowed)
304            preq = { 'AllocateProjectRequestBody': \
305                        { 'project' : {\
306                            'user': [ \
307                            { \
308                                'access': [ { 
309                                    'sshPubkey': self.ssh_pubkey_file } ],
310                                 'role': "serviceAccess",\
311                            }, \
312                            { \
313                                'access': [ { 
314                                    'sshPubkey': self.ssh_pubkey_file } ],
315                                 'role': "experimentCreation",\
316                            }, \
317                            ], \
318                            }\
319                        }\
320                    }
321            return self.allocate_project.dynamic_project(preq)
322        else:
323            preq = {'StaticProjectRequestBody' : \
324                    { 'project': \
325                        { 'name' : { 'localname' : project },\
326                          'user' : [ \
327                            {\
328                                'userID': { 'localname' : user }, \
329                                'access': [ { 
330                                    'sshPubkey': self.ssh_pubkey_file } ],
331                                'role': 'experimentCreation'\
332                            },\
333                            {\
334                                'userID': { 'localname' : user}, \
335                                'access': [ { 
336                                    'sshPubkey': self.ssh_pubkey_file } ],
337                                'role': 'serviceAccess'\
338                            },\
339                        ]}\
340                    }\
341            }
342            return self.allocate_project.static_project(preq)
343
344    def save_project_state(self, aid, ap, dyn, owners):
345        """
346        Parse out and save the information relevant to the project created for
347        this experiment.  That info is largely in ap and owners.  dyn indicates
348        that the project was created dynamically.  Return the user and project
349        names.
350        """
351        self.state_lock.acquire()
352        self.allocation[aid] = { }
353        try:
354            pname = ap['project']['name']['localname']
355        except KeyError:
356            pname = None
357
358        if dyn:
359            if not pname:
360                self.state_lock.release()
361                raise service_error(service_error.internal,
362                        "Misformed allocation response?")
363            if pname in self.projects: self.projects[pname] += 1
364            else: self.projects[pname] = 1
365            self.allocation[aid]['project'] = pname
366        else:
367            # sproject is a static project associated with this allocation.
368            self.allocation[aid]['sproject'] = pname
369
370        self.allocation[aid]['keys'] = [ ]
371
372        try:
373            for u in ap['project']['user']:
374                uname = u['userID']['localname']
375                if u['role'] == 'experimentCreation':
376                    self.allocation[aid]['user'] = uname
377                for k in [ k['sshPubkey'] for k in u['access'] \
378                        if k.has_key('sshPubkey') ]:
379                    kv = "%s:%s" % (uname, k)
380                    if self.keys.has_key(kv): self.keys[kv] += 1
381                    else: self.keys[kv] = 1
382                    self.allocation[aid]['keys'].append((uname, k))
383        except KeyError:
384            self.state_lock.release()
385            raise service_error(service_error.internal,
386                    "Misformed allocation response?")
387
388        self.allocation[aid]['owners'] = owners
389        self.write_state()
390        self.state_lock.release()
391        return (pname, uname)
392
393    # End of RequestAccess support routines
394
395    def RequestAccess(self, req, fid):
396        """
397        Handle the access request.  Proxy if not for us.
398
399        Parse out the fields and make the allocations or rejections if for us,
400        otherwise, assuming we're willing to proxy, proxy the request out.
401        """
402
403        def gateway_hardware(h):
404            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
405            else: return h
406
407        def get_export_project(svcs):
408            """
409            if the service requests includes one to export a project, return
410            that project.
411            """
412            rv = None
413            for s in svcs:
414                if s.get('name', '') == 'project_export' and \
415                        s.get('visibility', '') == 'export':
416                    if not rv: 
417                        for a in s.get('feddAttr', []):
418                            if a.get('attribute', '') == 'project' \
419                                    and 'value' in a:
420                                rv = a['value']
421                    else:
422                        raise service_error(service_error, access, 
423                                'Requesting multiple project exports is ' + \
424                                        'not supported');
425            return rv
426
427        # The dance to get into the request body
428        if req.has_key('RequestAccessRequestBody'):
429            req = req['RequestAccessRequestBody']
430        else:
431            raise service_error(service_error.req, "No request!?")
432
433
434        found, dyn, owners = self.lookup_access(req, fid)
435        ap = None
436
437        # if this includes a project export request and the exported
438        # project is not the access project, access denied.
439        if 'service' in req:
440            ep = get_export_project(req['service'])
441            if ep and ep != found[0].name:
442                raise service_error(service_error.access,
443                        "Cannot export %s" % ep)
444
445        if self.ssh_pubkey_file:
446            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
447        else:
448            raise service_error(service_error.internal, 
449                    "SSH access parameters required")
450        # keep track of what's been added
451        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
452        aid = unicode(allocID)
453
454        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
455
456        services, svc_state = self.export_services(req.get('service',[]),
457                pname, uname)
458        self.state_lock.acquire()
459        # Store services state in global state
460        for k, v in svc_state.items():
461            self.allocation[aid][k] = v
462        self.write_state()
463        self.state_lock.release()
464        # Give the owners the right to change this allocation
465        for o in owners:
466            self.auth.set_attribute(o, allocID)
467        try:
468            f = open("%s/%s.pem" % (self.certdir, aid), "w")
469            print >>f, alloc_cert
470            f.close()
471        except EnvironmentError, e:
472            raise service_error(service_error.internal, 
473                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
474        resp = self.build_access_response({ 'fedid': allocID } ,
475                ap, services)
476        return resp
477
478    def do_release_project(self, del_project, del_users, del_types):
479        """
480        If a project and users has to be deleted, make the call.
481        """
482        msg = { 'project': { }}
483        if del_project:
484            msg['project']['name']= {'localname': del_project}
485        users = [ ]
486        for u in del_users.keys():
487            users.append({ 'userID': { 'localname': u },\
488                'access' :  \
489                        [ {'sshPubkey' : s } for s in del_users[u]]\
490            })
491        if users: 
492            msg['project']['user'] = users
493        if len(del_types) > 0:
494            msg['resources'] = { 'node': \
495                    [ {'hardware': [ h ] } for h in del_types ]\
496                }
497        if self.allocate_project.release_project:
498            msg = { 'ReleaseProjectRequestBody' : msg}
499            self.allocate_project.release_project(msg)
500
501    def ReleaseAccess(self, req, fid):
502        # The dance to get into the request body
503        if req.has_key('ReleaseAccessRequestBody'):
504            req = req['ReleaseAccessRequestBody']
505        else:
506            raise service_error(service_error.req, "No request!?")
507
508        try:
509            if req['allocID'].has_key('localname'):
510                auth_attr = aid = req['allocID']['localname']
511            elif req['allocID'].has_key('fedid'):
512                aid = unicode(req['allocID']['fedid'])
513                auth_attr = req['allocID']['fedid']
514            else:
515                raise service_error(service_error.req,
516                        "Only localnames and fedids are understood")
517        except KeyError:
518            raise service_error(service_error.req, "Badly formed request")
519
520        self.log.debug("[access] deallocation requested for %s", aid)
521        if not self.auth.check_attribute(fid, auth_attr):
522            self.log.debug("[access] deallocation denied for %s", aid)
523            raise service_error(service_error.access, "Access Denied")
524
525        # If we know this allocation, reduce the reference counts and
526        # remove the local allocations.  Otherwise report an error.  If
527        # there is an allocation to delete, del_users will be a dictonary
528        # of sets where the key is the user that owns the keys in the set.
529        # We use a set to avoid duplicates.  del_project is just the name
530        # of any dynamic project to delete.  We're somewhat lazy about
531        # deleting authorization attributes.  Having access to something
532        # that doesn't exist isn't harmful.
533        del_users = { }
534        del_project = None
535        del_types = set()
536
537        self.state_lock.acquire()
538        if aid in self.allocation:
539            self.log.debug("Found allocation for %s" %aid)
540            for k in self.allocation[aid]['keys']:
541                kk = "%s:%s" % k
542                self.keys[kk] -= 1
543                if self.keys[kk] == 0:
544                    if not del_users.has_key(k[0]):
545                        del_users[k[0]] = set()
546                    del_users[k[0]].add(k[1])
547                    del self.keys[kk]
548
549            if 'project' in self.allocation[aid]:
550                pname = self.allocation[aid]['project']
551                self.projects[pname] -= 1
552                if self.projects[pname] == 0:
553                    del_project = pname
554                    del self.projects[pname]
555
556            if 'types' in self.allocation[aid]:
557                for t in self.allocation[aid]['types']:
558                    self.types[t] -= 1
559                    if self.types[t] == 0:
560                        if not del_project: del_project = t[0]
561                        del_types.add(t[1])
562                        del self.types[t]
563
564            del self.allocation[aid]
565            self.write_state()
566            self.state_lock.release()
567            # If we actually have resources to deallocate, prepare the call.
568            if del_project or del_users:
569                self.do_release_project(del_project, del_users, del_types)
570            # And remove the access cert
571            cf = "%s/%s.pem" % (self.certdir, aid)
572            self.log.debug("Removing %s" % cf)
573            os.remove(cf)
574            return { 'allocID': req['allocID'] } 
575        else:
576            self.state_lock.release()
577            raise service_error(service_error.req, "No such allocation")
578
579    # These are subroutines for StartSegment
580    def generate_ns2(self, topo, expfn, softdir, connInfo):
581        """
582        Convert topo into an ns2 file, decorated with appropriate commands for
583        the particular testbed setup.  Convert all requests for software, etc
584        to point at the staged copies on this testbed and add the federation
585        startcommands.
586        """
587        class dragon_commands:
588            """
589            Functor to spit out approrpiate dragon commands for nodes listed in
590            the connectivity description.  The constructor makes a dict mapping
591            dragon nodes to their parameters and the __call__ checks each
592            element in turn for membership.
593            """
594            def __init__(self, map):
595                self.node_info = map
596
597            def __call__(self, e):
598                s = ""
599                if isinstance(e, topdl.Computer):
600                    if self.node_info.has_key(e.name):
601                        info = self.node_info[e.name]
602                        for ifname, vlan, type in info:
603                            for i in e.interface:
604                                if i.name == ifname:
605                                    addr = i.get_attribute('ip4_address')
606                                    subs = i.substrate[0]
607                                    break
608                            else:
609                                raise service_error(service_error.internal,
610                                        "No interface %s on element %s" % \
611                                                (ifname, e.name))
612                            # XXX: do netmask right
613                            if type =='link':
614                                s = ("tb-allow-external ${%s} " + \
615                                        "dragonportal ip %s vlan %s " + \
616                                        "netmask 255.255.255.0\n") % \
617                                        (topdl.to_tcl_name(e.name), addr, vlan)
618                            elif type =='lan':
619                                s = ("tb-allow-external ${%s} " + \
620                                        "dragonportal " + \
621                                        "ip %s vlan %s usurp %s\n") % \
622                                        (topdl.to_tcl_name(e.name), addr, 
623                                                vlan, subs)
624                            else:
625                                raise service_error(service_error_internal,
626                                        "Unknown DRAGON type %s" % type)
627                return s
628
629        class not_dragon:
630            """
631            Return true if a node is in the given map of dragon nodes.
632            """
633            def __init__(self, map):
634                self.nodes = set(map.keys())
635
636            def __call__(self, e):
637                return e.name not in self.nodes
638
639        # Main line of generate_ns2
640        t = topo.clone()
641
642        # Create the map of nodes that need direct connections (dragon
643        # connections) from the connInfo
644        dragon_map = { }
645        for i in [ i for i in connInfo if i['type'] == 'transit']:
646            for a in i.get('fedAttr', []):
647                if a['attribute'] == 'vlan_id':
648                    vlan = a['value']
649                    break
650            else:
651                raise service_error(service_error.internal, "No vlan tag")
652            members = i.get('member', [])
653            if len(members) > 1: type = 'lan'
654            else: type = 'link'
655
656            try:
657                for m in members:
658                    if m['element'] in dragon_map:
659                        dragon_map[m['element']].append(( m['interface'], 
660                            vlan, type))
661                    else:
662                        dragon_map[m['element']] = [( m['interface'], 
663                            vlan, type),]
664            except KeyError:
665                raise service_error(service_error.req,
666                        "Missing connectivity info")
667
668        # Weed out the things we aren't going to instantiate: Segments, portal
669        # substrates, and portal interfaces.  (The copy in the for loop allows
670        # us to delete from e.elements in side the for loop).  While we're
671        # touching all the elements, we also adjust paths from the original
672        # testbed to local testbed paths and put the federation commands into
673        # the start commands
674        for e in [e for e in t.elements]:
675            if isinstance(e, topdl.Segment):
676                t.elements.remove(e)
677            if isinstance(e, topdl.Computer):
678                self.add_kit(e, self.federation_software)
679                if e.get_attribute('portal') and self.portal_startcommand:
680                    # Add local portal support software
681                    self.add_kit(e, self.portal_software)
682                    # Portals never have a user-specified start command
683                    e.set_attribute('startup', self.portal_startcommand)
684                elif self.node_startcommand:
685                    if e.get_attribute('startup'):
686                        e.set_attribute('startup', "%s \\$USER '%s'" % \
687                                (self.node_startcommand, 
688                                    e.get_attribute('startup')))
689                    else:
690                        e.set_attribute('startup', self.node_startcommand)
691
692                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
693                # Remove portal interfaces that do not connect to DRAGON
694                e.interface = [i for i in e.interface \
695                        if not i.get_attribute('portal') or i.name in dinf ]
696            # Fix software paths
697            for s in getattr(e, 'software', []):
698                s.location = re.sub("^.*/", softdir, s.location)
699
700        t.substrates = [ s.clone() for s in t.substrates ]
701        t.incorporate_elements()
702
703        # Customize the ns2 output for local portal commands and images
704        filters = []
705
706        if self.dragon_endpoint:
707            add_filter = not_dragon(dragon_map)
708            filters.append(dragon_commands(dragon_map))
709        else:
710            add_filter = None
711
712        if self.portal_command:
713            filters.append(topdl.generate_portal_command_filter(
714                self.portal_command, add_filter=add_filter))
715
716        if self.portal_image:
717            filters.append(topdl.generate_portal_image_filter(
718                self.portal_image))
719
720        if self.portal_type:
721            filters.append(topdl.generate_portal_hardware_filter(
722                self.portal_type))
723
724        # Convert to ns and write it out
725        expfile = topdl.topology_to_ns2(t, filters)
726        try:
727            f = open(expfn, "w")
728            print >>f, expfile
729            f.close()
730        except EnvironmentError:
731            raise service_error(service_error.internal,
732                    "Cannot write experiment file %s: %s" % (expfn,e))
733
734    def export_store_info(self, cf, proj, ename, connInfo):
735        """
736        For the export requests in the connection info, install the peer names
737        at the experiment controller via SetValue calls.
738        """
739
740        for c in connInfo:
741            for p in [ p for p in c.get('parameter', []) \
742                    if p.get('type', '') == 'output']:
743
744                if p.get('name', '') == 'peer':
745                    k = p.get('key', None)
746                    surl = p.get('store', None)
747                    if surl and k and k.index('/') != -1:
748                        value = "%s.%s.%s%s" % \
749                                (k[k.index('/')+1:], ename, proj, self.domain)
750                        req = { 'name': k, 'value': value }
751                        self.log.debug("Setting %s to %s on %s" % \
752                                (k, value, surl))
753                        self.call_SetValue(surl, req, cf)
754                    else:
755                        self.log.error("Bad export request: %s" % p)
756                elif p.get('name', '') == 'ssh_port':
757                    k = p.get('key', None)
758                    surl = p.get('store', None)
759                    if surl and k:
760                        req = { 'name': k, 'value': self.ssh_port }
761                        self.log.debug("Setting %s to %s on %s" % \
762                                (k, self.ssh_port, surl))
763                        self.call_SetValue(surl, req, cf)
764                    else:
765                        self.log.error("Bad export request: %s" % p)
766                else:
767                    self.log.error("Unknown export parameter: %s" % \
768                            p.get('name'))
769                    continue
770
771    def add_seer_node(self, topo, name, startup):
772        """
773        Add a seer node to the given topology, with the startup command passed
774        in.  Used by configure seer_services.
775        """
776        c_node = topdl.Computer(
777                name=name, 
778                os= topdl.OperatingSystem(
779                    attribute=[
780                    { 'attribute': 'osid', 
781                        'value': self.local_seer_image },
782                    ]),
783                attribute=[
784                    { 'attribute': 'startup', 'value': startup },
785                    ]
786                )
787        self.add_kit(c_node, self.local_seer_software)
788        topo.elements.append(c_node)
789
790    def configure_seer_services(self, services, topo, softdir):
791        """
792        Make changes to the topology required for the seer requests being made.
793        Specifically, add any control or master nodes required and set up the
794        start commands on the nodes to interconnect them.
795        """
796        local_seer = False      # True if we need to add a control node
797        collect_seer = False    # True if there is a seer-master node
798        seer_master= False      # True if we need to add the seer-master
799        for s in services:
800            s_name = s.get('name', '')
801            s_vis = s.get('visibility','')
802
803            if s_name  == 'local_seer_control' and s_vis == 'export':
804                local_seer = True
805            elif s_name == 'seer_master':
806                if s_vis == 'import':
807                    collect_seer = True
808                elif s_vis == 'export':
809                    seer_master = True
810       
811        # We've got the whole picture now, so add nodes if needed and configure
812        # them to interconnect properly.
813        if local_seer or seer_master:
814            # Copy local seer control node software to the tempdir
815            for l, f in self.local_seer_software:
816                base = os.path.basename(f)
817                copy_file(f, "%s/%s" % (softdir, base))
818        # If we're collecting seers somewhere the controllers need to talk to
819        # the master.  In testbeds that export the service, that will be a
820        # local node that we'll add below.  Elsewhere it will be the control
821        # portal that will port forward to the exporting master.
822        if local_seer:
823            if collect_seer:
824                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
825            else:
826                startup = self.local_seer_start
827            self.add_seer_node(topo, 'control', startup)
828        # If this is the seer master, add that node, too.
829        if seer_master:
830            self.add_seer_node(topo, 'seer-master', 
831                    "%s -R -n -R seer-master -R -A -R sink" % \
832                            self.seer_master_start)
833
834    def retrieve_software(self, topo, certfile, softdir):
835        """
836        Collect the software that nodes in the topology need loaded and stage
837        it locally.  This implies retrieving it from the experiment_controller
838        and placing it into softdir.  Certfile is used to prove that this node
839        has access to that data (it's the allocation/segment fedid).  Finally
840        local portal and federation software is also copied to the same staging
841        directory for simplicity - all software needed for experiment creation
842        is in softdir.
843        """
844        sw = set()
845        for e in topo.elements:
846            for s in getattr(e, 'software', []):
847                sw.add(s.location)
848        for s in sw:
849            self.log.debug("Retrieving %s" % s)
850            try:
851                get_url(s, certfile, softdir)
852            except:
853                t, v, st = sys.exc_info()
854                raise service_error(service_error.internal,
855                        "Error retrieving %s: %s" % (s, v))
856
857        # Copy local federation and portal node software to the tempdir
858        for s in (self.federation_software, self.portal_software):
859            for l, f in s:
860                base = os.path.basename(f)
861                copy_file(f, "%s/%s" % (softdir, base))
862
863
864    def initialize_experiment_info(self, attrs, aid, certfile, tmpdir):
865        """
866        Gather common configuration files, retrieve or create an experiment
867        name and project name, and return the ssh_key filenames.  Create an
868        allocation log bound to the state log variable as well.
869        """
870        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
871        ename = None
872        pubkey_base = None
873        secretkey_base = None
874        proj = None
875        user = None
876        alloc_log = None
877
878        for a in attrs:
879            if a['attribute'] in configs:
880                try:
881                    self.log.debug("Retrieving %s from %s" % \
882                            (a['attribute'], a['value']))
883                    get_url(a['value'], certfile, tmpdir)
884                except:
885                    t, v, st = sys.exc_info()
886                    raise service_error(service_error.internal,
887                            "Error retrieving %s: %s" % (a.get('value', ""), v))
888            if a['attribute'] == 'ssh_pubkey':
889                pubkey_base = a['value'].rpartition('/')[2]
890            if a['attribute'] == 'ssh_secretkey':
891                secretkey_base = a['value'].rpartition('/')[2]
892            if a['attribute'] == 'experiment_name':
893                ename = a['value']
894
895        if not ename:
896            ename = ""
897            for i in range(0,5):
898                ename += random.choice(string.ascii_letters)
899            self.log.warn("No experiment name: picked one randomly: %s" \
900                    % ename)
901
902        if not pubkey_base:
903            raise service_error(service_error.req, 
904                    "No public key attribute")
905
906        if not secretkey_base:
907            raise service_error(service_error.req, 
908                    "No secret key attribute")
909
910        self.state_lock.acquire()
911        if aid in self.allocation:
912            proj = self.allocation[aid].get('project', None)
913            if not proj: 
914                proj = self.allocation[aid].get('sproject', None)
915            user = self.allocation[aid].get('user', None)
916            self.allocation[aid]['experiment'] = ename
917            self.allocation[aid]['log'] = [ ]
918            # Create a logger that logs to the experiment's state object as
919            # well as to the main log file.
920            alloc_log = logging.getLogger('fedd.access.%s' % ename)
921            h = logging.StreamHandler(
922                    list_log.list_log(self.allocation[aid]['log']))
923            # XXX: there should be a global one of these rather than
924            # repeating the code.
925            h.setFormatter(logging.Formatter(
926                "%(asctime)s %(name)s %(message)s",
927                        '%d %b %y %H:%M:%S'))
928            alloc_log.addHandler(h)
929            self.write_state()
930        self.state_lock.release()
931
932        if not proj:
933            raise service_error(service_error.internal, 
934                    "Can't find project for %s" %aid)
935
936        if not user:
937            raise service_error(service_error.internal, 
938                    "Can't find creation user for %s" %aid)
939
940        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
941
942    def finalize_experiment(self, starter, topo, aid, alloc_id):
943        """
944        Store key bits of experiment state in the global repository, including
945        the response that may need to be replayed, and return the response.
946        """
947        # Copy the assigned names into the return topology
948        embedding = [ ]
949        for n in starter.node:
950            embedding.append({ 
951                'toponame': n,
952                'physname': ["%s%s" %  (starter.node[n], self.domain)],
953                })
954        # Grab the log (this is some anal locking, but better safe than
955        # sorry)
956        self.state_lock.acquire()
957        logv = "".join(self.allocation[aid]['log'])
958        # It's possible that the StartSegment call gets retried (!).
959        # if the 'started' key is in the allocation, we'll return it rather
960        # than redo the setup.
961        self.allocation[aid]['started'] = { 
962                'allocID': alloc_id,
963                'allocationLog': logv,
964                'segmentdescription': { 
965                    'topdldescription': topo.clone().to_dict()
966                    },
967                'embedding': embedding
968                }
969        retval = copy.copy(self.allocation[aid]['started'])
970        self.write_state()
971        self.state_lock.release()
972        return retval
973   
974    # End of StartSegment support routines
975
976    def StartSegment(self, req, fid):
977        err = None  # Any service_error generated after tmpdir is created
978        rv = None   # Return value from segment creation
979
980        try:
981            req = req['StartSegmentRequestBody']
982            auth_attr = req['allocID']['fedid']
983            topref = req['segmentdescription']['topdldescription']
984        except KeyError:
985            raise service_error(server_error.req, "Badly formed request")
986
987        connInfo = req.get('connection', [])
988        services = req.get('service', [])
989        aid = "%s" % auth_attr
990        attrs = req.get('fedAttr', [])
991        if not self.auth.check_attribute(fid, auth_attr):
992            raise service_error(service_error.access, "Access denied")
993        else:
994            # See if this is a replay of an earlier succeeded StartSegment -
995            # sometimes SSL kills 'em.  If so, replay the response rather than
996            # redoing the allocation.
997            self.state_lock.acquire()
998            retval = self.allocation[aid].get('started', None)
999            self.state_lock.release()
1000            if retval:
1001                self.log.warning("Duplicate StartSegment for %s: " % aid + \
1002                        "replaying response")
1003                return retval
1004
1005        # A new request.  Do it.
1006
1007        if topref: topo = topdl.Topology(**topref)
1008        else:
1009            raise service_error(service_error.req, 
1010                    "Request missing segmentdescription'")
1011       
1012        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1013        try:
1014            tmpdir = tempfile.mkdtemp(prefix="access-")
1015            softdir = "%s/software" % tmpdir
1016            os.mkdir(softdir)
1017        except EnvironmentError:
1018            raise service_error(service_error.internal, "Cannot create tmp dir")
1019
1020        # Try block alllows us to clean up temporary files.
1021        try:
1022            self.retrieve_software(topo, certfile, softdir)
1023            ename, proj, user, pubkey_base, secretkey_base, alloc_log = \
1024                    self.initialize_experiment_info(attrs, aid, 
1025                            certfile, tmpdir)
1026
1027            # Set up userconf and seer if needed
1028            self.configure_userconf(services, tmpdir)
1029            self.configure_seer_services(services, topo, softdir)
1030            # Get and send synch store variables
1031            self.export_store_info(certfile, proj, ename, connInfo)
1032            self.import_store_info(certfile, connInfo)
1033
1034            expfile = "%s/experiment.tcl" % tmpdir
1035
1036            self.generate_portal_configs(topo, pubkey_base, 
1037                    secretkey_base, tmpdir, proj, ename, connInfo, services)
1038            self.generate_ns2(topo, expfile, 
1039                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
1040
1041            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1042                    debug=self.create_debug, log=alloc_log, boss=self.boss,
1043                    cert=self.xmlrpc_cert)
1044            rv = starter(self, ename, proj, user, expfile, tmpdir)
1045        except service_error, e:
1046            err = e
1047        except:
1048            t, v, st = sys.exc_info()
1049            err = service_error(service_error.internal, "%s: %s" % \
1050                    (v, traceback.extract_tb(st)))
1051
1052        # Walk up tmpdir, deleting as we go
1053        if self.cleanup: self.remove_dirs(tmpdir)
1054        else: self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1055
1056        if rv:
1057            return self.finalize_experiment(starter, topo, aid, req['allocID'])
1058        elif err:
1059            raise service_error(service_error.federant,
1060                    "Swapin failed: %s" % err)
1061        else:
1062            raise service_error(service_error.federant, "Swapin failed")
1063
1064    def TerminateSegment(self, req, fid):
1065        try:
1066            req = req['TerminateSegmentRequestBody']
1067        except KeyError:
1068            raise service_error(server_error.req, "Badly formed request")
1069
1070        auth_attr = req['allocID']['fedid']
1071        aid = "%s" % auth_attr
1072        attrs = req.get('fedAttr', [])
1073        if not self.auth.check_attribute(fid, auth_attr):
1074            raise service_error(service_error.access, "Access denied")
1075
1076        self.state_lock.acquire()
1077        if aid in self.allocation:
1078            proj = self.allocation[aid].get('project', None)
1079            if not proj: 
1080                proj = self.allocation[aid].get('sproject', None)
1081            user = self.allocation[aid].get('user', None)
1082            ename = self.allocation[aid].get('experiment', None)
1083        else:
1084            proj = None
1085            user = None
1086            ename = None
1087        self.state_lock.release()
1088
1089        if not proj:
1090            raise service_error(service_error.internal, 
1091                    "Can't find project for %s" % aid)
1092
1093        if not user:
1094            raise service_error(service_error.internal, 
1095                    "Can't find creation user for %s" % aid)
1096        if not ename:
1097            raise service_error(service_error.internal, 
1098                    "Can't find experiment name for %s" % aid)
1099        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1100                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
1101        stopper(self, user, proj, ename)
1102        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.