source: fedd/federation/access.py @ f07fa49

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since f07fa49 was f07fa49, checked in by Ted Faber <faber@…>, 15 years ago

better logging and cleanup

  • Property mode set to 100644
File size: 55.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9
10from threading import *
11import subprocess
12import signal
13import time
14
15from util import *
16from allocate_project import allocate_project_local, allocate_project_remote
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import topdl
24import list_log
25import httplib
26import tempfile
27from urlparse import urlparse
28
29
30# Make log messages disappear if noone configures a fedd logger
31class nullHandler(logging.Handler):
32    def emit(self, record): pass
33
34fl = logging.getLogger("fedd.access")
35fl.addHandler(nullHandler())
36
37class access:
38    """
39    The implementation of access control based on mapping users to projects.
40
41    Users can be mapped to existing projects or have projects created
42    dynamically.  This implements both direct requests and proxies.
43    """
44
45    class parse_error(RuntimeError): pass
46
47
48    proxy_RequestAccess= service_caller('RequestAccess')
49    proxy_ReleaseAccess= service_caller('ReleaseAccess')
50
51    def __init__(self, config=None, auth=None):
52        """
53        Initializer.  Pulls parameters out of the ConfigParser's access section.
54        """
55
56        # Make sure that the configuration is in place
57        if not config: 
58            raise RunTimeError("No config to fedd.access")
59
60        self.project_priority = config.getboolean("access", "project_priority")
61        self.allow_proxy = config.getboolean("access", "allow_proxy")
62
63        self.boss = config.get("access", "boss")
64        self.ops = config.get("access", "ops")
65        self.domain = config.get("access", "domain")
66        self.fileserver = config.get("access", "fileserver")
67        self.eventserver = config.get("access", "eventserver")
68        self.certdir = config.get("access","certdir")
69        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
70        self.create_debug = config.getboolean("access", "create_debug")
71
72        self.attrs = { }
73        self.access = { }
74        self.restricted = [ ]
75        self.projects = { }
76        self.keys = { }
77        self.types = { }
78        self.allocation = { }
79        self.state = { 
80            'projects': self.projects,
81            'allocation' : self.allocation,
82            'keys' : self.keys,
83            'types': self.types
84        }
85        self.log = logging.getLogger("fedd.access")
86        set_log_level(config, "access", self.log)
87        self.state_lock = Lock()
88
89        if auth: self.auth = auth
90        else:
91            self.log.error(\
92                    "[access]: No authorizer initialized, creating local one.")
93            auth = authorizer()
94
95        tb = config.get('access', 'testbed')
96        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
97        else: self.testbed = [ ]
98
99        if config.has_option("access", "accessdb"):
100            self.read_access(config.get("access", "accessdb"))
101
102        self.state_filename = config.get("access", "access_state")
103        self.read_state()
104
105        # Keep cert_file and cert_pwd coming from the same place
106        self.cert_file = config.get("access", "cert_file")
107        if self.cert_file:
108            self.sert_pwd = config.get("access", "cert_pw")
109        else:
110            self.cert_file = config.get("globals", "cert_file")
111            self.sert_pwd = config.get("globals", "cert_pw")
112
113        self.trusted_certs = config.get("access", "trusted_certs") or \
114                config.get("globals", "trusted_certs")
115
116        self.soap_services = {\
117            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
118            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
119            'StartSegment': soap_handler("StartSegment", self.StartSegment),
120            'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment),
121            }
122        self.xmlrpc_services =  {\
123            'RequestAccess': xmlrpc_handler('RequestAccess',
124                self.RequestAccess),
125            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
126                self.ReleaseAccess),
127            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
128            'TerminateSegment': xmlrpc_handler('TerminateSegment',
129                self.TerminateSegment),
130            }
131
132
133        if not config.has_option("allocate", "uri"):
134            self.allocate_project = \
135                allocate_project_local(config, auth)
136        else:
137            self.allocate_project = \
138                allocate_project_remote(config, auth)
139
140        # If the project allocator exports services, put them in this object's
141        # maps so that classes that instantiate this can call the services.
142        self.soap_services.update(self.allocate_project.soap_services)
143        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
144
145
146    def read_access(self, config):
147        """
148        Read a configuration file and set internal parameters.
149
150        The format is more complex than one might hope.  The basic format is
151        attribute value pairs separated by colons(:) on a signle line.  The
152        attributes in bool_attrs, emulab_attrs and id_attrs can all be set
153        directly using the name: value syntax.  E.g.
154        boss: hostname
155        sets self.boss to hostname.  In addition, there are access lines of the
156        form (tb, proj, user) -> (aproj, auser) that map the first tuple of
157        names to the second for access purposes.  Names in the key (left side)
158        can include "<NONE> or <ANY>" to act as wildcards or to require the
159        fields to be empty.  Similarly aproj or auser can be <SAME> or
160        <DYNAMIC> indicating that either the matching key is to be used or a
161        dynamic user or project will be created.  These names can also be
162        federated IDs (fedid's) if prefixed with fedid:.  Finally, the aproj
163        can be followed with a colon-separated list of node types to which that
164        project has access (or will have access if dynamic).
165        Testbed attributes outside the forms above can be given using the
166        format attribute: name value: value.  The name is a single word and the
167        value continues to the end of the line.  Empty lines and lines startin
168        with a # are ignored.
169
170        Parsing errors result in a self.parse_error exception being raised.
171        """
172        lineno=0
173        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
174        fedid_expr = "fedid:[" + string.hexdigits + "]+"
175        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
176        access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \
177                "<SAME>" + "(?::" + name_expr + ")*|" + \
178                fedid_expr + "(?::" + name_expr + ")*|" + \
179                name_expr + "(?::" + name_expr + ")*)"
180        access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")"
181
182        restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE)
183        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
184                re.IGNORECASE)
185        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
186                key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + 
187                access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE)
188
189        def parse_name(n):
190            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
191            else: return n
192       
193        def auth_name(n):
194            if isinstance(n, basestring):
195                if n =='<any>' or n =='<none>': return None
196                else: return unicode(n)
197            else:
198                return n
199
200        f = open(config, "r");
201        for line in f:
202            lineno += 1
203            line = line.strip();
204            if len(line) == 0 or line.startswith('#'):
205                continue
206
207            # Extended (attribute: x value: y) attribute line
208            m = attr_re.match(line)
209            if m != None:
210                attr, val = m.group(1,2)
211                self.attrs[attr] = val
212                continue
213
214            # Restricted entry
215            m = restricted_re.match(line)
216            if m != None:
217                val = m.group(1)
218                self.restricted.append(val)
219                continue
220
221            # Access line (t, p, u) -> (ap, cu, su) line
222            m = access_re.match(line)
223            if m != None:
224                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
225                auth_key = tuple([ auth_name(x) for x in access_key])
226                aps = m.group(4).split(":");
227                if aps[0] == 'fedid:':
228                    del aps[0]
229                    aps[0] = fedid(hexstr=aps[0])
230
231                cu = parse_name(m.group(5))
232                su = parse_name(m.group(6))
233
234                access_val = (access_project(aps[0], aps[1:]),
235                        parse_name(m.group(5)), parse_name(m.group(6)))
236
237                self.access[access_key] = access_val
238                self.auth.set_attribute(auth_key, "access")
239                continue
240
241            # Nothing matched to here: unknown line - raise exception
242            f.close()
243            raise self.parse_error("Unknown statement at line %d of %s" % \
244                    (lineno, config))
245        f.close()
246
247    def get_users(self, obj):
248        """
249        Return a list of the IDs of the users in dict
250        """
251        if obj.has_key('user'):
252            return [ unpack_id(u['userID']) \
253                    for u in obj['user'] if u.has_key('userID') ]
254        else:
255            return None
256
257    def write_state(self):
258        if self.state_filename:
259            try:
260                f = open(self.state_filename, 'w')
261                pickle.dump(self.state, f)
262            except IOError, e:
263                self.log.error("Can't write file %s: %s" % \
264                        (self.state_filename, e))
265            except pickle.PicklingError, e:
266                self.log.error("Pickling problem: %s" % e)
267            except TypeError, e:
268                self.log.error("Pickling problem (TypeError): %s" % e)
269
270
271    def read_state(self):
272        """
273        Read a new copy of access state.  Old state is overwritten.
274
275        State format is a simple pickling of the state dictionary.
276        """
277        if self.state_filename:
278            try:
279                f = open(self.state_filename, "r")
280                self.state = pickle.load(f)
281
282                self.allocation = self.state['allocation']
283                self.projects = self.state['projects']
284                self.keys = self.state['keys']
285                self.types = self.state['types']
286
287                self.log.debug("[read_state]: Read state from %s" % \
288                        self.state_filename)
289            except IOError, e:
290                self.log.warning(("[read_state]: No saved state: " +\
291                        "Can't open %s: %s") % (self.state_filename, e))
292            except EOFError, e:
293                self.log.warning(("[read_state]: " +\
294                        "Empty or damaged state file: %s:") % \
295                        self.state_filename)
296            except pickle.UnpicklingError, e:
297                self.log.warning(("[read_state]: No saved state: " + \
298                        "Unpickling failed: %s") % e)
299
300            # Add the ownership attributes to the authorizer.  Note that the
301            # indices of the allocation dict are strings, but the attributes are
302            # fedids, so there is a conversion.
303            for k in self.allocation.keys():
304                for o in self.allocation[k].get('owners', []):
305                    self.auth.set_attribute(o, fedid(hexstr=k))
306
307
308    def permute_wildcards(self, a, p):
309        """Return a copy of a with various fields wildcarded.
310
311        The bits of p control the wildcards.  A set bit is a wildcard
312        replacement with the lowest bit being user then project then testbed.
313        """
314        if p & 1: user = ["<any>"]
315        else: user = a[2]
316        if p & 2: proj = "<any>"
317        else: proj = a[1]
318        if p & 4: tb = "<any>"
319        else: tb = a[0]
320
321        return (tb, proj, user)
322
323    def find_access(self, search):
324        """
325        Search the access DB for a match on this tuple.  Return the matching
326        access tuple and the user that matched.
327       
328        NB, if the initial tuple fails to match we start inserting wildcards in
329        an order determined by self.project_priority.  Try the list of users in
330        order (when wildcarded, there's only one user in the list).
331        """
332        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
333        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
334
335        for p in perm: 
336            s = self.permute_wildcards(search, p)
337            # s[2] is None on an anonymous, unwildcarded request
338            if s[2] != None:
339                for u in s[2]:
340                    if self.access.has_key((s[0], s[1], u)):
341                        return (self.access[(s[0], s[1], u)], u)
342            else:
343                if self.access.has_key(s):
344                    return (self.access[s], None)
345        return None, None
346
347    def lookup_access(self, req, fid):
348        """
349        Determine the allowed access for this request.  Return the access and
350        which fields are dynamic.
351
352        The fedid is needed to construct the request
353        """
354        # Search keys
355        tb = None
356        project = None
357        user = None
358        # Return values
359        rp = access_project(None, ())
360        ru = None
361
362        if req.has_key('project'):
363            p = req['project']
364            if p.has_key('name'):
365                project = unpack_id(p['name'])
366            user = self.get_users(p)
367        else:
368            user = self.get_users(req)
369
370        user_fedids = [ u for u in user if isinstance(u, fedid)]
371        # Determine how the caller is representing itself.  If its fedid shows
372        # up as a project or a singleton user, let that stand.  If neither the
373        # usernames nor the project name is a fedid, the caller is a testbed.
374        if project and isinstance(project, fedid):
375            if project == fid:
376                # The caller is the project (which is already in the tuple
377                # passed in to the authorizer)
378                owners = user_fedids
379                owners.append(project)
380            else:
381                raise service_error(service_error.req,
382                        "Project asserting different fedid")
383        else:
384            if fid not in user_fedids:
385                tb = fid
386                owners = user_fedids
387                owners.append(fid)
388            else:
389                if len(fedids) > 1:
390                    raise service_error(service_error.req,
391                            "User asserting different fedid")
392                else:
393                    # Which is a singleton
394                    owners = user_fedids
395        # Confirm authorization
396
397        for u in user:
398            self.log.debug("[lookup_access] Checking access for %s" % \
399                    ((tb, project, u),))
400            if self.auth.check_attribute((tb, project, u), 'access'):
401                self.log.debug("[lookup_access] Access granted")
402                break
403            else:
404                self.log.debug("[lookup_access] Access Denied")
405        else:
406            raise service_error(service_error.access, "Access denied")
407
408        # This maps a valid user to the Emulab projects and users to use
409        found, user_match = self.find_access((tb, project, user))
410       
411        if found == None:
412            raise service_error(service_error.access,
413                    "Access denied - cannot map access")
414
415        # resolve <dynamic> and <same> in found
416        dyn_proj = False
417        dyn_create_user = False
418        dyn_service_user = False
419
420        if found[0].name == "<same>":
421            if project != None:
422                rp.name = project
423            else : 
424                raise service_error(\
425                        service_error.server_config,
426                        "Project matched <same> when no project given")
427        elif found[0].name == "<dynamic>":
428            rp.name = None
429            dyn_proj = True
430        else:
431            rp.name = found[0].name
432        rp.node_types = found[0].node_types;
433
434        if found[1] == "<same>":
435            if user_match == "<any>":
436                if user != None: rcu = user[0]
437                else: raise service_error(\
438                        service_error.server_config,
439                        "Matched <same> on anonymous request")
440            else:
441                rcu = user_match
442        elif found[1] == "<dynamic>":
443            rcu = None
444            dyn_create_user = True
445        else:
446            rcu = found[1]
447       
448        if found[2] == "<same>":
449            if user_match == "<any>":
450                if user != None: rsu = user[0]
451                else: raise service_error(\
452                        service_error.server_config,
453                        "Matched <same> on anonymous request")
454            else:
455                rsu = user_match
456        elif found[2] == "<dynamic>":
457            rsu = None
458            dyn_service_user = True
459        else:
460            rsu = found[2]
461
462        return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\
463                owners
464
465    def build_response(self, alloc_id, ap):
466        """
467        Create the SOAP response.
468
469        Build the dictionary description of the response and use
470        fedd_utils.pack_soap to create the soap message.  ap is the allocate
471        project message returned from a remote project allocation (even if that
472        allocation was done locally).
473        """
474        # Because alloc_id is already a fedd_services_types.IDType_Holder,
475        # there's no need to repack it
476        msg = { 
477                'allocID': alloc_id,
478                'emulab': { 
479                    'domain': self.domain,
480                    'boss': self.boss,
481                    'ops': self.ops,
482                    'fileServer': self.fileserver,
483                    'eventServer': self.eventserver,
484                    'project': ap['project']
485                },
486            }
487        if len(self.attrs) > 0:
488            msg['emulab']['fedAttr'] = \
489                [ { 'attribute': x, 'value' : y } \
490                        for x,y in self.attrs.iteritems()]
491        return msg
492
493    def RequestAccess(self, req, fid):
494        """
495        Handle the access request.  Proxy if not for us.
496
497        Parse out the fields and make the allocations or rejections if for us,
498        otherwise, assuming we're willing to proxy, proxy the request out.
499        """
500
501        def gateway_hardware(h):
502            if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE')
503            else: return h
504
505        # The dance to get into the request body
506        if req.has_key('RequestAccessRequestBody'):
507            req = req['RequestAccessRequestBody']
508        else:
509            raise service_error(service_error.req, "No request!?")
510
511        if req.has_key('destinationTestbed'):
512            dt = unpack_id(req['destinationTestbed'])
513
514        if dt == None or dt in self.testbed:
515            # Request for this fedd
516            found, dyn, owners = self.lookup_access(req, fid)
517            restricted = None
518            ap = None
519
520            # If this is a request to export a project and the access project
521            # is not the project to export, access denied.
522            if req.has_key('exportProject'):
523                ep = unpack_id(req['exportProject'])
524                if ep != found[0].name:
525                    raise service_error(service_error.access,
526                            "Cannot export %s" % ep)
527
528            # Check for access to restricted nodes
529            if req.has_key('resources') and req['resources'].has_key('node'):
530                resources = req['resources']
531                restricted = [ gateway_hardware(t) for n in resources['node'] \
532                                if n.has_key('hardware') \
533                                    for t in n['hardware'] \
534                                        if gateway_hardware(t) \
535                                            in self.restricted ]
536                inaccessible = [ t for t in restricted \
537                                    if t not in found[0].node_types]
538                if len(inaccessible) > 0:
539                    raise service_error(service_error.access,
540                            "Access denied (nodetypes %s)" % \
541                            str(', ').join(inaccessible))
542            # These collect the keys for the two roles into single sets, one
543            # for creation and one for service.  The sets are a simple way to
544            # eliminate duplicates
545            create_ssh = set([ x['sshPubkey'] \
546                    for x in req['createAccess'] \
547                        if x.has_key('sshPubkey')])
548
549            service_ssh = set([ x['sshPubkey'] \
550                    for x in req['serviceAccess'] \
551                        if x.has_key('sshPubkey')])
552
553            if len(create_ssh) > 0 and len(service_ssh) >0: 
554                if dyn[1]: 
555                    # Compose the dynamic project request
556                    # (only dynamic, dynamic currently allowed)
557                    preq = { 'AllocateProjectRequestBody': \
558                                { 'project' : {\
559                                    'user': [ \
560                                    { \
561                                        'access': [ { 'sshPubkey': s } \
562                                            for s in service_ssh ], 
563                                         'role': "serviceAccess",\
564                                    }, \
565                                    { \
566                                        'access': [ { 'sshPubkey': s } \
567                                            for s in create_ssh ], 
568                                         'role': "experimentCreation",\
569                                    }, \
570                                    ], \
571                                    }\
572                                }\
573                            }
574                    if restricted != None and len(restricted) > 0:
575                        preq['AllocateProjectRequestBody']['resources'] = \
576                             {'node': [ { 'hardware' :  [ h ] } \
577                                    for h in restricted ] } 
578                    ap = self.allocate_project.dynamic_project(preq)
579                else:
580                    preq = {'StaticProjectRequestBody' : \
581                            { 'project': \
582                                { 'name' : { 'localname' : found[0].name },\
583                                  'user' : [ \
584                                    {\
585                                        'userID': { 'localname' : found[1] }, \
586                                        'access': [ { 'sshPubkey': s } 
587                                            for s in create_ssh ],
588                                        'role': 'experimentCreation'\
589                                    },\
590                                    {\
591                                        'userID': { 'localname' : found[2] }, \
592                                        'access': [ { 'sshPubkey': s } 
593                                            for s in service_ssh ],
594                                        'role': 'serviceAccess'\
595                                    },\
596                                ]}\
597                            }\
598                    }
599                    if restricted != None and len(restricted) > 0:
600                        preq['StaticProjectRequestBody']['resources'] = \
601                            {'node': [ { 'hardware' :  [ h ] } \
602                                    for h in restricted ] } 
603                    ap = self.allocate_project.static_project(preq)
604            else:
605                raise service_error(service_error.req, 
606                        "SSH access parameters required")
607            # keep track of what's been added
608            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
609            aid = unicode(allocID)
610
611            self.state_lock.acquire()
612            self.allocation[aid] = { }
613            try:
614                pname = ap['project']['name']['localname']
615            except KeyError:
616                pname = None
617
618            if dyn[1]:
619                if not pname:
620                    self.state_lock.release()
621                    raise service_error(service_error.internal,
622                            "Misformed allocation response?")
623                if self.projects.has_key(pname): self.projects[pname] += 1
624                else: self.projects[pname] = 1
625                self.allocation[aid]['project'] = pname
626            else:
627                # sproject is a static project associated with this allocation.
628                self.allocation[aid]['sproject'] = pname
629
630            if ap.has_key('resources'):
631                if not pname:
632                    self.state_lock.release()
633                    raise service_error(service_error.internal,
634                            "Misformed allocation response?")
635                self.allocation[aid]['types'] = set()
636                nodes = ap['resources'].get('node', [])
637                for n in nodes:
638                    for h in n.get('hardware', []):
639                        if self.types.has_key((pname, h)):
640                            self.types[(pname, h)] += 1
641                        else:
642                            self.types[(pname, h)] = 1
643                        self.allocation[aid]['types'].add((pname,h))
644
645
646            self.allocation[aid]['keys'] = [ ]
647
648            try:
649                for u in ap['project']['user']:
650                    uname = u['userID']['localname']
651                    if u['role'] == 'experimentCreation':
652                        self.allocation[aid]['user'] = uname
653                    for k in [ k['sshPubkey'] for k in u['access'] \
654                            if k.has_key('sshPubkey') ]:
655                        kv = "%s:%s" % (uname, k)
656                        if self.keys.has_key(kv): self.keys[kv] += 1
657                        else: self.keys[kv] = 1
658                        self.allocation[aid]['keys'].append((uname, k))
659            except KeyError:
660                self.state_lock.release()
661                raise service_error(service_error.internal,
662                        "Misformed allocation response?")
663
664
665            self.allocation[aid]['owners'] = owners
666            self.write_state()
667            self.state_lock.release()
668            for o in owners:
669                self.auth.set_attribute(o, allocID)
670            try:
671                f = open("%s/%s.pem" % (self.certdir, aid), "w")
672                print >>f, alloc_cert
673                f.close()
674            except IOError, e:
675                raise service_error(service_error.internal, 
676                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
677            resp = self.build_response({ 'fedid': allocID } , ap)
678            return resp
679        else:
680            if self.allow_proxy:
681                resp = self.proxy_RequestAccess.call_service(dt, req,
682                            self.cert_file, self.cert_pwd,
683                            self.trusted_certs)
684                if resp.has_key('RequestAccessResponseBody'):
685                    return resp['RequestAccessResponseBody']
686                else:
687                    return None
688            else:
689                raise service_error(service_error.access,
690                        "Access proxying denied")
691
692    def ReleaseAccess(self, req, fid):
693        # The dance to get into the request body
694        if req.has_key('ReleaseAccessRequestBody'):
695            req = req['ReleaseAccessRequestBody']
696        else:
697            raise service_error(service_error.req, "No request!?")
698
699        if req.has_key('destinationTestbed'):
700            dt = unpack_id(req['destinationTestbed'])
701        else:
702            dt = None
703
704        if dt == None or dt in self.testbed:
705            # Local request
706            try:
707                if req['allocID'].has_key('localname'):
708                    auth_attr = aid = req['allocID']['localname']
709                elif req['allocID'].has_key('fedid'):
710                    aid = unicode(req['allocID']['fedid'])
711                    auth_attr = req['allocID']['fedid']
712                else:
713                    raise service_error(service_error.req,
714                            "Only localnames and fedids are understood")
715            except KeyError:
716                raise service_error(service_error.req, "Badly formed request")
717
718            self.log.debug("[access] deallocation requested for %s", aid)
719            if not self.auth.check_attribute(fid, auth_attr):
720                self.log.debug("[access] deallocation denied for %s", aid)
721                raise service_error(service_error.access, "Access Denied")
722
723            # If we know this allocation, reduce the reference counts and
724            # remove the local allocations.  Otherwise report an error.  If
725            # there is an allocation to delete, del_users will be a dictonary
726            # of sets where the key is the user that owns the keys in the set.
727            # We use a set to avoid duplicates.  del_project is just the name
728            # of any dynamic project to delete.  We're somewhat lazy about
729            # deleting authorization attributes.  Having access to something
730            # that doesn't exist isn't harmful.
731            del_users = { }
732            del_project = None
733            del_types = set()
734
735            if self.allocation.has_key(aid):
736                self.log.debug("Found allocation for %s" %aid)
737                self.state_lock.acquire()
738                for k in self.allocation[aid]['keys']:
739                    kk = "%s:%s" % k
740                    self.keys[kk] -= 1
741                    if self.keys[kk] == 0:
742                        if not del_users.has_key(k[0]):
743                            del_users[k[0]] = set()
744                        del_users[k[0]].add(k[1])
745                        del self.keys[kk]
746
747                if self.allocation[aid].has_key('project'):
748                    pname = self.allocation[aid]['project']
749                    self.projects[pname] -= 1
750                    if self.projects[pname] == 0:
751                        del_project = pname
752                        del self.projects[pname]
753
754                if self.allocation[aid].has_key('types'):
755                    for t in self.allocation[aid]['types']:
756                        self.types[t] -= 1
757                        if self.types[t] == 0:
758                            if not del_project: del_project = t[0]
759                            del_types.add(t[1])
760                            del self.types[t]
761
762                del self.allocation[aid]
763                self.write_state()
764                self.state_lock.release()
765                # If we actually have resources to deallocate, prepare the call.
766                if del_project or del_users:
767                    msg = { 'project': { }}
768                    if del_project:
769                        msg['project']['name']= {'localname': del_project}
770                    users = [ ]
771                    for u in del_users.keys():
772                        users.append({ 'userID': { 'localname': u },\
773                            'access' :  \
774                                    [ {'sshPubkey' : s } for s in del_users[u]]\
775                        })
776                    if users: 
777                        msg['project']['user'] = users
778                    if len(del_types) > 0:
779                        msg['resources'] = { 'node': \
780                                [ {'hardware': [ h ] } for h in del_types ]\
781                            }
782                    if self.allocate_project.release_project:
783                        msg = { 'ReleaseProjectRequestBody' : msg}
784                        self.allocate_project.release_project(msg)
785                # And remove the access cert
786                cf = "%s/%s.pem" % (self.certdir, aid)
787                self.log.debug("Removing %s" % cf)
788                os.remove(cf)
789                return { 'allocID': req['allocID'] } 
790            else:
791                raise service_error(service_error.req, "No such allocation")
792
793        else:
794            if self.allow_proxy:
795                resp = self.proxy_ReleaseAccess.call_service(dt, req,
796                            self.cert_file, self.cert_pwd,
797                            self.trusted_certs)
798                if resp.has_key('ReleaseAccessResponseBody'):
799                    return resp['ReleaseAccessResponseBody']
800                else:
801                    return None
802            else:
803                raise service_error(service_error.access,
804                        "Access proxying denied")
805
806
807
808    class proxy_emulab_segment:
809        class ssh_cmd_timeout(RuntimeError): pass
810
811        def __init__(self, log=None, keyfile=None, debug=False):
812            self.log = log or logging.getLogger(\
813                    'fedd.access.proxy_emulab_segment')
814            self.ssh_privkey_file = keyfile
815            self.debug = debug
816            self.ssh_exec="/usr/bin/ssh"
817            self.scp_exec = "/usr/bin/scp"
818            self.ssh_cmd_timeout = access.proxy_emulab_segment.ssh_cmd_timeout
819
820        def scp_file(self, file, user, host, dest=""):
821            """
822            scp a file to the remote host.  If debug is set the action is only
823            logged.
824            """
825
826            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 
827                    '-o', 'StrictHostKeyChecking yes', '-i', 
828                    self.ssh_privkey_file, file, 
829                    "%s@%s:%s" % (user, host, dest)]
830            rv = 0
831
832            try:
833                dnull = open("/dev/null", "w")
834            except IOError:
835                self.log.debug("[ssh_file]: failed to open " + \
836                        "/dev/null for redirect")
837                dnull = Null
838
839            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
840            if not self.debug:
841                rv = subprocess.call(scp_cmd, stdout=dnull, 
842                        stderr=dnull, close_fds=True, close_fds=True)
843
844            return rv == 0
845
846        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
847            """
848            Run a remote command on host as user.  If debug is set, the action
849            is only logged.  Commands are run without stdin, to avoid stray
850            SIGTTINs.
851            """
852            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
853                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
854                    (self.ssh_exec, self.ssh_privkey_file, 
855                            user, host, cmd)
856
857            try:
858                dnull = open("/dev/null", "w")
859            except IOError:
860                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
861                        "for redirect")
862                dnull = Null
863
864            self.log.debug("[ssh_cmd]: %s" % sh_str)
865            if not self.debug:
866                if dnull:
867                    sub = subprocess.Popen(sh_str, shell=True, stdout=dnull,
868                            stderr=dnull, close_fds=True)
869                else:
870                    sub = subprocess.Popen(sh_str, shell=True, close_fds=True)
871                if timeout:
872                    i = 0
873                    rv = sub.poll()
874                    while i < timeout:
875                        if rv is not None: break
876                        else:
877                            time.sleep(1)
878                            rv = sub.poll()
879                            i += 1
880                    else:
881                        self.log.debug("Process exceeded runtime: %s" % sh_str)
882                        os.kill(sub.pid, signal.SIGKILL)
883                        raise self.ssh_cmd_timeout();
884                    return rv == 0
885                else:
886                    return sub.wait() == 0
887            else:
888                if timeout == 0:
889                    self.log.debug("debug timeout raised on %s " % sh_str)
890                    raise self.ssh_cmd_timeout()
891                else:
892                    return True
893
894    class start_segment(proxy_emulab_segment):
895        def __init__(self, log=None, keyfile=None, debug=False):
896            access.proxy_emulab_segment.__init__(self, log=log, 
897                    keyfile=keyfile, debug=debug)
898            self.null = """
899set ns [new Simulator]
900source tb_compat.tcl
901
902set a [$ns node]
903
904$ns rtproto Session
905$ns run
906"""
907
908        def get_state(self, user, host, pid, eid):
909            # command to test experiment state
910            expinfo_exec = "/usr/testbed/bin/expinfo" 
911            # Regular expressions to parse the expinfo response
912            state_re = re.compile("State:\s+(\w+)")
913            no_exp_re = re.compile("^No\s+such\s+experiment")
914            swapping_re = re.compile("^No\s+information\s+available.")
915            state = None    # Experiment state parsed from expinfo
916            # The expinfo ssh command.  Note the identity restriction to use
917            # only the identity provided in the pubkey given.
918            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 
919                    'StrictHostKeyChecking yes', '-i', 
920                    self.ssh_privkey_file, "%s@%s" % (user, host), 
921                    expinfo_exec, pid, eid]
922
923            dev_null = None
924            try:
925                dev_null = open("/dev/null", "a")
926            except IOError, e:
927                self.log.error("[get_state]: can't open /dev/null: %s" %e)
928
929            if self.debug:
930                state = 'swapped'
931                rv = 0
932            else:
933                self.log.debug("Checking state")
934                status = subprocess.Popen(cmd, stdout=subprocess.PIPE,
935                        stderr=dev_null, close_fds=True)
936                for line in status.stdout:
937                    m = state_re.match(line)
938                    if m: state = m.group(1)
939                    else:
940                        for reg, st in ((no_exp_re, "none"),
941                                (swapping_re, "swapping")):
942                            m = reg.match(line)
943                            if m: state = st
944                rv = status.wait()
945
946            # If the experiment is not present the subcommand returns a
947            # non-zero return value.  If we successfully parsed a "none"
948            # outcome, ignore the return code.
949            if rv != 0 and state != 'none':
950                raise service_error(service_error.internal,
951                        "Cannot get status of segment:%s/%s" % (pid, eid))
952            elif state not in ('active', 'swapped', 'swapping', 'none'):
953                raise service_error(service_error.internal,
954                        "Cannot get status of segment:%s/%s" % (pid, eid))
955            else:
956                self.log.debug("State is %s" % state)
957                return state
958
959
960        def __call__(self, parent, eid, pid, user, tclfile, tmpdir, timeout=0):
961            """
962            Start a sub-experiment on a federant.
963
964            Get the current state, modify or create as appropriate, ship data
965            and configs and start the experiment.  There are small ordering
966            differences based on the initial state of the sub-experiment.
967            """
968            # ops node in the federant
969            host = "%s%s" % (parent.ops, parent.domain)
970            # Configuration directories on the remote machine
971            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
972            softdir = "/proj/%s/software/%s" % (pid, eid)
973            # Local software dir
974            lsoftdir = "%s/software" % tmpdir
975
976            state = self.get_state(user, host, pid, eid)
977
978            if not self.scp_file(tclfile, user, host):
979                return False
980           
981            if state == 'none':
982                # Create a null copy of the experiment so that we capture any
983                # logs there if the modify fails.  Emulab software discards the
984                # logs from a failed startexp
985                try:
986                    f = open("%s/null.tcl" % tmpdir, "w")
987                    print >>f, self.null
988                    f.close()
989                except IOError, e:
990                    raise service_error(service_error.internal,
991                            "Cannot stage tarfile/rpm: %s" % e.strerror)
992
993                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
994                    return False
995                self.log.info("[start_segment]: Creating %s" % eid)
996                timedout = False
997                try:
998                    if not self.ssh_cmd(user, host,
999                            ("/usr/testbed/bin/startexp -i -f -w -p %s " + 
1000                            "-e %s null.tcl") % (pid, eid), "startexp",
1001                            timeout=60 * 10):
1002                        return False
1003                except self.ssh_cmd_timeout:
1004                    timedout = True
1005
1006                if timedout:
1007                    state = self.get_state(user, host, pid, eid)
1008                    if state != "swapped":
1009                        return False
1010           
1011            # Open up a temporary file to contain a script for setting up the
1012            # filespace for the new experiment.
1013            self.log.info("[start_segment]: creating script file")
1014            try:
1015                sf, scriptname = tempfile.mkstemp()
1016                scriptfile = os.fdopen(sf, 'w')
1017            except IOError:
1018                return False
1019
1020            scriptbase = os.path.basename(scriptname)
1021
1022            # Script the filesystem changes
1023            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
1024            # Clear and create the software directory
1025            print >>scriptfile, "/bin/rm -rf %s/*" % softdir
1026            print >>scriptfile, 'mkdir -p %s' % proj_dir
1027            if os.path.isdir(lsoftdir):
1028                print >>scriptfile, 'mkdir -p %s' % softdir
1029            print >>scriptfile, "rm -f %s" % scriptbase
1030            scriptfile.close()
1031
1032            # Move the script to the remote machine
1033            # XXX: could collide tempfile names on the remote host
1034            if self.scp_file(scriptname, user, host, scriptbase):
1035                os.remove(scriptname)
1036            else:
1037                return False
1038
1039            # Execute the script (and the script's last line deletes it)
1040            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
1041                return False
1042
1043            for f in os.listdir(tmpdir):
1044                if not os.path.isdir("%s/%s" % (tmpdir, f)):
1045                    if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
1046                            "%s/%s" % (proj_dir, f)):
1047                        return False
1048            if os.path.isdir(lsoftdir):
1049                for f in os.listdir(lsoftdir):
1050                    if not os.path.isdir("%s/%s" % (lsoftdir, f)):
1051                        if not self.scp_file("%s/%s" % (lsoftdir, f), 
1052                                user, host, "%s/%s" % (softdir, f)):
1053                            return False
1054            # Stage the new configuration (active experiments will stay swapped
1055            # in now)
1056            self.log.info("[start_segment]: Modifying %s" % eid)
1057            try:
1058                if not self.ssh_cmd(user, host,
1059                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
1060                                (pid, eid, tclfile.rpartition('/')[2]),
1061                        "modexp", timeout= 60 * 10):
1062                    return False
1063            except self.ssh_cmd_timeout:
1064                self.log.error("Modify command failed to complete in time")
1065                # There's really no way to see if this succeeded or failed, so
1066                # if it hangs, assume the worst.
1067                return False
1068            # Active experiments are still swapped, this swaps the others in.
1069            if state != 'active':
1070                self.log.info("[start_segment]: Swapping %s" % eid)
1071                timedout = False
1072                try:
1073                    if not self.ssh_cmd(user, host,
1074                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
1075                            "swapexp", timeout=10*60):
1076                        return False
1077                except self.ssh_cmd_timeout:
1078                    timedout = True
1079               
1080                # If the command was terminated, but completed successfully,
1081                # report success.
1082                if timedout:
1083                    self.log.debug("[start_segment]: swapin timed out " +\
1084                            "checking state")
1085                    state = self.get_state(user, host, pid, eid)
1086                    self.log.debug("[start_segment]: state is %s" % state)
1087                    return state == 'active'
1088            # Everything has gone OK.
1089            return True
1090
1091    class stop_segment(proxy_emulab_segment):
1092        def __init__(self, log=None, keyfile=None, debug=False):
1093            access.proxy_emulab_segment.__init__(self,
1094                    log=log, keyfile=keyfile, debug=debug)
1095
1096        def __call__(self, parent, user, pid, eid):
1097            """
1098            Stop a sub experiment by calling swapexp on the federant
1099            """
1100            host = "%s%s" % (parent.ops, parent.domain)
1101            self.log.info("[stop_segment]: Stopping %s" % eid)
1102            rv = False
1103            try:
1104                # Clean out tar files: we've gone over quota in the past
1105                self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
1106                        (pid, eid))
1107                rv = self.ssh_cmd(user, host,
1108                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
1109            except self.ssh_cmd_timeout:
1110                rv = False
1111            return rv
1112
1113    def generate_portal_configs(self, topo, pubkey_base, secretkey_base, 
1114            tmpdir, master):
1115
1116        seer_out = False
1117        client_out = False
1118        for p in [ e for e in topo.elements \
1119                if isinstance(e, topdl.Computer) and e.get_attribute('portal')]:
1120            myname = e.name[0]
1121            peer = e.get_attribute('peer')
1122            lexp = e.get_attribute('experiment')
1123            lproj, leid = lexp.split('/', 1)
1124            ldomain = e.get_attribute('domain')
1125            mexp = e.get_attribute('masterexperiment')
1126            mproj, meid = mexp.split("/", 1)
1127            mdomain = e.get_attribute('masterdomain')
1128            muser = e.get_attribute('masteruser') or 'root'
1129            smbshare = e.get_attribute('smbshare') or 'USERS'
1130            scriptdir = e.get_attribute('scriptdir')
1131            active = e.get_attribute('active')
1132            type = e.get_attribute('portal_type')
1133            segid = fedid(hexstr=e.get_attribute('peer_segment'))
1134            for e in topo.elements:
1135                if isinstance(e, topdl.Segment) and e.id.fedid == segid:
1136                    seg = e
1137                    break
1138            else:
1139                raise service_error(service_error.req, 
1140                        "Can't find segment for portal %s" % myname)
1141
1142            rexp = seg.get_attribute('experiment')
1143            rproj, reid = rexp.split("/", 1)
1144            rdomain = seg.get_attribute('domain')
1145            cfn = "%s/%s.%s.%s%s.gw.conf" % \
1146                    (tmpdir, myname.lower(), leid.lower(),
1147                            lproj.lower(), ldomain.lower())
1148            tunnelconfig = self.attrs.has_key('TunnelCfg')
1149            try:
1150                f = open(cfn, "w")
1151                print >>f, "Active: %s" % active
1152                print >>f, "TunnelCfg: %s" % tunnelconfig
1153                print >>f, "BossName: boss"
1154                print >>f, "FsName: fs"
1155                print >>f, "EventServerName: event-server%s" % ldomain
1156                print >>f, "RemoteEventServerName: event-server%s" % rdomain
1157                print >>f, "SeerControl: control.%s.%s%s" % \
1158                        (meid.lower(), mproj.lower(), mdomain)
1159                print >>f, "Type: %s" % type
1160                print >>f, "RemoteExperiment: %s" % rexp
1161                print >>f, "LocalExperiment: %s" % lexp
1162                print >>f, "RemoteConfigFile: " + \
1163                        "/proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf" \
1164                        % (rproj, reid, peer.lower(), reid.lower(),
1165                                rproj.lower(), rdomain)
1166                print >>f, "Peer: %s.%s.%s%s" % \
1167                        (peer.lower(), reid.lower(), rproj.lower(), rdomain)
1168                print >>f, "RemoteScriptDir: %s" % scriptdir
1169                print >>f, "Pubkeys: /proj/%s/exp/%s/tmp/%s" % \
1170                        (lproj, leid, pubkey_base)
1171                print >>f, "Privkeys: /proj/%s/exp/%s/tmp/%s" % \
1172                        (lproj, leid, secretkey_base)
1173                f.close()
1174            except IOError, e:
1175                raise service_error(service_error.internal,
1176                        "Can't write protal config %s: %s" % (cfn, e))
1177           
1178            # XXX: This little seer config file needs to go away.
1179            if not seer_out:
1180                try:
1181                    seerfn = "%s/seer.conf" % tmpdir
1182                    f = open(seerfn, "w")
1183                    if not master:
1184                        print >>f, "ControlNode: control.%s.%s%s" % \
1185                            (meid.lower(), mproj.lower(), mdomain)
1186                    print >>f, "ExperimentID: %s" % mexp
1187                    f.close()
1188                except IOError, e:
1189                    raise service_error(service_error.internal, 
1190                            "Can't write seer.conf: %s" %e)
1191                seer_out = True
1192
1193            if not client_out and type in ('control', 'both'):
1194                try:
1195                    f = open("%s/client.conf" % tmpdir, "w")
1196                    print >>f, "ControlGateway: %s.%s.%s%s" % \
1197                        (myname.lower(), leid.lower(), lproj.lower(),
1198                                ldomain.lower())
1199                    print >>f, "SMBshare: %s" % smbshare
1200                    print >>f, "ProjectUser: %s" % muser
1201                    print >>f, "ProjectName: %s" % mproj
1202                    print >>f, "ExperimentID: %s/%s" % (mproj, meid)
1203                    f.close()
1204                except IOError, e:
1205                    raise service_error(service_error.internal,
1206                            "Cannot write client.conf: %s" %s)
1207                client_out = True
1208
1209
1210
1211    def generate_ns2(self, topo, expfn, softdir, master):
1212        t = topo.clone()
1213
1214        # Weed out the things we aren't going to instantiate: Segments, portal
1215        # substrates, and portal interfaces.  (The copy in the for loop allows
1216        # us to delete from e.elements in side the for loop).
1217        for e in [e for e in t.elements]:
1218            if isinstance(e, topdl.Segment):
1219                t.elements.remove(e)
1220            if isinstance(e, topdl.Computer):
1221                e.interface = [i for i in e.interface \
1222                        if not i.get_attribute('portal')]
1223        t.substrates = [ s for s in t.substrates \
1224                if not s.get_attribute('portal')]
1225        t.incorporate_elements()
1226
1227        # Localize the software locations
1228        for e in t.elements:
1229            for s in getattr(e, 'software', []):
1230                s.location = re.sub("^.*/", softdir, s.location)
1231
1232        # Customize the ns2 output for local portal commands and images
1233        filters = []
1234
1235        if master: cmdname = 'MasterConnectorCmd'
1236        else:cmdname = 'SlaveConnectorCmd'
1237
1238        if self.attrs.has_key(cmdname):
1239            filters.append(topdl.generate_portal_command_filter(
1240                self.attrs.get(cmdname)))
1241
1242        if self.attrs.has_key('connectorImage'):
1243            filters.append(topdl.generate_portal_image_filter(
1244                self.attrs.get('connectorImage')))
1245
1246        if self.attrs.has_key('connectorType'):
1247            filters.append(topdl.generate_portal_hardware_filter(
1248                self.attrs.get('connectorType')))
1249
1250        # Convert to ns and write it out
1251        expfile = topdl.topology_to_ns2(t, filters)
1252        try:
1253            f = open(expfn, "w")
1254            print >>f, expfile
1255            f.close()
1256        except IOError:
1257            raise service_error(service_error.internal,
1258                    "Cannot write experiment file %s: %s" % (expfn,e))
1259
1260    def StartSegment(self, req, fid):
1261        def get_url(url, cf, destdir):
1262            po = urlparse(url)
1263            fn = po.path.rpartition('/')[2]
1264            try:
1265                conn = httplib.HTTPSConnection(po.hostname, port=po.port, 
1266                        cert_file=cf, key_file=cf)
1267                conn.putrequest('GET', po.path)
1268                conn.endheaders()
1269                response = conn.getresponse()
1270
1271                lf = open("%s/%s" % (destdir, fn), "w")
1272                buf = response.read(4096)
1273                while buf:
1274                    lf.write(buf)
1275                    buf = response.read(4096)
1276                lf.close()
1277            except IOError, e:
1278                raise service_error(service_error.internal,
1279                        "Erro writing tempfile: %s" %e)
1280            except httplib.HTTPException, e:
1281                raise service_error(service_error.internal, 
1282                        "Error retrieving data: %s" % e)
1283
1284        configs = set(('hosts', 'ssh_pubkey', 'ssh_secretkey'))
1285
1286        err = None  # Any service_error generated after tmpdir is created
1287        rv = None   # Return value from segment creation
1288
1289        try:
1290            req = req['StartSegmentRequestBody']
1291        except KeyError:
1292            raise service_error(server_error.req, "Badly formed request")
1293
1294        auth_attr = req['allocID']['fedid']
1295        aid = "%s" % auth_attr
1296        attrs = req.get('fedAttr', [])
1297        if not self.auth.check_attribute(fid, auth_attr):
1298            raise service_error(service_error.access, "Access denied")
1299
1300        if req.has_key('segmentdescription') and \
1301                req['segmentdescription'].has_key('topdldescription'):
1302            topo = \
1303                topdl.Topology(**req['segmentdescription']['topdldescription'])
1304        else:
1305            raise service_error(service_error.req, 
1306                    "Request missing segmentdescription'")
1307
1308        master = req.get('master', False)
1309
1310        certfile = "%s/%s.pem" % (self.certdir, auth_attr)
1311        try:
1312            tmpdir = tempfile.mkdtemp(prefix="access-")
1313            softdir = "%s/software" % tmpdir
1314        except IOError:
1315            raise service_error(service_error.internal, "Cannot create tmp dir")
1316
1317        # Try block alllows us to clean up temporary files.
1318        try:
1319            sw = set()
1320            for e in topo.elements:
1321                for s in getattr(e, 'software', []):
1322                    sw.add(s.location)
1323            if len(sw) > 0:
1324                os.mkdir(softdir)
1325            for s in sw:
1326                get_url(s, certfile, softdir)
1327
1328            for a in attrs:
1329                if a['attribute'] in configs:
1330                    get_url(a['value'], certfile, tmpdir)
1331                if a['attribute'] == 'ssh_pubkey':
1332                    pubkey_base = a['value'].rpartition('/')[2]
1333                if a['attribute'] == 'ssh_secretkey':
1334                    secretkey_base = a['value'].rpartition('/')[2]
1335                if a['attribute'] == 'experiment_name':
1336                    ename = a['value']
1337
1338            proj = None
1339            user = None
1340            self.state_lock.acquire()
1341            if self.allocation.has_key(aid):
1342                proj = self.allocation[aid].get('project', None)
1343                if not proj: 
1344                    proj = self.allocation[aid].get('sproject', None)
1345                user = self.allocation[aid].get('user', None)
1346                self.allocation[aid]['experiment'] = ename
1347                self.allocation[aid]['log'] = [ ]
1348                # Create a logger that logs to the experiment's state object as
1349                # well as to the main log file.
1350                alloc_log = logging.getLogger('fedd.access.%s' % ename)
1351                h = logging.StreamHandler(
1352                        list_log.list_log(self.allocation[aid]['log']))
1353                # XXX: there should be a global one of these rather than
1354                # repeating the code.
1355                h.setFormatter(logging.Formatter(
1356                    "%(asctime)s %(name)s %(message)s",
1357                            '%d %b %y %H:%M:%S'))
1358                alloc_log.addHandler(h)
1359                self.write_state()
1360            self.state_lock.release()
1361
1362            if not proj:
1363                raise service_error(service_error.internal, 
1364                        "Can't find project for %s" %aid)
1365
1366            if not user:
1367                raise service_error(service_error.internal, 
1368                        "Can't find creation user for %s" %aid)
1369
1370            expfile = "%s/experiment.tcl" % tmpdir
1371
1372            self.generate_portal_configs(topo, pubkey_base, 
1373                    secretkey_base, tmpdir, master)
1374            self.generate_ns2(topo, expfile, 
1375                    "/proj/%s/software/%s/" % (proj, ename), master)
1376
1377            starter = self.start_segment(keyfile=self.ssh_privkey_file, 
1378                    debug=self.create_debug, log=alloc_log)
1379            rv = starter(self, ename, proj, user, expfile, tmpdir)
1380        except service_error, e:
1381            err = e
1382
1383        self.log.debug("[StartSegment]: removing %s" % tmpdir)
1384        # Walk up tmpdir, deleting as we go
1385        for path, dirs, files in os.walk(tmpdir, topdown=False):
1386            for f in files:
1387                os.remove(os.path.join(path, f))
1388            for d in dirs:
1389                os.rmdir(os.path.join(path, d))
1390        os.rmdir(tmpdir)
1391
1392        if rv:
1393            # Grab the log (this is some anal locking, but better safe than
1394            # sorry)
1395            self.state_lock.acquire()
1396            logv = "".join(self.allocation[aid]['log'])
1397            self.state_lock.release()
1398
1399            return { 'allocID': req['allocID'], 'allocationLog': logv }
1400        elif err:
1401            raise service_error(service_error.federant,
1402                    "Swapin failed: %s" % err)
1403        else:
1404            raise service_error(service_error.federant, "Swapin failed")
1405
1406    def TerminateSegment(self, req, fid):
1407        try:
1408            req = req['TerminateSegmentRequestBody']
1409        except KeyError:
1410            raise service_error(server_error.req, "Badly formed request")
1411
1412        auth_attr = req['allocID']['fedid']
1413        aid = "%s" % auth_attr
1414        attrs = req.get('fedAttr', [])
1415        if not self.auth.check_attribute(fid, auth_attr):
1416            raise service_error(service_error.access, "Access denied")
1417
1418        self.state_lock.acquire()
1419        if self.allocation.has_key(aid):
1420            proj = self.allocation[aid].get('project', None)
1421            if not proj: 
1422                proj = self.allocation[aid].get('sproject', None)
1423            user = self.allocation[aid].get('user', None)
1424            ename = self.allocation[aid].get('experiment', None)
1425        self.state_lock.release()
1426
1427        if not proj:
1428            raise service_error(service_error.internal, 
1429                    "Can't find project for %s" % aid)
1430
1431        if not user:
1432            raise service_error(service_error.internal, 
1433                    "Can't find creation user for %s" % aid)
1434        if not ename:
1435            raise service_error(service_error.internal, 
1436                    "Can't find experiment name for %s" % aid)
1437        stopper = self.stop_segment(keyfile=self.ssh_privkey_file,
1438                debug=self.create_debug)
1439        stopper(self, user, proj, ename)
1440        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.