source: fedd/federation/dragon_access.py @ 68bb551

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 68bb551 was 68bb551, checked in by Ted Faber <faber@…>, 15 years ago

Get the allocation log into the response and add the vlan number as an attribute. Some misc debugging code removed as well.

Next, we make the experiment controller deal with this new thing.

  • Property mode set to 100644
File size: 26.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import *
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to dragon_access.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = False
64        self.certdir = config.get("access","certdir")
65        self.create_debug = config.getboolean("access", "create_debug")
66        self.cli_dir = config.get("access", "cli_dir")
67        self.axis2_home = config.get("access", "axis2_home")
68        self.idc_url = config.get("access", "idc")
69
70        self.attrs = { }
71        self.access = { }
72        # State is a dict of dicts indexed by segment fedid that includes the
73        # owners of the segment as fedids (who can manipulate it, key: owners),
74        # the repo dir/user for the allocation (key: user),  Current allocation
75        # log (key: log), and GRI of the reservation once made (key: gri)
76        self.state = { }
77        self.log = logging.getLogger("fedd.access")
78        set_log_level(config, "access", self.log)
79        self.state_lock = Lock()
80        if not (self.cli_dir and self.axis2_home and self.idc_url):
81            self.log.error("Must specify all of cli_dir, axis2_home, " +\
82                    "idc in the [access] section of the configuration")
83
84        if auth: self.auth = auth
85        else:
86            self.log.error(\
87                    "[access]: No authorizer initialized, creating local one.")
88            auth = authorizer()
89
90        tb = config.get('access', 'testbed')
91        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
92        else: self.testbed = [ ]
93
94        if config.has_option("access", "accessdb"):
95            self.read_access(config.get("access", "accessdb"))
96
97        self.state_filename = config.get("access", "access_state")
98        self.read_state()
99
100        # Keep cert_file and cert_pwd coming from the same place
101        self.cert_file = config.get("access", "cert_file")
102        if self.cert_file:
103            self.sert_pwd = config.get("access", "cert_pw")
104        else:
105            self.cert_file = config.get("globals", "cert_file")
106            self.sert_pwd = config.get("globals", "cert_pw")
107
108        self.trusted_certs = config.get("access", "trusted_certs") or \
109                config.get("globals", "trusted_certs")
110
111        self.soap_services = {\
112            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
113            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
114            'StartSegment': soap_handler("StartSegment", self.StartSegment),
115            'TerminateSegment': soap_handler("TerminateSegment", 
116                self.TerminateSegment),
117            }
118        self.xmlrpc_services =  {\
119            'RequestAccess': xmlrpc_handler('RequestAccess',
120                self.RequestAccess),
121            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
122                self.ReleaseAccess),
123            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
124            'TerminateSegment': xmlrpc_handler('TerminateSegment',
125                self.TerminateSegment),
126            }
127
128    def read_access(self, config):
129        """
130        Read a configuration file and set internal parameters.
131
132        There are access lines of the
133        form (tb, proj, user) -> user that map the first tuple of
134        names to the user for for access purposes.  Names in the key (left side)
135        can include "<NONE> or <ANY>" to act as wildcards or to require the
136        fields to be empty.  Similarly aproj or auser can be <SAME> or
137        <DYNAMIC> indicating that either the matching key is to be used or a
138        dynamic user or project will be created.  These names can also be
139        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
140        directory that contains the DRAGON user credentials.
141        Testbed attributes outside the forms above can be given using the
142        format attribute: name value: value.  The name is a single word and the
143        value continues to the end of the line.  Empty lines and lines startin
144        with a # are ignored.
145
146        Parsing errors result in a self.parse_error exception being raised.
147        """
148        lineno=0
149        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
150        fedid_expr = "fedid:[" + string.hexdigits + "]+"
151        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
152
153        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
154                re.IGNORECASE)
155        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
156                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
157
158        def parse_name(n):
159            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
160            else: return n
161       
162        def auth_name(n):
163            if isinstance(n, basestring):
164                if n =='<any>' or n =='<none>': return None
165                else: return unicode(n)
166            else:
167                return n
168
169        f = open(config, "r");
170        for line in f:
171            lineno += 1
172            line = line.strip();
173            if len(line) == 0 or line.startswith('#'):
174                continue
175
176            # Extended (attribute: x value: y) attribute line
177            m = attr_re.match(line)
178            if m != None:
179                attr, val = m.group(1,2)
180                self.attrs[attr] = val
181                continue
182
183            # Access line (t, p, u) -> (a) line
184            # XXX: you are here
185            m = access_re.match(line)
186            if m != None:
187                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
188                auth_key = tuple([ auth_name(x) for x in access_key])
189                user_name = auth_name(parse_name(m.group(4)))
190
191                self.access[access_key] = user_name
192                self.auth.set_attribute(auth_key, "access")
193                continue
194
195            # Nothing matched to here: unknown line - raise exception
196            f.close()
197            raise self.parse_error("Unknown statement at line %d of %s" % \
198                    (lineno, config))
199        f.close()
200
201        print self.access
202
203    def get_users(self, obj):
204        """
205        Return a list of the IDs of the users in dict
206        """
207        if obj.has_key('user'):
208            return [ unpack_id(u['userID']) \
209                    for u in obj['user'] if u.has_key('userID') ]
210        else:
211            return None
212
213    def write_state(self):
214        if self.state_filename:
215            try:
216                f = open(self.state_filename, 'w')
217                pickle.dump(self.state, f)
218            except IOError, e:
219                self.log.error("Can't write file %s: %s" % \
220                        (self.state_filename, e))
221            except pickle.PicklingError, e:
222                self.log.error("Pickling problem: %s" % e)
223            except TypeError, e:
224                self.log.error("Pickling problem (TypeError): %s" % e)
225
226
227    def read_state(self):
228        """
229        Read a new copy of access state.  Old state is overwritten.
230
231        State format is a simple pickling of the state dictionary.
232        """
233        if self.state_filename:
234            try:
235                f = open(self.state_filename, "r")
236                self.state = pickle.load(f)
237                self.log.debug("[read_state]: Read state from %s" % \
238                        self.state_filename)
239            except IOError, e:
240                self.log.warning(("[read_state]: No saved state: " +\
241                        "Can't open %s: %s") % (self.state_filename, e))
242            except EOFError, e:
243                self.log.warning(("[read_state]: " +\
244                        "Empty or damaged state file: %s:") % \
245                        self.state_filename)
246            except pickle.UnpicklingError, e:
247                self.log.warning(("[read_state]: No saved state: " + \
248                        "Unpickling failed: %s") % e)
249
250            # Add the ownership attributes to the authorizer.  Note that the
251            # indices of the allocation dict are strings, but the attributes are
252            # fedids, so there is a conversion.
253            for k in self.state.keys():
254                for o in self.state[k].get('owners', []):
255                    self.auth.set_attribute(o, fedid(hexstr=k))
256                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
257
258
259    def permute_wildcards(self, a, p):
260        """Return a copy of a with various fields wildcarded.
261
262        The bits of p control the wildcards.  A set bit is a wildcard
263        replacement with the lowest bit being user then project then testbed.
264        """
265        if p & 1: user = ["<any>"]
266        else: user = a[2]
267        if p & 2: proj = "<any>"
268        else: proj = a[1]
269        if p & 4: tb = "<any>"
270        else: tb = a[0]
271
272        return (tb, proj, user)
273
274    def find_access(self, search):
275        """
276        Search the access DB for a match on this tuple.  Return the matching
277        user (repo dir).
278       
279        NB, if the initial tuple fails to match we start inserting wildcards in
280        an order determined by self.project_priority.  Try the list of users in
281        order (when wildcarded, there's only one user in the list).
282        """
283        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
284        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
285
286        for p in perm: 
287            s = self.permute_wildcards(search, p)
288            # s[2] is None on an anonymous, unwildcarded request
289            if s[2] != None:
290                for u in s[2]:
291                    if self.access.has_key((s[0], s[1], u)):
292                        return self.access[(s[0], s[1], u)]
293            else:
294                if self.access.has_key(s):
295                    return self.access[s]
296        return None
297
298    def lookup_access(self, req, fid):
299        """
300        Determine the allowed access for this request.  Return the access and
301        which fields are dynamic.
302
303        The fedid is needed to construct the request
304        """
305        # Search keys
306        tb = None
307        project = None
308        user = None
309        # Return values
310        rp = access_project(None, ())
311        ru = None
312
313        if req.has_key('project'):
314            p = req['project']
315            if p.has_key('name'):
316                project = unpack_id(p['name'])
317            user = self.get_users(p)
318        else:
319            user = self.get_users(req)
320
321        user_fedids = [ u for u in user if isinstance(u, fedid)]
322        # Determine how the caller is representing itself.  If its fedid shows
323        # up as a project or a singleton user, let that stand.  If neither the
324        # usernames nor the project name is a fedid, the caller is a testbed.
325        if project and isinstance(project, fedid):
326            if project == fid:
327                # The caller is the project (which is already in the tuple
328                # passed in to the authorizer)
329                owners = user_fedids
330                owners.append(project)
331            else:
332                raise service_error(service_error.req,
333                        "Project asserting different fedid")
334        else:
335            if fid not in user_fedids:
336                tb = fid
337                owners = user_fedids
338                owners.append(fid)
339            else:
340                if len(fedids) > 1:
341                    raise service_error(service_error.req,
342                            "User asserting different fedid")
343                else:
344                    # Which is a singleton
345                    owners = user_fedids
346        # Confirm authorization
347
348        for u in user:
349            self.log.debug("[lookup_access] Checking access for %s" % \
350                    ((tb, project, u),))
351            if self.auth.check_attribute((tb, project, u), 'access'):
352                self.log.debug("[lookup_access] Access granted")
353                break
354            else:
355                self.log.debug("[lookup_access] Access Denied")
356        else:
357            raise service_error(service_error.access, "Access denied")
358
359        # This maps a valid user to the Emulab projects and users to use
360        found = self.find_access((tb, project, user))
361       
362        if found == None:
363            raise service_error(service_error.access,
364                    "Access denied - cannot map access")
365        return found, owners
366
367    def RequestAccess(self, req, fid):
368        """
369        Handle the access request.  Proxy if not for us.
370
371        Parse out the fields and make the allocations or rejections if for us,
372        otherwise, assuming we're willing to proxy, proxy the request out.
373        """
374
375        # The dance to get into the request body
376        if req.has_key('RequestAccessRequestBody'):
377            req = req['RequestAccessRequestBody']
378        else:
379            raise service_error(service_error.req, "No request!?")
380
381        if req.has_key('destinationTestbed'):
382            dt = unpack_id(req['destinationTestbed'])
383
384        if dt == None or dt in self.testbed:
385            # Request for this fedd
386            found, owners = self.lookup_access(req, fid)
387            # keep track of what's been added
388            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
389            aid = unicode(allocID)
390
391            self.state_lock.acquire()
392            self.state[aid] = { }
393            self.state[aid]['user'] = found
394            self.state[aid]['owners'] = owners
395            self.write_state()
396            self.state_lock.release()
397            for o in owners:
398                self.auth.set_attribute(o, allocID)
399            self.auth.set_attribute(allocID, allocID)
400
401            try:
402                f = open("%s/%s.pem" % (self.certdir, aid), "w")
403                print >>f, alloc_cert
404                f.close()
405            except IOError, e:
406                raise service_error(service_error.internal, 
407                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
408            print { 'allocID': allocID }
409            return { 'allocID': { 'fedid': allocID } }
410        else:
411            if self.allow_proxy:
412                resp = self.proxy_RequestAccess.call_service(dt, req,
413                            self.cert_file, self.cert_pwd,
414                            self.trusted_certs)
415                if resp.has_key('RequestAccessResponseBody'):
416                    return resp['RequestAccessResponseBody']
417                else:
418                    return None
419            else:
420                raise service_error(service_error.access,
421                        "Access proxying denied")
422
423    def ReleaseAccess(self, req, fid):
424        # The dance to get into the request body
425        if req.has_key('ReleaseAccessRequestBody'):
426            req = req['ReleaseAccessRequestBody']
427        else:
428            raise service_error(service_error.req, "No request!?")
429
430        if req.has_key('destinationTestbed'):
431            dt = unpack_id(req['destinationTestbed'])
432        else:
433            dt = None
434
435        if dt == None or dt in self.testbed:
436            # Local request
437            try:
438                if req['allocID'].has_key('localname'):
439                    auth_attr = aid = req['allocID']['localname']
440                elif req['allocID'].has_key('fedid'):
441                    aid = unicode(req['allocID']['fedid'])
442                    auth_attr = req['allocID']['fedid']
443                else:
444                    raise service_error(service_error.req,
445                            "Only localnames and fedids are understood")
446            except KeyError:
447                raise service_error(service_error.req, "Badly formed request")
448
449            self.log.debug("[access] deallocation requested for %s", aid)
450            if not self.auth.check_attribute(fid, auth_attr):
451                self.log.debug("[access] deallocation denied for %s", aid)
452                raise service_error(service_error.access, "Access Denied")
453
454            self.state_lock.acquire()
455            if self.state.has_key(aid):
456                self.log.debug("Found allocation for %s" %aid)
457                del self.state[aid]
458                self.write_state()
459                self.state_lock.release()
460                # And remove the access cert
461                cf = "%s/%s.pem" % (self.certdir, aid)
462                self.log.debug("Removing %s" % cf)
463                os.remove(cf)
464                return { 'allocID': req['allocID'] } 
465            else:
466                self.state_lock.release()
467                raise service_error(service_error.req, "No such allocation")
468
469        else:
470            if self.allow_proxy:
471                resp = self.proxy_ReleaseAccess.call_service(dt, req,
472                            self.cert_file, self.cert_pwd,
473                            self.trusted_certs)
474                if resp.has_key('ReleaseAccessResponseBody'):
475                    return resp['ReleaseAccessResponseBody']
476                else:
477                    return None
478            else:
479                raise service_error(service_error.access,
480                        "Access proxying denied")
481
482    def extract_parameters(self, top):
483        """
484        DRAGON currently supports a fixed capacity link between two endpoints.
485        Those endpoints may specify a VPN (or list or range) to use.  This
486        extracts the DRAGON endpoints and vpn preferences from the segments (as
487        attributes) and the capacity of the connection as given by the
488        substrate.  The two endpoints VLAN choices are intersected to get set
489        of VLANs that are acceptable (no VLAN requiremnets means any is
490        acceptable).
491        """
492        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
493
494        print segments
495
496        if len(segments) != 2 or len(top.substrates) != 1:
497            raise service_error(service_error.req,
498                    "Requests to DRAGON must have exactlty two segments " +\
499                            "and one substrate")
500
501        ends = [ ]
502        for s in segments:
503            ep = s.get_attribute('dragon_endpoint')
504            if not ep:
505                raise service_error(service_error.req, 
506                        "Missing DRAGON endpoint foe %s" % s.name)
507            v = s.get_attribute('vlans')
508            vlans = None
509            vset = set()
510            # the vlans can be a single integer, a comma separated list or a
511            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
512            # 100,300-305,400
513            if v:
514                if v.count(",") > 0 :
515                    vl = [ x.strip() for x in v.split(",") ]
516                else:
517                    vl = [ v ]
518                for v in vl:
519                    try:
520                        if v.count("-")> 0:
521                            f, t = v.split("-", 1)
522                            for i in range(int(f), int(t)+1):
523                                vset.add(i)
524                        else:
525                            vset.add(int(v))
526                    except ValueError:
527                        raise service_error(service_error.req, 
528                                "VLAN tags must be integers (%s)" %s.name)
529            if len(vset) > 0:
530                if vlans: vlans &= vest
531                else: vlans = vset
532            ends.append(ep)
533
534
535        sub = top.substrates[0]
536        if sub.capacity:
537            cap = int(sub.capacity.rate / 1000.0)
538            if cap < 1: cap = 1
539        else:
540            cap = 100
541
542        return cap, ends[0], ends[1], vlans
543
544
545    def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
546            log=None):
547        """
548        Do the actual work of creating the dragon connecton.
549        """
550        if not log: log = self.log
551        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
552        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
553        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
554        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
555        path_re = re.compile("Path:")
556
557        if not vpns:
558            vpns = [ None, None, None, None, None]
559
560        if not start:
561            start = time.time()
562        if not end:
563            end = start + 5 *60
564
565
566        status = None
567        gri = None
568        rv = None
569        vlan_no = None
570        for v in vpns:
571            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
572                'createReservation', '-repo',  repo , '-url', self.idc_url,
573                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
574                '-vlan', "%s" % v, '-desc', 'fedd created connection',
575                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
576                '-end', "%d" % int(end)]
577            log.debug("[start_segment]: %s" % " ".join(cmd))
578            if not self.create_debug: 
579                p = Popen(cmd, cwd=self.cli_dir, 
580                        stdout=PIPE, stderr=STDOUT,
581                        close_fds=True)
582                for line in p.stdout:
583                    m = status_re.match(line)
584                    if m:
585                        status = m.group(1)
586                        continue
587                    m = gri_re.match(line)
588                    if m:
589                        gri = m.group(1)
590                rv = p.wait()
591            else: 
592                rv = 0
593                status = 'ACCEPTED'
594                gri = 'debug_gri'
595
596            # Reservation in progress.  Poll the IDC until we know the outcome
597            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
598                'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
599
600            while status in ('ACCEPTED', 'INSETUP', 'PENDING'):
601                time.sleep(5)
602                log.debug("[start_segment]: %s" % " ".join(cmd))
603                if not self.create_debug:
604                    p = Popen(cmd, cwd=self.cli_dir, 
605                            stdout=PIPE, stderr=STDOUT,
606                            close_fds=True)
607                    in_path = False
608                    vpn1 = None
609                    vpn2 = None
610                    src = None
611                    dest = None
612                    for line in p.stdout:
613                        if not in_path:
614                            m = status_re.match(line)
615                            if m: 
616                                status = m.group(1)
617                                continue
618                            m = source_re.match(line)
619                            if m:
620                                src = m.group(1)
621                                continue
622                            m = dest_re.match(line)
623                            if m:
624                                dest = m.group(1)
625                                continue
626                            m = path_re.match(line)
627                            if m:
628                                in_path = True
629                                if src and dest:
630                                    vpn1_re = re.compile(
631                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
632                                                    src.replace("*", "\*"))
633                                    vpn2_re = re.compile(
634                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
635                                                    dest.replace("*", "\*"))
636                                else:
637                                    raise service_error(service_error.internal, 
638                                            "Strange output from query")
639                        else:
640                            m = vpn1_re.match(line)
641                            if m:
642                                vpn1 = m.group(1)
643                                continue
644                            m = vpn2_re.match(line)
645                            if m:
646                                vpn2 = m.group(1)
647                                continue
648                    rv = p.wait()
649                    if vpn1 == vpn2:
650                        if v is not None:
651                            if int(vpn1) == v:
652                                vlan_no = int(v)
653                            else:
654                                raise service_error(service_error.federant, 
655                                        "Unexpected vlan assignment")
656                        else:
657                            vlan_no = int(v)
658                    else:
659                        raise service_error(service_error.internal,
660                                "Different VPNs on DRAGON ends")
661                    log.debug("Status: %s" % status or "none")
662                else:
663                    status = 'ACTIVE'
664                    vlan_no = int(v) or 1
665            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
666                break
667
668        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
669            return gri, vlan_no
670        else:
671            raise service_error(service_error.federant, "Cannot make reservation")
672
673    def stop_segment(self, repo, gri, log=None):
674        if not log: log = self.log
675        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
676            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
677
678
679        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
680        if not self.create_debug:
681            try:
682                f = open("/dev/null", "w")
683                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
684                f.close()
685            except IOError, e:
686                raise service_error(service_error.internal, 
687                        "Failed to open /dev/null: %s" % e)
688
689    def StartSegment(self, req, fid):
690        err = None  # Any service_error generated after tmpdir is created
691        rv = None   # Return value from segment creation
692
693        try:
694            req = req['StartSegmentRequestBody']
695        except KeyError:
696            raise service_error(server_error.req, "Badly formed request")
697
698        auth_attr = req['allocID']['fedid']
699        aid = "%s" % auth_attr
700        attrs = req.get('fedAttr', [])
701        if not self.auth.check_attribute(fid, auth_attr):
702            raise service_error(service_error.access, "Access denied")
703
704        if req.has_key('segmentdescription') and \
705                req['segmentdescription'].has_key('topdldescription'):
706            topo = \
707                topdl.Topology(**req['segmentdescription']['topdldescription'])
708        else:
709            raise service_error(service_error.req, 
710                    "Request missing segmentdescription'")
711
712        cap, src, dest, vlans = self.extract_parameters(topo)
713        ename = aid
714
715        for a in attrs:
716            if a['attribute'] == 'experiment_name':
717                ename = a['value']
718
719        repo = None
720        self.state_lock.acquire()
721        if self.state.has_key(aid):
722            repo = self.state[aid]['user']
723            self.state[aid]['log'] = [ ]
724            # Create a logger that logs to the experiment's state object as
725            # well as to the main log file.
726            alloc_log = logging.getLogger('fedd.access.%s' % ename)
727            h = logging.StreamHandler(
728                    list_log.list_log(self.state[aid]['log']))
729            # XXX: there should be a global one of these rather than
730            # repeating the code.
731            h.setFormatter(logging.Formatter(
732                "%(asctime)s %(name)s %(message)s",
733                        '%d %b %y %H:%M:%S'))
734            alloc_log.addHandler(h)
735            self.write_state()
736        self.state_lock.release()
737
738        if not repo:
739            raise service_error(service_error.internal, 
740                    "Can't find creation user for %s" %aid)
741
742        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
743                log=alloc_log)
744
745        if gri:
746            # Grab the log (this is some anal locking, but better safe than
747            # sorry)
748            self.state_lock.acquire()
749            self.state[aid]['gri'] = gri
750            logv = "".join(self.state[aid]['log'])
751            self.write_state()
752            self.state_lock.release()
753
754            return { 
755                    'allocID': req['allocID'],
756                    'allocationLog': logv,
757                    'fedAttr': [
758                        {'attribute': 'vlan', 'value': '%d' % vlan_no }
759                        ]
760                    }
761        elif err:
762            raise service_error(service_error.federant,
763                    "Swapin failed: %s" % err)
764        else:
765            raise service_error(service_error.federant, "Swapin failed")
766
767    def TerminateSegment(self, req, fid):
768        try:
769            req = req['TerminateSegmentRequestBody']
770        except KeyError:
771            raise service_error(server_error.req, "Badly formed request")
772
773        auth_attr = req['allocID']['fedid']
774        aid = "%s" % auth_attr
775        attrs = req.get('fedAttr', [])
776        if not self.auth.check_attribute(fid, auth_attr):
777            raise service_error(service_error.access, "Access denied")
778
779        self.state_lock.acquire()
780        if self.state.has_key(aid):
781            gri = self.state[aid].get('gri', None)
782            user = self.state[aid].get('user', None)
783        else:
784            gri = None
785            user = None
786        self.state_lock.release()
787
788        if not gri:
789            raise service_error(service_error.internal, 
790                    "Can't find gri for %s" % aid)
791
792        if not user:
793            raise service_error(service_error.internal, 
794                    "Can't find creation user for %s" % aid)
795
796        self.stop_segment(user, gri)
797        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.