source: fedd/federation/emulab_access.py @ f771e2f

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f771e2f was f771e2f, checked in by Ted Faber <faber@…>, 14 years ago

Beginning cleanup of access controllers/plugins for general release. Separating function out to a base class and cleaning up some of the code. Removing all the proxy code, and much of the resources stuff in the access requests.

  • Property mode set to 100644
File size: 35.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import stat # for chmod constants
5import re
6import random
7import string
8import copy
9import pickle
10import logging
11import subprocess
12
13from threading import *
14from M2Crypto.SSL import SSLError
15
16from access import access_base
17
18from util import *
19from allocate_project import allocate_project_local, allocate_project_remote
20from access_project import access_project
21from fedid import fedid, generate_fedid
22from authorizer import authorizer
23from service_error import service_error
24from remote_service import xmlrpc_handler, soap_handler, service_caller
25
26import httplib
27import tempfile
28from urlparse import urlparse
29
30import topdl
31import list_log
32import proxy_emulab_segment
33import local_emulab_segment
34
35
36# Make log messages disappear if noone configures a fedd logger
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.access")
41fl.addHandler(nullHandler())
42
43class access(access_base):
44    """
45    The implementation of access control based on mapping users to projects.
46
47    Users can be mapped to existing projects or have projects created
48    dynamically.  This implements both direct requests and proxies.
49    """
50
51    proxy_RequestAccess= service_caller('RequestAccess')
52    proxy_ReleaseAccess= service_caller('ReleaseAccess')
53
54    def __init__(self, config=None, auth=None):
55        """
56        Initializer.  Pulls parameters out of the ConfigParser's access section.
57        """
58
59        access_base.__init__(self, config, auth)
60
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.boss = config.get("access", "boss")
64        self.ops = config.get("access", "ops")
65        self.domain = config.get("access", "domain")
66        self.fileserver = config.get("access", "fileserver")
67        self.eventserver = config.get("access", "eventserver")
68        self.userconfdir = config.get("access","userconfdir")
69        self.userconfcmd = config.get("access","userconfcmd")
70        self.userconfurl = config.get("access","userconfurl")
71        self.federation_software = config.get("access", "federation_software")
72        self.portal_software = config.get("access", "portal_software")
73        self.local_seer_software = config.get("access", "local_seer_software")
74        self.local_seer_image = config.get("access", "local_seer_image")
75        self.local_seer_start = config.get("access", "local_seer_start")
76        self.seer_master_start = config.get("access", "seer_master_start")
77        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
78        self.ssh_pubkey_file = config.get("access","ssh_pubkey_file")
79        self.ssh_port = config.get("access","ssh_port") or "22"
80
81        self.dragon_endpoint = config.get("access", "dragon")
82        self.dragon_vlans = config.get("access", "dragon_vlans")
83        self.deter_internal = config.get("access", "deter_internal")
84
85        self.tunnel_config = config.getboolean("access", "tunnel_config")
86        self.portal_command = config.get("access", "portal_command")
87        self.portal_image = config.get("access", "portal_image")
88        self.portal_type = config.get("access", "portal_type") or "pc"
89        self.portal_startcommand = config.get("access", "portal_startcommand")
90        self.node_startcommand = config.get("access", "node_startcommand")
91
92        self.federation_software = self.software_list(self.federation_software)
93        self.portal_software = self.software_list(self.portal_software)
94        self.local_seer_software = self.software_list(self.local_seer_software)
95
96        self.access_type = self.access_type.lower()
97        if self.access_type == 'remote_emulab':
98            self.start_segment = proxy_emulab_segment.start_segment
99            self.stop_segment = proxy_emulab_segment.stop_segment
100        elif self.access_type == 'local_emulab':
101            self.start_segment = local_emulab_segment.start_segment
102            self.stop_segment = local_emulab_segment.stop_segment
103        else:
104            self.start_segment = None
105            self.stop_segment = None
106
107        self.restricted = [ ]
108        self.projects = { }
109        self.keys = { }
110        self.types = { }
111        self.allocation = { }
112        self.state = { 
113            'projects': self.projects,
114            'allocation' : self.allocation,
115            'keys' : self.keys,
116            'types': self.types
117        }
118        self.access = { }
119        if config.has_option("access", "accessdb"):
120            self.read_access(config.get("access", "accessdb"))
121
122        if not self.local_seer_image or not self.local_seer_software:
123            self.exports.discard('local_seer_control')
124            self.exports.discard('seer_master')
125
126        if not self.local_seer_start:
127            self.exports.discard('local_seer_control')
128
129        if not self.seer_master_start:
130            self.exports.discard('seer_master')
131
132        tb = config.get('access', 'testbed')
133        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
134        else: self.testbed = [ ]
135
136        if config.has_option("access", "accessdb"):
137            self.read_access(config.get("access", "accessdb"), 
138                    self.make_access_project)
139
140        # read state in the base_class
141        self.state_lock.acquire()
142        self.allocation = self.state['allocation']
143        self.projects = self.state['projects']
144        self.keys = self.state['keys']
145        self.types = self.state['types']
146        # Add the ownership attributes to the authorizer.  Note that the
147        # indices of the allocation dict are strings, but the attributes are
148        # fedids, so there is a conversion.
149        for k in self.allocation.keys():
150            for o in self.allocation[k].get('owners', []):
151                self.auth.set_attribute(o, fedid(hexstr=k))
152            if self.allocation[k].has_key('userconfig'):
153                sfid = self.allocation[k]['userconfig']
154                fid = fedid(hexstr=sfid)
155                self.auth.set_attribute(fid, "/%s" % sfid)
156        self.state_lock.release()
157
158
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment",
164                self.TerminateSegment),
165            }
166        self.xmlrpc_services =  {\
167            'RequestAccess': xmlrpc_handler('RequestAccess',
168                self.RequestAccess),
169            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
170                self.ReleaseAccess),
171            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
172            'TerminateSegment': xmlrpc_handler('TerminateSegment',
173                self.TerminateSegment),
174            }
175
176        self.call_SetValue = service_caller('SetValue')
177        self.call_GetValue = service_caller('GetValue', log=self.log)
178
179        if not config.has_option("allocate", "uri"):
180            self.allocate_project = \
181                allocate_project_local(config, auth)
182        else:
183            self.allocate_project = \
184                allocate_project_remote(config, auth)
185
186
187        # If the project allocator exports services, put them in this object's
188        # maps so that classes that instantiate this can call the services.
189        self.soap_services.update(self.allocate_project.soap_services)
190        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
191
192    @staticmethod
193    def make_access_project(str):
194        def parse_name(n):
195            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
196            else: return n
197
198        str = str.strip()
199        if str.startswith('(') and str.endswith(')'):
200            str = str[1:-1]
201            names = [ s.strip() for s in str.split(",")]
202            if len(names) > 3:
203                raise self.parse_error("More than three fields in name")
204            first = names[0].split(":")
205            if first == 'fedid:':
206                del first[0]
207                first[0] = fedid(hexstr=first[0])
208            names[0] = access_project(first[0], first[1:])
209
210            for i in range(1,2):
211                names[i] = parse_name(names[i])
212
213            return tuple(names)
214        else:
215            raise self.parse_error('Bad mapping (unbalanced parens)')
216
217
218    def lookup_access(self, req, fid):
219        """
220        Determine the allowed access for this request.  Return the access and
221        which fields are dynamic.
222
223        The fedid is needed to construct the request
224        """
225        # Return values
226        rp = access_project(None, ())
227        ru = None
228        # This maps a valid user to the Emulab projects and users to use
229        found, match = self.lookup_access_base(req, fid)
230        tb, project, user = match
231       
232        if found == None:
233            raise service_error(service_error.access,
234                    "Access denied - cannot map access")
235
236        # resolve <dynamic> and <same> in found
237        dyn_proj = False
238        dyn_create_user = False
239        dyn_service_user = False
240
241        if found[0].name == "<same>":
242            if project != None:
243                rp.name = project
244            else : 
245                raise service_error(\
246                        service_error.server_config,
247                        "Project matched <same> when no project given")
248        elif found[0].name == "<dynamic>":
249            rp.name = None
250            dyn_proj = True
251        else:
252            rp.name = found[0].name
253        rp.node_types = found[0].node_types;
254
255        if found[1] == "<same>":
256            if user_match == "<any>":
257                if user != None: rcu = user[0]
258                else: raise service_error(\
259                        service_error.server_config,
260                        "Matched <same> on anonymous request")
261            else:
262                rcu = user_match
263        elif found[1] == "<dynamic>":
264            rcu = None
265            dyn_create_user = True
266        else:
267            rcu = found[1]
268       
269        if found[2] == "<same>":
270            if user_match == "<any>":
271                if user != None: rsu = user[0]
272                else: raise service_error(\
273                        service_error.server_config,
274                        "Matched <same> on anonymous request")
275            else:
276                rsu = user_match
277        elif found[2] == "<dynamic>":
278            rsu = None
279            dyn_service_user = True
280        else:
281            rsu = found[2]
282
283        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
284                [ fid ]
285
286    def do_project_allocation(self, dyn, project, user):
287        """
288        Call the project allocation routines and return the info.
289        """
290        if dyn: 
291            # Compose the dynamic project request
292            # (only dynamic, dynamic currently allowed)
293            preq = { 'AllocateProjectRequestBody': \
294                        { 'project' : {\
295                            'user': [ \
296                            { \
297                                'access': [ { 
298                                    'sshPubkey': self.ssh_pubkey_file } ],
299                                 'role': "serviceAccess",\
300                            }, \
301                            { \
302                                'access': [ { 
303                                    'sshPubkey': self.ssh_pubkey_file } ],
304                                 'role': "experimentCreation",\
305                            }, \
306                            ], \
307                            }\
308                        }\
309                    }
310            return self.allocate_project.dynamic_project(preq)
311        else:
312            preq = {'StaticProjectRequestBody' : \
313                    { 'project': \
314                        { 'name' : { 'localname' : project },\
315                          'user' : [ \
316                            {\
317                                'userID': { 'localname' : user }, \
318                                'access': [ { 
319                                    'sshPubkey': self.ssh_pubkey_file } ],
320                                'role': 'experimentCreation'\
321                            },\
322                            {\
323                                'userID': { 'localname' : user}, \
324                                'access': [ { 
325                                    'sshPubkey': self.ssh_pubkey_file } ],
326                                'role': 'serviceAccess'\
327                            },\
328                        ]}\
329                    }\
330            }
331            return self.allocate_project.static_project(preq)
332
333    def save_project_state(self, aid, ap, dyn, owners):
334        """
335        Parse out and save the information relevant to the project created for
336        this experiment.  That info is largely in ap and owners.  dyn indicates
337        that the project was created dynamically.  Return the user and project
338        names.
339        """
340        self.state_lock.acquire()
341        self.allocation[aid] = { }
342        try:
343            pname = ap['project']['name']['localname']
344        except KeyError:
345            pname = None
346
347        if dyn:
348            if not pname:
349                self.state_lock.release()
350                raise service_error(service_error.internal,
351                        "Misformed allocation response?")
352            if pname in self.projects: self.projects[pname] += 1
353            else: self.projects[pname] = 1
354            self.allocation[aid]['project'] = pname
355        else:
356            # sproject is a static project associated with this allocation.
357            self.allocation[aid]['sproject'] = pname
358
359        self.allocation[aid]['keys'] = [ ]
360
361        try:
362            for u in ap['project']['user']:
363                uname = u['userID']['localname']
364                if u['role'] == 'experimentCreation':
365                    self.allocation[aid]['user'] = uname
366                for k in [ k['sshPubkey'] for k in u['access'] \
367                        if k.has_key('sshPubkey') ]:
368                    kv = "%s:%s" % (uname, k)
369                    if self.keys.has_key(kv): self.keys[kv] += 1
370                    else: self.keys[kv] = 1
371                    self.allocation[aid]['keys'].append((uname, k))
372        except KeyError:
373            self.state_lock.release()
374            raise service_error(service_error.internal,
375                    "Misformed allocation response?")
376
377        self.allocation[aid]['owners'] = owners
378        self.write_state()
379        self.state_lock.release()
380        return (pname, uname)
381
382    def RequestAccess(self, req, fid):
383        """
384        Handle the access request.  Proxy if not for us.
385
386        Parse out the fields and make the allocations or rejections if for us,
387        otherwise, assuming we're willing to proxy, proxy the request out.
388        """
389
390        def gateway_hardware(h):
391            if h == 'GWTYPE': return self.portal_type or 'GWTYPE'
392            else: return h
393
394        def get_export_project(svcs):
395            """
396            if the service requests includes one to export a project, return
397            that project.
398            """
399            rv = None
400            for s in svcs:
401                if s.get('name', '') == 'project_export' and \
402                        s.get('visibility', '') == 'export':
403                    if not rv: 
404                        for a in s.get('feddAttr', []):
405                            if a.get('attribute', '') == 'project' \
406                                    and 'value' in a:
407                                rv = a['value']
408                    else:
409                        raise service_error(service_error, access, 
410                                'Requesting multiple project exports is ' + \
411                                        'not supported');
412            return rv
413
414        # The dance to get into the request body
415        if req.has_key('RequestAccessRequestBody'):
416            req = req['RequestAccessRequestBody']
417        else:
418            raise service_error(service_error.req, "No request!?")
419
420
421        found, dyn, owners = self.lookup_access(req, fid)
422        ap = None
423
424        # if this includes a project export request and the exported
425        # project is not the access project, access denied.
426        if 'service' in req:
427            ep = get_export_project(req['service'])
428            if ep and ep != found[0].name:
429                raise service_error(service_error.access,
430                        "Cannot export %s" % ep)
431
432        if self.ssh_pubkey_file:
433            ap = self.do_project_allocation(dyn[1], found[0].name, found[1])
434        else:
435            raise service_error(service_error.internal, 
436                    "SSH access parameters required")
437        # keep track of what's been added
438        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
439        aid = unicode(allocID)
440
441        pname, uname = self.save_project_state(aid, ap, dyn[1], owners)
442
443        services, svc_state = self.export_services(req.get('service',[]),
444                pname, uname)
445        self.state_lock.acquire()
446        # Store services state in global state
447        for k, v in svc_state.items():
448            self.allocation[aid][k] = v
449        self.write_state()
450        self.state_lock.release()
451        # Give the owners the right to change this allocation
452        for o in owners:
453            self.auth.set_attribute(o, allocID)
454        try:
455            f = open("%s/%s.pem" % (self.certdir, aid), "w")
456            print >>f, alloc_cert
457            f.close()
458        except EnvironmentError, e:
459            raise service_error(service_error.internal, 
460                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
461        resp = self.build_access_response({ 'fedid': allocID } ,
462                ap, services)
463        return resp
464
465    def ReleaseAccess(self, req, fid):
466        # The dance to get into the request body
467        if req.has_key('ReleaseAccessRequestBody'):
468            req = req['ReleaseAccessRequestBody']
469        else:
470            raise service_error(service_error.req, "No request!?")
471
472        if req.has_key('destinationTestbed'):
473            dt = unpack_id(req['destinationTestbed'])
474        else:
475            dt = None
476
477        if dt == None or dt in self.testbed:
478            # Local request
479            try:
480                if req['allocID'].has_key('localname'):
481                    auth_attr = aid = req['allocID']['localname']
482                elif req['allocID'].has_key('fedid'):
483                    aid = unicode(req['allocID']['fedid'])
484                    auth_attr = req['allocID']['fedid']
485                else:
486                    raise service_error(service_error.req,
487                            "Only localnames and fedids are understood")
488            except KeyError:
489                raise service_error(service_error.req, "Badly formed request")
490
491            self.log.debug("[access] deallocation requested for %s", aid)
492            if not self.auth.check_attribute(fid, auth_attr):
493                self.log.debug("[access] deallocation denied for %s", aid)
494                raise service_error(service_error.access, "Access Denied")
495
496            # If we know this allocation, reduce the reference counts and
497            # remove the local allocations.  Otherwise report an error.  If
498            # there is an allocation to delete, del_users will be a dictonary
499            # of sets where the key is the user that owns the keys in the set.
500            # We use a set to avoid duplicates.  del_project is just the name
501            # of any dynamic project to delete.  We're somewhat lazy about
502            # deleting authorization attributes.  Having access to something
503            # that doesn't exist isn't harmful.
504            del_users = { }
505            del_project = None
506            del_types = set()
507
508            if self.allocation.has_key(aid):
509                self.log.debug("Found allocation for %s" %aid)
510                self.state_lock.acquire()
511                for k in self.allocation[aid]['keys']:
512                    kk = "%s:%s" % k
513                    self.keys[kk] -= 1
514                    if self.keys[kk] == 0:
515                        if not del_users.has_key(k[0]):
516                            del_users[k[0]] = set()
517                        del_users[k[0]].add(k[1])
518                        del self.keys[kk]
519
520                if self.allocation[aid].has_key('project'):
521                    pname = self.allocation[aid]['project']
522                    self.projects[pname] -= 1
523                    if self.projects[pname] == 0:
524                        del_project = pname
525                        del self.projects[pname]
526
527                if self.allocation[aid].has_key('types'):
528                    for t in self.allocation[aid]['types']:
529                        self.types[t] -= 1
530                        if self.types[t] == 0:
531                            if not del_project: del_project = t[0]
532                            del_types.add(t[1])
533                            del self.types[t]
534
535                del self.allocation[aid]
536                self.write_state()
537                self.state_lock.release()
538                # If we actually have resources to deallocate, prepare the call.
539                if del_project or del_users:
540                    msg = { 'project': { }}
541                    if del_project:
542                        msg['project']['name']= {'localname': del_project}
543                    users = [ ]
544                    for u in del_users.keys():
545                        users.append({ 'userID': { 'localname': u },\
546                            'access' :  \
547                                    [ {'sshPubkey' : s } for s in del_users[u]]\
548                        })
549                    if users: 
550                        msg['project']['user'] = users
551                    if len(del_types) > 0:
552                        msg['resources'] = { 'node': \
553                                [ {'hardware': [ h ] } for h in del_types ]\
554                            }
555                    if self.allocate_project.release_project:
556                        msg = { 'ReleaseProjectRequestBody' : msg}
557                        self.allocate_project.release_project(msg)
558                # And remove the access cert
559                cf = "%s/%s.pem" % (self.certdir, aid)
560                self.log.debug("Removing %s" % cf)
561                os.remove(cf)
562                return { 'allocID': req['allocID'] } 
563            else:
564                raise service_error(service_error.req, "No such allocation")
565
566        else:
567            if self.allow_proxy:
568                resp = self.proxy_ReleaseAccess.call_service(dt, req,
569                            self.cert_file, self.cert_pwd,
570                            self.trusted_certs)
571                if resp.has_key('ReleaseAccessResponseBody'):
572                    return resp['ReleaseAccessResponseBody']
573                else:
574                    return None
575            else:
576                raise service_error(service_error.access,
577                        "Access proxying denied")
578
579    def generate_ns2(self, topo, expfn, softdir, master, connInfo):
580        class dragon_commands:
581            """
582            Functor to spit out approrpiate dragon commands for nodes listed in
583            the connectivity description.  The constructor makes a dict mapping
584            dragon nodes to their parameters and the __call__ checks each
585            element in turn for membership.
586            """
587            def __init__(self, map):
588                self.node_info = map
589
590            def __call__(self, e):
591                s = ""
592                if isinstance(e, topdl.Computer):
593                    if self.node_info.has_key(e.name):
594                        info = self.node_info[e.name]
595                        for ifname, vlan, type in info:
596                            for i in e.interface:
597                                if i.name == ifname:
598                                    addr = i.get_attribute('ip4_address')
599                                    subs = i.substrate[0]
600                                    break
601                            else:
602                                raise service_error(service_error.internal,
603                                        "No interface %s on element %s" % \
604                                                (ifname, e.name))
605                            # XXX: do netmask right
606                            if type =='link':
607                                s = ("tb-allow-external ${%s} dragonportal " + \
608                                        "ip %s vlan %s netmask 255.255.255.0\n") % \
609                                        (e.name, addr, vlan)
610                            elif type =='lan':
611                                s = ("tb-allow-external ${%s} dragonportal " + \
612                                        "ip %s vlan %s usurp %s\n") % \
613                                        (e.name, addr, vlan, subs)
614                            else:
615                                raise service_error(service_error_internal,
616                                        "Unknown DRAGON type %s" % type)
617                return s
618
619        class not_dragon:
620            def __init__(self, map):
621                self.nodes = set(map.keys())
622
623            def __call__(self, e):
624                return e.name not in self.nodes
625
626
627        t = topo.clone()
628
629        dragon_map = { }
630        for i in [ i for i in connInfo if i['type'] == 'transit']:
631            for a in i.get('fedAttr', []):
632                if a['attribute'] == 'vlan_id':
633                    vlan = a['value']
634                    break
635            else:
636                raise service_error(service_error.internal, 
637                        "No vlan tag")
638            members = i.get('member', [])
639            if len(members) > 1: type = 'lan'
640            else: type = 'link'
641
642            try:
643                for m in members:
644                    if dragon_map.has_key(m['element']):
645                        dragon_map[m['element']].append(( m['interface'], 
646                            vlan, type))
647                    else:
648                        dragon_map[m['element']] = [( m['interface'], 
649                            vlan, type),]
650            except KeyError:
651                raise service_error(service_error.req,
652                        "Missing connectivity info")
653
654        # Weed out the things we aren't going to instantiate: Segments, portal
655        # substrates, and portal interfaces.  (The copy in the for loop allows
656        # us to delete from e.elements in side the for loop).  While we're
657        # touching all the elements, we also adjust paths from the original
658        # testbed to local testbed paths and put the federation commands into
659        # the start commands
660        for e in [e for e in t.elements]:
661            if isinstance(e, topdl.Segment):
662                t.elements.remove(e)
663            if isinstance(e, topdl.Computer):
664                self.add_kit(e, self.federation_software)
665                if e.get_attribute('portal') and self.portal_startcommand:
666                    # Add local portal support software
667                    self.add_kit(e, self.portal_software)
668                    # Portals never have a user-specified start command
669                    e.set_attribute('startup', self.portal_startcommand)
670                elif self.node_startcommand:
671                    if e.get_attribute('startup'):
672                        e.set_attribute('startup', "%s \\$USER '%s'" % \
673                                (self.node_startcommand, 
674                                    e.get_attribute('startup')))
675                    else:
676                        e.set_attribute('startup', self.node_startcommand)
677
678                dinf = [i[0] for i in dragon_map.get(e.name, []) ]
679                # Remove portal interfaces that do not connect to DRAGON
680                e.interface = [i for i in e.interface \
681                        if not i.get_attribute('portal') or i.name in dinf ]
682            # Fix software paths
683            for s in getattr(e, 'software', []):
684                s.location = re.sub("^.*/", softdir, s.location)
685
686        t.substrates = [ s.clone() for s in t.substrates ]
687        t.incorporate_elements()
688
689        # Customize the ns2 output for local portal commands and images
690        filters = []
691
692        if self.dragon_endpoint:
693            add_filter = not_dragon(dragon_map)
694            filters.append(dragon_commands(dragon_map))
695        else:
696            add_filter = None
697
698        if self.portal_command:
699            filters.append(topdl.generate_portal_command_filter(
700                self.portal_command, add_filter=add_filter))
701
702        if self.portal_image:
703            filters.append(topdl.generate_portal_image_filter(
704                self.portal_image))
705
706        if self.portal_type:
707            filters.append(topdl.generate_portal_hardware_filter(
708                self.portal_type))
709
710        # Convert to ns and write it out
711        expfile = topdl.topology_to_ns2(t, filters)
712        try:
713            f = open(expfn, "w")
714            print >>f, expfile
715            f.close()
716        except EnvironmentError:
717            raise service_error(service_error.internal,
718                    "Cannot write experiment file %s: %s" % (expfn,e))
719
720    def configure_userconf(self, services):
721        """
722        If the userconf service was imported, collect the configuration data.
723        """
724        for s in services:
725            s_name = s.get('name', '')
726            s_vis = s.get('visibility','')
727            if s_name  == 'userconfig' and s_vis == 'import':
728                # Collect ther server and certificate info.
729                u = s.get('server', None)
730                for a in s.get('fedAttr', []):
731                    if a.get('attribute',"") == 'cert':
732                        cert = a.get('value', None)
733                        break
734                else:
735                    cert = None
736
737                if cert:
738                    # Make a temporary certificate file for get_url.  The
739                    # finally clause removes it whether something goes
740                    # wrong (including an exception from get_url) or not.
741                    try:
742                        tfos, tn = tempfile.mkstemp(suffix=".pem")
743                        tf = os.fdopen(tfos, 'w')
744                        print >>tf, cert
745                        tf.close()
746                        self.log.debug("Getting userconf info: %s" % u)
747                        get_url(u, tn, tmpdir, "userconf")
748                        self.log.debug("Got userconf info: %s" % u)
749                    except EnvironmentError, e:
750                        raise service_error(service.error.internal, 
751                                "Cannot create temp file for " + 
752                                "userconfig certificates: %s e")
753                    except:
754                        t, v, st = sys.exc_info()
755                        raise service_error(service_error.internal,
756                                "Error retrieving %s: %s" % (s, v))
757                    finally:
758                        if tn: os.remove(tn)
759                else:
760                    raise service_error(service_error.req,
761                            "No certificate for retreiving userconfig")
762                break
763
764    def add_seer_node(self, topo, name, startup):
765        """
766        Add a seer node to the given topology, with the startup command passed
767        in.
768        """
769        c_node = topdl.Computer(
770                name=name, 
771                os= topdl.OperatingSystem(
772                    attribute=[
773                    { 'attribute': 'osid', 
774                        'value': self.local_seer_image },
775                    ]),
776                attribute=[
777                    { 'attribute': 'startup', 'value': startup },
778                    ]
779                )
780        self.add_kit(c_node, self.local_seer_software)
781        topo.elements.append(c_node)
782
783    def configure_seer_services(self, services, topo, softdir):
784        local_seer = False
785        collect_seer = False
786        seer_master= False
787        for s in services:
788            s_name = s.get('name', '')
789            s_vis = s.get('visibility','')
790
791            if s_name  == 'local_seer_control'  and s_vis == 'export':
792                local_seer = True
793            elif s_name == 'seer_master':
794                if s_vis == 'import':
795                    collect_seer = True
796                elif s_vis == 'export':
797                    seer_master = True
798       
799        # We've got the whole picture now, so add nodes if needed and configure
800        # them to interconnect properly.
801        if local_seer or seer_master:
802            # Copy local seer control node software to the tempdir
803            for l, f in self.local_seer_software:
804                base = os.path.basename(f)
805                copy_file(f, "%s/%s" % (softdir, base))
806        # If we're collecting seers somewhere the controllers need to talk to
807        # the master.  In testbeds that export the service, that will be a
808        # local node that we'll add below.  Elsewhere it will be the control
809        # portal that will port forward to the exporting master.
810        if local_seer:
811            if collect_seer:
812                startup = "%s -C %s" % (self.local_seer_start, "seer-master")
813            else:
814                startup = self.local_seer_start
815            self.add_seer_node(topo, 'control', startup)
816        # If this is the seer master, add that node, too.
817        if seer_master:
818            self.add_seer_node(topo, 'seer-master', self.seer_master_start)
819
820
821
822    def StartSegment(self, req, fid):
823
824        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
825
826        err = None  # Any service_error generated after tmpdir is created
827        rv = None   # Return value from segment creation
828
829        try:
830            req = req['StartSegmentRequestBody']
831        except KeyError:
832            raise service_error(server_error.req, "Badly formed request")
833
834        connInfo = req.get('connection', [])
835        services = req.get('service', [])
836        auth_attr = req['allocID']['fedid']
837        aid = "%s" % auth_attr
838        attrs = req.get('fedAttr', [])
839        if not self.auth.check_attribute(fid, auth_attr):
840            raise service_error(service_error.access, "Access denied")
841        else:
842            # See if this is a replay of an earlier succeeded StartSegment -
843            # sometimes SSL kills 'em.  If so, replay the response rather than
844            # redoing the allocation.
845            self.state_lock.acquire()
846            retval = self.allocation[aid].get('started', None)
847            self.state_lock.release()
848            if retval:
849                self.log.warning("Duplicate StartSegment for %s: " % aid + \
850                        "replaying response")
851                return retval
852
853        # A new request.  Do it.
854
855        if req.has_key('segmentdescription') and \
856                req['segmentdescription'].has_key('topdldescription'):
857            topo = \
858                topdl.Topology(**req['segmentdescription']['topdldescription'])
859        else:
860            raise service_error(service_error.req, 
861                    "Request missing segmentdescription'")
862       
863        master = req.get('master', False)
864
865        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
866        try:
867            tmpdir = tempfile.mkdtemp(prefix="access-")
868            softdir = "%s/software" % tmpdir
869            os.mkdir(softdir)
870        except EnvironmentError:
871            raise service_error(service_error.internal, "Cannot create tmp dir")
872
873        # Try block alllows us to clean up temporary files.
874        try:
875            sw = set()
876            for e in topo.elements:
877                for s in getattr(e, 'software', []):
878                    sw.add(s.location)
879            for s in sw:
880                self.log.debug("Retrieving %s" % s)
881                try:
882                    get_url(s, certfile, softdir)
883                except:
884                    t, v, st = sys.exc_info()
885                    raise service_error(service_error.internal,
886                            "Error retrieving %s: %s" % (s, v))
887
888            # Copy local federation and portal node software to the tempdir
889            for s in (self.federation_software, self.portal_software):
890                for l, f in s:
891                    base = os.path.basename(f)
892                    copy_file(f, "%s/%s" % (softdir, base))
893
894            ename = None
895            pubkey_base = None
896            secretkey_base = None
897            for a in attrs:
898                if a['attribute'] in configs:
899                    try:
900                        self.log.debug("Retrieving %s from %s" % \
901                                (a['attribute'], a['value']))
902                        get_url(a['value'], certfile, tmpdir)
903                    except:
904                        t, v, st = sys.exc_info()
905                        raise service_error(service_error.internal,
906                                "Error retrieving %s: %s" % (s, v))
907                if a['attribute'] == 'ssh_pubkey':
908                    pubkey_base = a['value'].rpartition('/')[2]
909                if a['attribute'] == 'ssh_secretkey':
910                    secretkey_base = a['value'].rpartition('/')[2]
911                if a['attribute'] == 'experiment_name':
912                    ename = a['value']
913
914            if not ename:
915                ename = ""
916                for i in range(0,5):
917                    ename += random.choice(string.ascii_letters)
918                self.log.warn("No experiment name: picked one randomly: %s" \
919                        % ename)
920
921            if not pubkey_base:
922                raise service_error(service_error.req, 
923                        "No public key attribute")
924
925            if not secretkey_base:
926                raise service_error(service_error.req, 
927                        "No secret key attribute")
928
929            # If the userconf service was imported, collect the configuration
930            # data.
931            self.configure_userconf(services)
932            self.configure_seer_services(services, topo, softdir)
933
934            proj = None
935            user = None
936            self.state_lock.acquire()
937            if self.allocation.has_key(aid):
938                proj = self.allocation[aid].get('project', None)
939                if not proj: 
940                    proj = self.allocation[aid].get('sproject', None)
941                user = self.allocation[aid].get('user', None)
942                self.allocation[aid]['experiment'] = ename
943                self.allocation[aid]['log'] = [ ]
944                # Create a logger that logs to the experiment's state object as
945                # well as to the main log file.
946                alloc_log = logging.getLogger('fedd.access.%s' % ename)
947                h = logging.StreamHandler(
948                        list_log.list_log(self.allocation[aid]['log']))
949                # XXX: there should be a global one of these rather than
950                # repeating the code.
951                h.setFormatter(logging.Formatter(
952                    "%(asctime)s %(name)s %(message)s",
953                            '%d %b %y %H:%M:%S'))
954                alloc_log.addHandler(h)
955                self.write_state()
956            self.state_lock.release()
957
958            if not proj:
959                raise service_error(service_error.internal, 
960                        "Can't find project for %s" %aid)
961
962            if not user:
963                raise service_error(service_error.internal, 
964                        "Can't find creation user for %s" %aid)
965
966            self.export_store_info(certfile, proj, ename, connInfo)
967            self.import_store_info(certfile, connInfo)
968
969            expfile = "%s/experiment.tcl" % tmpdir
970
971            self.generate_portal_configs(topo, pubkey_base, 
972                    secretkey_base, tmpdir, master, proj, ename, connInfo, 
973                    services)
974            self.generate_ns2(topo, expfile, 
975                    "/proj/%s/software/%s/" % (proj, ename), master, connInfo)
976
977            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
978                    debug=self.create_debug, log=alloc_log)
979            rv = starter(self, ename, proj, user, expfile, tmpdir)
980            rvtopo = topo.clone()
981
982            # Copy the assigned names into the return topology
983            embedding = [ ]
984            for n in starter.node:
985                embedding.append({ 
986                    'toponame': n,
987                    'physname': ["%s%s" %  (starter.node[n], self.domain)],
988                    })
989
990
991        except service_error, e:
992            err = e
993        except e:
994            err = service_error(service_error.internal, str(e))
995
996        # Walk up tmpdir, deleting as we go
997        if self.cleanup:
998            self.log.debug("[StartSegment]: removing %s" % tmpdir)
999            for path, dirs, files in os.walk(tmpdir, topdown=False):
1000                for f in files:
1001                    os.remove(os.path.join(path, f))
1002                for d in dirs:
1003                    os.rmdir(os.path.join(path, d))
1004            os.rmdir(tmpdir)
1005        else:
1006            self.log.debug("[StartSegment]: not removing %s" % tmpdir)
1007
1008        if rv:
1009            # Grab the log (this is some anal locking, but better safe than
1010            # sorry)
1011            self.state_lock.acquire()
1012            logv = "".join(self.allocation[aid]['log'])
1013            # It's possible that the StartSegment call gets retried (!).
1014            # if the 'started' key is in the allocation, we'll return it rather
1015            # than redo the setup.
1016            self.allocation[aid]['started'] = { 
1017                    'allocID': req['allocID'],
1018                    'allocationLog': logv,
1019                    'segmentdescription': { 
1020                        'topdldescription': rvtopo.to_dict()
1021                        },
1022                    'embedding': embedding
1023                    }
1024            retval = copy.copy(self.allocation[aid]['started'])
1025            self.write_state()
1026            self.state_lock.release()
1027            return retval
1028        elif err:
1029            raise service_error(service_error.federant,
1030                    "Swapin failed: %s" % err)
1031        else:
1032            raise service_error(service_error.federant, "Swapin failed")
1033
1034    def TerminateSegment(self, req, fid):
1035        try:
1036            req = req['TerminateSegmentRequestBody']
1037        except KeyError:
1038            raise service_error(server_error.req, "Badly formed request")
1039
1040        auth_attr = req['allocID']['fedid']
1041        aid = "%s" % auth_attr
1042        attrs = req.get('fedAttr', [])
1043        if not self.auth.check_attribute(fid, auth_attr):
1044            raise service_error(service_error.access, "Access denied")
1045
1046        self.state_lock.acquire()
1047        if self.allocation.has_key(aid):
1048            proj = self.allocation[aid].get('project', None)
1049            if not proj: 
1050                proj = self.allocation[aid].get('sproject', None)
1051            user = self.allocation[aid].get('user', None)
1052            ename = self.allocation[aid].get('experiment', None)
1053        else:
1054            proj = None
1055            user = None
1056            ename = None
1057        self.state_lock.release()
1058
1059        if not proj:
1060            raise service_error(service_error.internal, 
1061                    "Can't find project for %s" % aid)
1062
1063        if not user:
1064            raise service_error(service_error.internal, 
1065                    "Can't find creation user for %s" % aid)
1066        if not ename:
1067            raise service_error(service_error.internal, 
1068                    "Can't find experiment name for %s" % aid)
1069        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1070                debug=self.create_debug)
1071        stopper(self, user, proj, ename)
1072        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.