source: fedd/federation/emulab_access.py @ 8cf2b90e

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 8cf2b90e was 8cf2b90e, checked in by Ted Faber <faber@…>, 14 years ago

more cleanup

  • Property mode set to 100644
File size: 34.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from access import access_base
17
18from util import *
19from allocate_project import allocate_project_local, allocate_project_remote
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30import topdl
31import list_log
32import proxy_emulab_segment
33import local_emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    proxy_RequestAccess= service_caller('RequestAccess')
52    proxy_ReleaseAccess= service_caller('ReleaseAccess')
53
54    def __init__(self, config=None, auth=None):
55        """
56        Initializer.  Pulls parameters out of the ConfigParser's access section.
57        """
58
59        access_base.__init__(self, config, auth)
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.boss = config.get("access", "boss")
64        self.ops = config.get("access", "ops")
65        self.domain = config.get("access", "domain")
66        self.fileserver = config.get("access", "fileserver")
67        self.eventserver = config.get("access", "eventserver")
68        self.userconfdir = config.get("access","userconfdir")
69        self.userconfcmd = config.get("access","userconfcmd")
70        self.userconfurl = config.get("access","userconfurl")
71        self.federation_software = config.get("access", "federation_software")
72        self.portal_software = config.get("access", "portal_software")
73        self.local_seer_software = config.get("access", "local_seer_software")
74        self.local_seer_image = config.get("access", "local_seer_image")
75        self.local_seer_start = config.get("access", "local_seer_start")
76        self.seer_master_start = config.get("access", "seer_master_start")
77        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
78        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
79        self.ssh_port = config.get("access","ssh_port") or "22"
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        if self.access_type == 'remote_emulab':
98            self.start_segment = proxy_emulab_segment.start_segment
99            self.stop_segment = proxy_emulab_segment.stop_segment
100        elif self.access_type == 'local_emulab':
101            self.start_segment = local_emulab_segment.start_segment
102            self.stop_segment = local_emulab_segment.stop_segment
103        else:
104            self.start_segment = None
105            self.stop_segment = None
106
107        self.restricted = [ ]
108        self.projects = { }
109        self.keys = { }
110        self.types = { }
111        self.allocation = { }
112        self.state = { 
113            'projects': self.projects,
114            'allocation' : self.allocation,
115            'keys' : self.keys,
116            'types': self.types
117        }
118        self.access = { }
119        if config.has_option("access", "accessdb"):
120            self.read_access(config.get("access", "accessdb"))
121
122        if not self.local_seer_image or not self.local_seer_software:
123            self.exports.discard('local_seer_control')
124            self.exports.discard('seer_master')
125
126        if not self.local_seer_start:
127            self.exports.discard('local_seer_control')
128
129        if not self.seer_master_start:
130            self.exports.discard('seer_master')
131
132        tb = config.get('access', 'testbed')
133        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
134        else: self.testbed = [ ]
135
136        if config.has_option("access", "accessdb"):
137            self.read_access(config.get("access", "accessdb"), 
138                    self.make_access_project)
139
140        # read state in the base_class
141        self.state_lock.acquire()
142        self.allocation = self.state['allocation']
143        self.projects = self.state['projects']
144        self.keys = self.state['keys']
145        self.types = self.state['types']
146        # Add the ownership attributes to the authorizer.  Note that the
147        # indices of the allocation dict are strings, but the attributes are
148        # fedids, so there is a conversion.
149        for k in self.allocation.keys():
150            for o in self.allocation[k].get('owners', []):
151                self.auth.set_attribute(o, fedid(hexstr=k))
152            if self.allocation[k].has_key('userconfig'):
153                sfid = self.allocation[k]['userconfig']
154                fid = fedid(hexstr=sfid)
155                self.auth.set_attribute(fid, "/%s" % sfid)
156        self.state_lock.release()
157
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment",
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176        self.call_SetValue = service_caller('SetValue')
177        self.call_GetValue = service_caller('GetValue', log=self.log)
178
179        if not config.has_option("allocate", "uri"):
180            self.allocate_project = \
181                allocate_project_local(config, auth)
182        else:
183            self.allocate_project = \
184                allocate_project_remote(config, auth)
185
186
187        # If the project allocator exports services, put them in this object's
188        # maps so that classes that instantiate this can call the services.
189        self.soap_services.update(self.allocate_project.soap_services)
190        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
191
192    @staticmethod
193    def make_access_project(str):
194        def parse_name(n):
195            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
196            else: return n
197
198        str = str.strip()
199        if str.startswith('(') and str.endswith(')'):
200            str = str[1:-1]
201            names = [ s.strip() for s in str.split(",")]
202            if len(names) > 3:
203                raise self.parse_error("More than three fields in name")
204            first = names[0].split(":")
205            if first == 'fedid:':
206                del first[0]
207                first[0] = fedid(hexstr=first[0])
208            names[0] = access_project(first[0], first[1:])
209
210            for i in range(1,2):
211                names[i] = parse_name(names[i])
212
213            return tuple(names)
214        else:
215            raise self.parse_error('Bad mapping (unbalanced parens)')
216
217
218    def lookup_access(self, req, fid):
219        """
220        Determine the allowed access for this request.  Return the access and
221        which fields are dynamic.
222
223        The fedid is needed to construct the request
224        """
225        # Return values
226        rp = access_project(None, ())
227        ru = None
228        # This maps a valid user to the Emulab projects and users to use
229        found, match = self.lookup_access_base(req, fid)
230        tb, project, user = match
231       
232        if found == None:
233            raise service_error(service_error.access,
234                    "Access denied - cannot map access")
235
236        # resolve <dynamic> and <same> in found
237        dyn_proj = False
238        dyn_create_user = False
239        dyn_service_user = False
240
241        if found[0].name == "<same>":
242            if project != None:
243                rp.name = project
244            else : 
245                raise service_error(\
246                        service_error.server_config,
247                        "Project matched <same> when no project given")
248        elif found[0].name == "<dynamic>":
249            rp.name = None
250            dyn_proj = True
251        else:
252            rp.name = found[0].name
253        rp.node_types = found[0].node_types;
254
255        if found[1] == "<same>":
256            if user_match == "<any>":
257                if user != None: rcu = user[0]
258                else: raise service_error(\
259                        service_error.server_config,
260                        "Matched <same> on anonymous request")
261            else:
262                rcu = user_match
263        elif found[1] == "<dynamic>":
264            rcu = None
265            dyn_create_user = True
266        else:
267            rcu = found[1]
268       
269        if found[2] == "<same>":
270            if user_match == "<any>":
271                if user != None: rsu = user[0]
272                else: raise service_error(\
273                        service_error.server_config,
274                        "Matched <same> on anonymous request")
275            else:
276                rsu = user_match
277        elif found[2] == "<dynamic>":
278            rsu = None
279            dyn_service_user = True
280        else:
281            rsu = found[2]
282
283        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
284                [ fid ]
285
286    def do_project_allocation(self, dyn, project, user):
287        """
288        Call the project allocation routines and return the info.
289        """
290        if dyn: 
291            # Compose the dynamic project request
292            # (only dynamic, dynamic currently allowed)
293            preq = { 'AllocateProjectRequestBody': \
294                        { 'project' : {\
295                            'user': [ \
296                            { \
297                                'access': [ { 
298                                    'sshPubkey': self.ssh_pubkey_file } ],
299                                 'role': "serviceAccess",\
300                            }, \
301                            { \
302                                'access': [ { 
303                                    'sshPubkey': self.ssh_pubkey_file } ],
304                                 'role': "experimentCreation",\
305                            }, \
306                            ], \
307                            }\
308                        }\
309                    }
310            return self.allocate_project.dynamic_project(preq)
311        else:
312            preq = {'StaticProjectRequestBody' : \
313                    { 'project': \
314                        { 'name' : { 'localname' : project },\
315                          'user' : [ \
316                            {\
317                                'userID': { 'localname' : user }, \
318                                'access': [ { 
319                                    'sshPubkey': self.ssh_pubkey_file } ],
320                                'role': 'experimentCreation'\
321                            },\
322                            {\
323                                'userID': { 'localname' : user}, \
324                                'access': [ { 
325                                    'sshPubkey': self.ssh_pubkey_file } ],
326                                'role': 'serviceAccess'\
327                            },\
328                        ]}\
329                    }\
330            }
331            return self.allocate_project.static_project(preq)
332
333    def save_project_state(self, aid, ap, dyn, owners):
334        """
335        Parse out and save the information relevant to the project created for
336        this experiment.  That info is largely in ap and owners.  dyn indicates
337        that the project was created dynamically.  Return the user and project
338        names.
339        """
340        self.state_lock.acquire()
341        self.allocation[aid] = { }
342        try:
343            pname = ap['project']['name']['localname']
344        except KeyError:
345            pname = None
346
347        if dyn:
348            if not pname:
349                self.state_lock.release()
350                raise service_error(service_error.internal,
351                        "Misformed allocation response?")
352            if pname in self.projects: self.projects[pname] += 1
353            else: self.projects[pname] = 1
354            self.allocation[aid]['project'] = pname
355        else:
356            # sproject is a static project associated with this allocation.
357            self.allocation[aid]['sproject'] = pname
358
359        self.allocation[aid]['keys'] = [ ]
360
361        try:
362            for u in ap['project']['user']:
363                uname = u['userID']['localname']
364                if u['role'] == 'experimentCreation':
365                    self.allocation[aid]['user'] = uname
366                for k in [ k['sshPubkey'] for k in u['access'] \
367                        if k.has_key('sshPubkey') ]:
368                    kv = "%s:%s" % (uname, k)
369                    if self.keys.has_key(kv): self.keys[kv] += 1
370                    else: self.keys[kv] = 1
371                    self.allocation[aid]['keys'].append((uname, k))
372        except KeyError:
373            self.state_lock.release()
374            raise service_error(service_error.internal,
375                    "Misformed allocation response?")
376
377        self.allocation[aid]['owners'] = owners
378        self.write_state()
379        self.state_lock.release()
380        return (pname, uname)
381
382    def RequestAccess(self, req, fid):
383        """
384        Handle the access request.  Proxy if not for us.
385
386        Parse out the fields and make the allocations or rejections if for us,
387        otherwise, assuming we're willing to proxy, proxy the request out.
388        """
389
390        def gateway_hardware(h):
391            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
392            else: return h
393
394        def get_export_project(svcs):
395            """
396            if the service requests includes one to export a project, return
397            that project.
398            """
399            rv = None
400            for s in svcs:
401                if s.get('name', '') == 'project_export' and \
402                        s.get('visibility', '') == 'export':
403                    if not rv: 
404                        for a in s.get('feddAttr', []):
405                            if a.get('attribute', '') == 'project' \
406                                    and 'value' in a:
407                                rv = a['value']
408                    else:
409                        raise service_error(service_error, access, 
410                                'Requesting multiple project exports is ' + \
411                                        'not supported');
412            return rv
413
414        # The dance to get into the request body
415        if req.has_key('RequestAccessRequestBody'):
416            req = req['RequestAccessRequestBody']
417        else:
418            raise service_error(service_error.req, "No request!?")
419
420
421        found, dyn, owners = self.lookup_access(req, fid)
422        ap = None
423
424        # if this includes a project export request and the exported
425        # project is not the access project, access denied.
426        if 'service' in req:
427            ep = get_export_project(req['service'])
428            if ep and ep != found[0].name:
429                raise service_error(service_error.access,
430                        "Cannot export %s" % ep)
431
432        if self.ssh_pubkey_file:
433            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
434        else:
435            raise service_error(service_error.internal, 
436                    "SSH access parameters required")
437        # keep track of what's been added
438        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
439        aid = unicode(allocID)
440
441        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
442
443        services, svc_state = self.export_services(req.get('service',[]),
444                pname, uname)
445        self.state_lock.acquire()
446        # Store services state in global state
447        for k, v in svc_state.items():
448            self.allocation[aid][k] = v
449        self.write_state()
450        self.state_lock.release()
451        # Give the owners the right to change this allocation
452        for o in owners:
453            self.auth.set_attribute(o, allocID)
454        try:
455            f = open("%s/%s.pem" % (self.certdir, aid), "w")
456            print >>f, alloc_cert
457            f.close()
458        except EnvironmentError, e:
459            raise service_error(service_error.internal, 
460                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
461        resp = self.build_access_response({ 'fedid': allocID } ,
462                ap, services)
463        return resp
464
465    def do_release_project(self, del_project, del_users, del_types):
466        """
467        If a project and users has to be deleted, make the call.
468        """
469        msg = { 'project': { }}
470        if del_project:
471            msg['project']['name']= {'localname': del_project}
472        users = [ ]
473        for u in del_users.keys():
474            users.append({ 'userID': { 'localname': u },\
475                'access' :  \
476                        [ {'sshPubkey' : s } for s in del_users[u]]\
477            })
478        if users: 
479            msg['project']['user'] = users
480        if len(del_types) > 0:
481            msg['resources'] = { 'node': \
482                    [ {'hardware': [ h ] } for h in del_types ]\
483                }
484        if self.allocate_project.release_project:
485            msg = { 'ReleaseProjectRequestBody' : msg}
486            self.allocate_project.release_project(msg)
487
488    def ReleaseAccess(self, req, fid):
489        # The dance to get into the request body
490        if req.has_key('ReleaseAccessRequestBody'):
491            req = req['ReleaseAccessRequestBody']
492        else:
493            raise service_error(service_error.req, "No request!?")
494
495        try:
496            if req['allocID'].has_key('localname'):
497                auth_attr = aid = req['allocID']['localname']
498            elif req['allocID'].has_key('fedid'):
499                aid = unicode(req['allocID']['fedid'])
500                auth_attr = req['allocID']['fedid']
501            else:
502                raise service_error(service_error.req,
503                        "Only localnames and fedids are understood")
504        except KeyError:
505            raise service_error(service_error.req, "Badly formed request")
506
507        self.log.debug("[access] deallocation requested for %s", aid)
508        if not self.auth.check_attribute(fid, auth_attr):
509            self.log.debug("[access] deallocation denied for %s", aid)
510            raise service_error(service_error.access, "Access Denied")
511
512        # If we know this allocation, reduce the reference counts and
513        # remove the local allocations.  Otherwise report an error.  If
514        # there is an allocation to delete, del_users will be a dictonary
515        # of sets where the key is the user that owns the keys in the set.
516        # We use a set to avoid duplicates.  del_project is just the name
517        # of any dynamic project to delete.  We're somewhat lazy about
518        # deleting authorization attributes.  Having access to something
519        # that doesn't exist isn't harmful.
520        del_users = { }
521        del_project = None
522        del_types = set()
523
524        self.state_lock.acquire()
525        if aid in self.allocation:
526            self.log.debug("Found allocation for %s" %aid)
527            for k in self.allocation[aid]['keys']:
528                kk = "%s:%s" % k
529                self.keys[kk] -= 1
530                if self.keys[kk] == 0:
531                    if not del_users.has_key(k[0]):
532                        del_users[k[0]] = set()
533                    del_users[k[0]].add(k[1])
534                    del self.keys[kk]
535
536            if 'project' in self.allocation[aid]:
537                pname = self.allocation[aid]['project']
538                self.projects[pname] -= 1
539                if self.projects[pname] == 0:
540                    del_project = pname
541                    del self.projects[pname]
542
543            if 'types' in self.allocation[aid]:
544                for t in self.allocation[aid]['types']:
545                    self.types[t] -= 1
546                    if self.types[t] == 0:
547                        if not del_project: del_project = t[0]
548                        del_types.add(t[1])
549                        del self.types[t]
550
551            del self.allocation[aid]
552            self.write_state()
553            self.state_lock.release()
554            # If we actually have resources to deallocate, prepare the call.
555            if del_project or del_users:
556                self.do_release_project(del_project, del_users, del_types)
557            # And remove the access cert
558            cf = "%s/%s.pem" % (self.certdir, aid)
559            self.log.debug("Removing %s" % cf)
560            os.remove(cf)
561            return { 'allocID': req['allocID'] } 
562        else:
563            self.state_lock.release()
564            raise service_error(service_error.req, "No such allocation")
565
566
567    def generate_ns2(self, topo, expfn, softdir, connInfo):
568        class dragon_commands:
569            """
570            Functor to spit out approrpiate dragon commands for nodes listed in
571            the connectivity description.  The constructor makes a dict mapping
572            dragon nodes to their parameters and the __call__ checks each
573            element in turn for membership.
574            """
575            def __init__(self, map):
576                self.node_info = map
577
578            def __call__(self, e):
579                s = ""
580                if isinstance(e, topdl.Computer):
581                    if self.node_info.has_key(e.name):
582                        info = self.node_info[e.name]
583                        for ifname, vlan, type in info:
584                            for i in e.interface:
585                                if i.name == ifname:
586                                    addr = i.get_attribute('ip4_address')
587                                    subs = i.substrate[0]
588                                    break
589                            else:
590                                raise service_error(service_error.internal,
591                                        "No interface %s on element %s" % \
592                                                (ifname, e.name))
593                            # XXX: do netmask right
594                            if type =='link':
595                                s = ("tb-allow-external ${%s} dragonportal " + \
596                                        "ip %s vlan %s netmask 255.255.255.0\n") % \
597                                        (e.name, addr, vlan)
598                            elif type =='lan':
599                                s = ("tb-allow-external ${%s} dragonportal " + \
600                                        "ip %s vlan %s usurp %s\n") % \
601                                        (e.name, addr, vlan, subs)
602                            else:
603                                raise service_error(service_error_internal,
604                                        "Unknown DRAGON type %s" % type)
605                return s
606
607        class not_dragon:
608            def __init__(self, map):
609                self.nodes = set(map.keys())
610
611            def __call__(self, e):
612                return e.name not in self.nodes
613
614
615        t = topo.clone()
616
617        dragon_map = { }
618        for i in [ i for i in connInfo if i['type'] == 'transit']:
619            for a in i.get('fedAttr', []):
620                if a['attribute'] == 'vlan_id':
621                    vlan = a['value']
622                    break
623            else:
624                raise service_error(service_error.internal, "No vlan tag")
625            members = i.get('member', [])
626            if len(members) > 1: type = 'lan'
627            else: type = 'link'
628
629            try:
630                for m in members:
631                    if m['element'] in dragon_map:
632                        dragon_map[m['element']].append(( m['interface'], 
633                            vlan, type))
634                    else:
635                        dragon_map[m['element']] = [( m['interface'], 
636                            vlan, type),]
637            except KeyError:
638                raise service_error(service_error.req,
639                        "Missing connectivity info")
640
641        # Weed out the things we aren't going to instantiate: Segments, portal
642        # substrates, and portal interfaces.  (The copy in the for loop allows
643        # us to delete from e.elements in side the for loop).  While we're
644        # touching all the elements, we also adjust paths from the original
645        # testbed to local testbed paths and put the federation commands into
646        # the start commands
647        for e in [e for e in t.elements]:
648            if isinstance(e, topdl.Segment):
649                t.elements.remove(e)
650            if isinstance(e, topdl.Computer):
651                self.add_kit(e, self.federation_software)
652                if e.get_attribute('portal') and self.portal_startcommand:
653                    # Add local portal support software
654                    self.add_kit(e, self.portal_software)
655                    # Portals never have a user-specified start command
656                    e.set_attribute('startup', self.portal_startcommand)
657                elif self.node_startcommand:
658                    if e.get_attribute('startup'):
659                        e.set_attribute('startup', "%s \\$USER '%s'" % \
660                                (self.node_startcommand, 
661                                    e.get_attribute('startup')))
662                    else:
663                        e.set_attribute('startup', self.node_startcommand)
664
665                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
666                # Remove portal interfaces that do not connect to DRAGON
667                e.interface = [i for i in e.interface \
668                        if not i.get_attribute('portal') or i.name in dinf ]
669            # Fix software paths
670            for s in getattr(e, 'software', []):
671                s.location = re.sub("^.*/", softdir, s.location)
672
673        t.substrates = [ s.clone() for s in t.substrates ]
674        t.incorporate_elements()
675
676        # Customize the ns2 output for local portal commands and images
677        filters = []
678
679        if self.dragon_endpoint:
680            add_filter = not_dragon(dragon_map)
681            filters.append(dragon_commands(dragon_map))
682        else:
683            add_filter = None
684
685        if self.portal_command:
686            filters.append(topdl.generate_portal_command_filter(
687                self.portal_command, add_filter=add_filter))
688
689        if self.portal_image:
690            filters.append(topdl.generate_portal_image_filter(
691                self.portal_image))
692
693        if self.portal_type:
694            filters.append(topdl.generate_portal_hardware_filter(
695                self.portal_type))
696
697        # Convert to ns and write it out
698        expfile = topdl.topology_to_ns2(t, filters)
699        try:
700            f = open(expfn, "w")
701            print >>f, expfile
702            f.close()
703        except EnvironmentError:
704            raise service_error(service_error.internal,
705                    "Cannot write experiment file %s: %s" % (expfn,e))
706
707    def configure_userconf(self, services):
708        """
709        If the userconf service was imported, collect the configuration data.
710        """
711        for s in services:
712            s_name = s.get('name', '')
713            s_vis = s.get('visibility','')
714            if s_name  == 'userconfig' and s_vis == 'import':
715                # Collect ther server and certificate info.
716                u = s.get('server', None)
717                for a in s.get('fedAttr', []):
718                    if a.get('attribute',"") == 'cert':
719                        cert = a.get('value', None)
720                        break
721                else:
722                    cert = None
723
724                if cert:
725                    # Make a temporary certificate file for get_url.  The
726                    # finally clause removes it whether something goes
727                    # wrong (including an exception from get_url) or not.
728                    try:
729                        tfos, tn = tempfile.mkstemp(suffix=".pem")
730                        tf = os.fdopen(tfos, 'w')
731                        print >>tf, cert
732                        tf.close()
733                        self.log.debug("Getting userconf info: %s" % u)
734                        get_url(u, tn, tmpdir, "userconf")
735                        self.log.debug("Got userconf info: %s" % u)
736                    except EnvironmentError, e:
737                        raise service_error(service.error.internal, 
738                                "Cannot create temp file for " + 
739                                "userconfig certificates: %s e")
740                    except:
741                        t, v, st = sys.exc_info()
742                        raise service_error(service_error.internal,
743                                "Error retrieving %s: %s" % (s, v))
744                    finally:
745                        if tn: os.remove(tn)
746                else:
747                    raise service_error(service_error.req,
748                            "No certificate for retreiving userconfig")
749                break
750
751    def add_seer_node(self, topo, name, startup):
752        """
753        Add a seer node to the given topology, with the startup command passed
754        in.
755        """
756        c_node = topdl.Computer(
757                name=name, 
758                os= topdl.OperatingSystem(
759                    attribute=[
760                    { 'attribute': 'osid', 
761                        'value': self.local_seer_image },
762                    ]),
763                attribute=[
764                    { 'attribute': 'startup', 'value': startup },
765                    ]
766                )
767        self.add_kit(c_node, self.local_seer_software)
768        topo.elements.append(c_node)
769
770    def configure_seer_services(self, services, topo, softdir):
771        """
772        Make changes to the topology required for the seer requests being made.
773        Specifically, add any control or master nodes required and set up the
774        start commands on the nodes to interconnect them.
775        """
776        local_seer = False      # True if we need to add a control node
777        collect_seer = False    # True if there is a seer-master node
778        seer_master= False      # True if we need to add the seer-master
779        for s in services:
780            s_name = s.get('name', '')
781            s_vis = s.get('visibility','')
782
783            if s_name  == 'local_seer_control' and s_vis == 'export':
784                local_seer = True
785            elif s_name == 'seer_master':
786                if s_vis == 'import':
787                    collect_seer = True
788                elif s_vis == 'export':
789                    seer_master = True
790       
791        # We've got the whole picture now, so add nodes if needed and configure
792        # them to interconnect properly.
793        if local_seer or seer_master:
794            # Copy local seer control node software to the tempdir
795            for l, f in self.local_seer_software:
796                base = os.path.basename(f)
797                copy_file(f, "%s/%s" % (softdir, base))
798        # If we're collecting seers somewhere the controllers need to talk to
799        # the master.  In testbeds that export the service, that will be a
800        # local node that we'll add below.  Elsewhere it will be the control
801        # portal that will port forward to the exporting master.
802        if local_seer:
803            if collect_seer:
804                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
805            else:
806                startup = self.local_seer_start
807            self.add_seer_node(topo, 'control', startup)
808        # If this is the seer master, add that node, too.
809        if seer_master:
810            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
811
812
813
814    def StartSegment(self, req, fid):
815
816        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
817
818        err = None  # Any service_error generated after tmpdir is created
819        rv = None   # Return value from segment creation
820
821        try:
822            req = req['StartSegmentRequestBody']
823        except KeyError:
824            raise service_error(server_error.req, "Badly formed request")
825
826        connInfo = req.get('connection', [])
827        services = req.get('service', [])
828        auth_attr = req['allocID']['fedid']
829        aid = "%s" % auth_attr
830        attrs = req.get('fedAttr', [])
831        if not self.auth.check_attribute(fid, auth_attr):
832            raise service_error(service_error.access, "Access denied")
833        else:
834            # See if this is a replay of an earlier succeeded StartSegment -
835            # sometimes SSL kills 'em.  If so, replay the response rather than
836            # redoing the allocation.
837            self.state_lock.acquire()
838            retval = self.allocation[aid].get('started', None)
839            self.state_lock.release()
840            if retval:
841                self.log.warning("Duplicate StartSegment for %s: " % aid + \
842                        "replaying response")
843                return retval
844
845        # A new request.  Do it.
846
847        if req.has_key('segmentdescription') and \
848                req['segmentdescription'].has_key('topdldescription'):
849            topo = \
850                topdl.Topology(**req['segmentdescription']['topdldescription'])
851        else:
852            raise service_error(service_error.req, 
853                    "Request missing segmentdescription'")
854       
855        master = req.get('master', False)
856
857        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
858        try:
859            tmpdir = tempfile.mkdtemp(prefix="access-")
860            softdir = "%s/software" % tmpdir
861            os.mkdir(softdir)
862        except EnvironmentError:
863            raise service_error(service_error.internal, "Cannot create tmp dir")
864
865        # Try block alllows us to clean up temporary files.
866        try:
867            sw = set()
868            for e in topo.elements:
869                for s in getattr(e, 'software', []):
870                    sw.add(s.location)
871            for s in sw:
872                self.log.debug("Retrieving %s" % s)
873                try:
874                    get_url(s, certfile, softdir)
875                except:
876                    t, v, st = sys.exc_info()
877                    raise service_error(service_error.internal,
878                            "Error retrieving %s: %s" % (s, v))
879
880            # Copy local federation and portal node software to the tempdir
881            for s in (self.federation_software, self.portal_software):
882                for l, f in s:
883                    base = os.path.basename(f)
884                    copy_file(f, "%s/%s" % (softdir, base))
885
886            ename = None
887            pubkey_base = None
888            secretkey_base = None
889            for a in attrs:
890                if a['attribute'] in configs:
891                    try:
892                        self.log.debug("Retrieving %s from %s" % \
893                                (a['attribute'], a['value']))
894                        get_url(a['value'], certfile, tmpdir)
895                    except:
896                        t, v, st = sys.exc_info()
897                        raise service_error(service_error.internal,
898                                "Error retrieving %s: %s" % (s, v))
899                if a['attribute'] == 'ssh_pubkey':
900                    pubkey_base = a['value'].rpartition('/')[2]
901                if a['attribute'] == 'ssh_secretkey':
902                    secretkey_base = a['value'].rpartition('/')[2]
903                if a['attribute'] == 'experiment_name':
904                    ename = a['value']
905
906            if not ename:
907                ename = ""
908                for i in range(0,5):
909                    ename += random.choice(string.ascii_letters)
910                self.log.warn("No experiment name: picked one randomly: %s" \
911                        % ename)
912
913            if not pubkey_base:
914                raise service_error(service_error.req, 
915                        "No public key attribute")
916
917            if not secretkey_base:
918                raise service_error(service_error.req, 
919                        "No secret key attribute")
920
921            # If the userconf service was imported, collect the configuration
922            # data.
923            self.configure_userconf(services)
924            self.configure_seer_services(services, topo, softdir)
925
926            proj = None
927            user = None
928            self.state_lock.acquire()
929            if self.allocation.has_key(aid):
930                proj = self.allocation[aid].get('project', None)
931                if not proj: 
932                    proj = self.allocation[aid].get('sproject', None)
933                user = self.allocation[aid].get('user', None)
934                self.allocation[aid]['experiment'] = ename
935                self.allocation[aid]['log'] = [ ]
936                # Create a logger that logs to the experiment's state object as
937                # well as to the main log file.
938                alloc_log = logging.getLogger('fedd.access.%s' % ename)
939                h = logging.StreamHandler(
940                        list_log.list_log(self.allocation[aid]['log']))
941                # XXX: there should be a global one of these rather than
942                # repeating the code.
943                h.setFormatter(logging.Formatter(
944                    "%(asctime)s %(name)s %(message)s",
945                            '%d %b %y %H:%M:%S'))
946                alloc_log.addHandler(h)
947                self.write_state()
948            self.state_lock.release()
949
950            if not proj:
951                raise service_error(service_error.internal, 
952                        "Can't find project for %s" %aid)
953
954            if not user:
955                raise service_error(service_error.internal, 
956                        "Can't find creation user for %s" %aid)
957
958            self.export_store_info(certfile, proj, ename, connInfo)
959            self.import_store_info(certfile, connInfo)
960
961            expfile = "%s/experiment.tcl" % tmpdir
962
963            self.generate_portal_configs(topo, pubkey_base, 
964                    secretkey_base, tmpdir, master, proj, ename, connInfo, 
965                    services)
966            self.generate_ns2(topo, expfile, 
967                    "/proj/%s/software/%s/" % (proj, ename), connInfo)
968
969            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
970                    debug=self.create_debug, log=alloc_log)
971            rv = starter(self, ename, proj, user, expfile, tmpdir)
972            rvtopo = topo.clone()
973
974            # Copy the assigned names into the return topology
975            embedding = [ ]
976            for n in starter.node:
977                embedding.append({ 
978                    'toponame': n,
979                    'physname': ["%s%s" %  (starter.node[n], self.domain)],
980                    })
981
982
983        except service_error, e:
984            err = e
985        except e:
986            err = service_error(service_error.internal, str(e))
987
988        # Walk up tmpdir, deleting as we go
989        if self.cleanup:
990            self.log.debug("[StartSegment]: removing %s" % tmpdir)
991            for path, dirs, files in os.walk(tmpdir, topdown=False):
992                for f in files:
993                    os.remove(os.path.join(path, f))
994                for d in dirs:
995                    os.rmdir(os.path.join(path, d))
996            os.rmdir(tmpdir)
997        else:
998            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
999
1000        if rv:
1001            # Grab the log (this is some anal locking, but better safe than
1002            # sorry)
1003            self.state_lock.acquire()
1004            logv = "".join(self.allocation[aid]['log'])
1005            # It's possible that the StartSegment call gets retried (!).
1006            # if the 'started' key is in the allocation, we'll return it rather
1007            # than redo the setup.
1008            self.allocation[aid]['started'] = { 
1009                    'allocID': req['allocID'],
1010                    'allocationLog': logv,
1011                    'segmentdescription': { 
1012                        'topdldescription': rvtopo.to_dict()
1013                        },
1014                    'embedding': embedding
1015                    }
1016            retval = copy.copy(self.allocation[aid]['started'])
1017            self.write_state()
1018            self.state_lock.release()
1019            return retval
1020        elif err:
1021            raise service_error(service_error.federant,
1022                    "Swapin failed: %s" % err)
1023        else:
1024            raise service_error(service_error.federant, "Swapin failed")
1025
1026    def TerminateSegment(self, req, fid):
1027        try:
1028            req = req['TerminateSegmentRequestBody']
1029        except KeyError:
1030            raise service_error(server_error.req, "Badly formed request")
1031
1032        auth_attr = req['allocID']['fedid']
1033        aid = "%s" % auth_attr
1034        attrs = req.get('fedAttr', [])
1035        if not self.auth.check_attribute(fid, auth_attr):
1036            raise service_error(service_error.access, "Access denied")
1037
1038        self.state_lock.acquire()
1039        if self.allocation.has_key(aid):
1040            proj = self.allocation[aid].get('project', None)
1041            if not proj: 
1042                proj = self.allocation[aid].get('sproject', None)
1043            user = self.allocation[aid].get('user', None)
1044            ename = self.allocation[aid].get('experiment', None)
1045        else:
1046            proj = None
1047            user = None
1048            ename = None
1049        self.state_lock.release()
1050
1051        if not proj:
1052            raise service_error(service_error.internal, 
1053                    "Can't find project for %s" % aid)
1054
1055        if not user:
1056            raise service_error(service_error.internal, 
1057                    "Can't find creation user for %s" % aid)
1058        if not ename:
1059            raise service_error(service_error.internal, 
1060                    "Can't find experiment name for %s" % aid)
1061        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1062                debug=self.create_debug)
1063        stopper(self, user, proj, ename)
1064        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.