source: fedd/federation/dragon_access.py @ 36fec1b

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 36fec1b was 36fec1b, checked in by Ted Faber <faber@…>, 14 years ago

order dragon endpoints

  • Property mode set to 100644
File size: 27.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import *
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to dragon_access.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = False
64        self.certdir = config.get("access","certdir")
65        self.create_debug = config.getboolean("access", "create_debug")
66        self.cli_dir = config.get("access", "cli_dir")
67        self.axis2_home = config.get("access", "axis2_home")
68        self.idc_url = config.get("access", "idc")
69        self.domain = config.get("access", "domain")
70
71        self.attrs = { }
72        self.access = { }
73        # State is a dict of dicts indexed by segment fedid that includes the
74        # owners of the segment as fedids (who can manipulate it, key: owners),
75        # the repo dir/user for the allocation (key: user),  Current allocation
76        # log (key: log), and GRI of the reservation once made (key: gri)
77        self.state = { }
78        self.log = logging.getLogger("fedd.access")
79        set_log_level(config, "access", self.log)
80        self.state_lock = Lock()
81        if not (self.cli_dir and self.axis2_home and self.idc_url):
82            self.log.error("Must specify all of cli_dir, axis2_home, " +\
83                    "idc in the [access] section of the configuration")
84
85        if auth: self.auth = auth
86        else:
87            self.log.error(\
88                    "[access]: No authorizer initialized, creating local one.")
89            auth = authorizer()
90
91        tb = config.get('access', 'testbed')
92        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
93        else: self.testbed = [ ]
94
95        if config.has_option("access", "accessdb"):
96            self.read_access(config.get("access", "accessdb"))
97
98        self.state_filename = config.get("access", "access_state")
99        self.read_state()
100
101        # Keep cert_file and cert_pwd coming from the same place
102        self.cert_file = config.get("access", "cert_file")
103        if self.cert_file:
104            self.sert_pwd = config.get("access", "cert_pw")
105        else:
106            self.cert_file = config.get("globals", "cert_file")
107            self.sert_pwd = config.get("globals", "cert_pw")
108
109        self.trusted_certs = config.get("access", "trusted_certs") or \
110                config.get("globals", "trusted_certs")
111
112        self.soap_services = {\
113            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
114            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
115            'StartSegment': soap_handler("StartSegment", self.StartSegment),
116            'TerminateSegment': soap_handler("TerminateSegment", 
117                self.TerminateSegment),
118            }
119        self.xmlrpc_services =  {\
120            'RequestAccess': xmlrpc_handler('RequestAccess',
121                self.RequestAccess),
122            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
123                self.ReleaseAccess),
124            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
125            'TerminateSegment': xmlrpc_handler('TerminateSegment',
126                self.TerminateSegment),
127            }
128
129    def read_access(self, config):
130        """
131        Read a configuration file and set internal parameters.
132
133        There are access lines of the
134        form (tb, proj, user) -> user that map the first tuple of
135        names to the user for for access purposes.  Names in the key (left side)
136        can include "<NONE> or <ANY>" to act as wildcards or to require the
137        fields to be empty.  Similarly aproj or auser can be <SAME> or
138        <DYNAMIC> indicating that either the matching key is to be used or a
139        dynamic user or project will be created.  These names can also be
140        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
141        directory that contains the DRAGON user credentials.
142        Testbed attributes outside the forms above can be given using the
143        format attribute: name value: value.  The name is a single word and the
144        value continues to the end of the line.  Empty lines and lines startin
145        with a # are ignored.
146
147        Parsing errors result in a self.parse_error exception being raised.
148        """
149        lineno=0
150        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
151        fedid_expr = "fedid:[" + string.hexdigits + "]+"
152        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
153
154        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
155                re.IGNORECASE)
156        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
157                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
158
159        def parse_name(n):
160            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
161            else: return n
162       
163        def auth_name(n):
164            if isinstance(n, basestring):
165                if n =='<any>' or n =='<none>': return None
166                else: return unicode(n)
167            else:
168                return n
169
170        f = open(config, "r");
171        for line in f:
172            lineno += 1
173            line = line.strip();
174            if len(line) == 0 or line.startswith('#'):
175                continue
176
177            # Extended (attribute: x value: y) attribute line
178            m = attr_re.match(line)
179            if m != None:
180                attr, val = m.group(1,2)
181                self.attrs[attr] = val
182                continue
183
184            # Access line (t, p, u) -> (a) line
185            # XXX: you are here
186            m = access_re.match(line)
187            if m != None:
188                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
189                auth_key = tuple([ auth_name(x) for x in access_key])
190                user_name = auth_name(parse_name(m.group(4)))
191
192                self.access[access_key] = user_name
193                self.auth.set_attribute(auth_key, "access")
194                continue
195
196            # Nothing matched to here: unknown line - raise exception
197            f.close()
198            raise self.parse_error("Unknown statement at line %d of %s" % \
199                    (lineno, config))
200        f.close()
201
202        print self.access
203
204    def get_users(self, obj):
205        """
206        Return a list of the IDs of the users in dict
207        """
208        if obj.has_key('user'):
209            return [ unpack_id(u['userID']) \
210                    for u in obj['user'] if u.has_key('userID') ]
211        else:
212            return None
213
214    def write_state(self):
215        if self.state_filename:
216            try:
217                f = open(self.state_filename, 'w')
218                pickle.dump(self.state, f)
219            except IOError, e:
220                self.log.error("Can't write file %s: %s" % \
221                        (self.state_filename, e))
222            except pickle.PicklingError, e:
223                self.log.error("Pickling problem: %s" % e)
224            except TypeError, e:
225                self.log.error("Pickling problem (TypeError): %s" % e)
226
227
228    def read_state(self):
229        """
230        Read a new copy of access state.  Old state is overwritten.
231
232        State format is a simple pickling of the state dictionary.
233        """
234        if self.state_filename:
235            try:
236                f = open(self.state_filename, "r")
237                self.state = pickle.load(f)
238                self.log.debug("[read_state]: Read state from %s" % \
239                        self.state_filename)
240            except IOError, e:
241                self.log.warning(("[read_state]: No saved state: " +\
242                        "Can't open %s: %s") % (self.state_filename, e))
243            except EOFError, e:
244                self.log.warning(("[read_state]: " +\
245                        "Empty or damaged state file: %s:") % \
246                        self.state_filename)
247            except pickle.UnpicklingError, e:
248                self.log.warning(("[read_state]: No saved state: " + \
249                        "Unpickling failed: %s") % e)
250
251            # Add the ownership attributes to the authorizer.  Note that the
252            # indices of the allocation dict are strings, but the attributes are
253            # fedids, so there is a conversion.
254            for k in self.state.keys():
255                for o in self.state[k].get('owners', []):
256                    self.auth.set_attribute(o, fedid(hexstr=k))
257                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
258
259
260    def permute_wildcards(self, a, p):
261        """Return a copy of a with various fields wildcarded.
262
263        The bits of p control the wildcards.  A set bit is a wildcard
264        replacement with the lowest bit being user then project then testbed.
265        """
266        if p & 1: user = ["<any>"]
267        else: user = a[2]
268        if p & 2: proj = "<any>"
269        else: proj = a[1]
270        if p & 4: tb = "<any>"
271        else: tb = a[0]
272
273        return (tb, proj, user)
274
275    def find_access(self, search):
276        """
277        Search the access DB for a match on this tuple.  Return the matching
278        user (repo dir).
279       
280        NB, if the initial tuple fails to match we start inserting wildcards in
281        an order determined by self.project_priority.  Try the list of users in
282        order (when wildcarded, there's only one user in the list).
283        """
284        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
285        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
286
287        for p in perm: 
288            s = self.permute_wildcards(search, p)
289            # s[2] is None on an anonymous, unwildcarded request
290            if s[2] != None:
291                for u in s[2]:
292                    if self.access.has_key((s[0], s[1], u)):
293                        return self.access[(s[0], s[1], u)]
294            else:
295                if self.access.has_key(s):
296                    return self.access[s]
297        return None
298
299    def lookup_access(self, req, fid):
300        """
301        Determine the allowed access for this request.  Return the access and
302        which fields are dynamic.
303
304        The fedid is needed to construct the request
305        """
306        # Search keys
307        tb = None
308        project = None
309        user = None
310        # Return values
311        rp = access_project(None, ())
312        ru = None
313
314        if req.has_key('project'):
315            p = req['project']
316            if p.has_key('name'):
317                project = unpack_id(p['name'])
318            user = self.get_users(p)
319        else:
320            user = self.get_users(req)
321
322        user_fedids = [ u for u in user if isinstance(u, fedid)]
323        # Determine how the caller is representing itself.  If its fedid shows
324        # up as a project or a singleton user, let that stand.  If neither the
325        # usernames nor the project name is a fedid, the caller is a testbed.
326        if project and isinstance(project, fedid):
327            if project == fid:
328                # The caller is the project (which is already in the tuple
329                # passed in to the authorizer)
330                owners = user_fedids
331                owners.append(project)
332            else:
333                raise service_error(service_error.req,
334                        "Project asserting different fedid")
335        else:
336            if fid not in user_fedids:
337                tb = fid
338                owners = user_fedids
339                owners.append(fid)
340            else:
341                if len(fedids) > 1:
342                    raise service_error(service_error.req,
343                            "User asserting different fedid")
344                else:
345                    # Which is a singleton
346                    owners = user_fedids
347        # Confirm authorization
348
349        for u in user:
350            self.log.debug("[lookup_access] Checking access for %s" % \
351                    ((tb, project, u),))
352            if self.auth.check_attribute((tb, project, u), 'access'):
353                self.log.debug("[lookup_access] Access granted")
354                break
355            else:
356                self.log.debug("[lookup_access] Access Denied")
357        else:
358            raise service_error(service_error.access, "Access denied")
359
360        # This maps a valid user to the Emulab projects and users to use
361        found = self.find_access((tb, project, user))
362       
363        if found == None:
364            raise service_error(service_error.access,
365                    "Access denied - cannot map access")
366        return found, owners
367
368    def RequestAccess(self, req, fid):
369        """
370        Handle the access request.  Proxy if not for us.
371
372        Parse out the fields and make the allocations or rejections if for us,
373        otherwise, assuming we're willing to proxy, proxy the request out.
374        """
375
376        # The dance to get into the request body
377        if req.has_key('RequestAccessRequestBody'):
378            req = req['RequestAccessRequestBody']
379        else:
380            raise service_error(service_error.req, "No request!?")
381
382        if req.has_key('destinationTestbed'):
383            dt = unpack_id(req['destinationTestbed'])
384
385        if dt == None or dt in self.testbed:
386            # Request for this fedd
387            found, owners = self.lookup_access(req, fid)
388            # keep track of what's been added
389            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
390            aid = unicode(allocID)
391
392            self.state_lock.acquire()
393            self.state[aid] = { }
394            self.state[aid]['user'] = found
395            self.state[aid]['owners'] = owners
396            self.write_state()
397            self.state_lock.release()
398            for o in owners:
399                self.auth.set_attribute(o, allocID)
400            self.auth.set_attribute(allocID, allocID)
401
402            try:
403                f = open("%s/%s.pem" % (self.certdir, aid), "w")
404                print >>f, alloc_cert
405                f.close()
406            except IOError, e:
407                raise service_error(service_error.internal, 
408                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
409            print { 'allocID': allocID }
410            return { 'allocID': { 'fedid': allocID } }
411        else:
412            if self.allow_proxy:
413                resp = self.proxy_RequestAccess.call_service(dt, req,
414                            self.cert_file, self.cert_pwd,
415                            self.trusted_certs)
416                if resp.has_key('RequestAccessResponseBody'):
417                    return resp['RequestAccessResponseBody']
418                else:
419                    return None
420            else:
421                raise service_error(service_error.access,
422                        "Access proxying denied")
423
424    def ReleaseAccess(self, req, fid):
425        # The dance to get into the request body
426        if req.has_key('ReleaseAccessRequestBody'):
427            req = req['ReleaseAccessRequestBody']
428        else:
429            raise service_error(service_error.req, "No request!?")
430
431        if req.has_key('destinationTestbed'):
432            dt = unpack_id(req['destinationTestbed'])
433        else:
434            dt = None
435
436        if dt == None or dt in self.testbed:
437            # Local request
438            try:
439                if req['allocID'].has_key('localname'):
440                    auth_attr = aid = req['allocID']['localname']
441                elif req['allocID'].has_key('fedid'):
442                    aid = unicode(req['allocID']['fedid'])
443                    auth_attr = req['allocID']['fedid']
444                else:
445                    raise service_error(service_error.req,
446                            "Only localnames and fedids are understood")
447            except KeyError:
448                raise service_error(service_error.req, "Badly formed request")
449
450            self.log.debug("[access] deallocation requested for %s", aid)
451            if not self.auth.check_attribute(fid, auth_attr):
452                self.log.debug("[access] deallocation denied for %s", aid)
453                raise service_error(service_error.access, "Access Denied")
454
455            self.state_lock.acquire()
456            if self.state.has_key(aid):
457                self.log.debug("Found allocation for %s" %aid)
458                del self.state[aid]
459                self.write_state()
460                self.state_lock.release()
461                # And remove the access cert
462                cf = "%s/%s.pem" % (self.certdir, aid)
463                self.log.debug("Removing %s" % cf)
464                os.remove(cf)
465                return { 'allocID': req['allocID'] } 
466            else:
467                self.state_lock.release()
468                raise service_error(service_error.req, "No such allocation")
469
470        else:
471            if self.allow_proxy:
472                resp = self.proxy_ReleaseAccess.call_service(dt, req,
473                            self.cert_file, self.cert_pwd,
474                            self.trusted_certs)
475                if resp.has_key('ReleaseAccessResponseBody'):
476                    return resp['ReleaseAccessResponseBody']
477                else:
478                    return None
479            else:
480                raise service_error(service_error.access,
481                        "Access proxying denied")
482
483    def extract_parameters(self, top):
484        """
485        DRAGON currently supports a fixed capacity link between two endpoints.
486        Those endpoints may specify a VPN (or list or range) to use.  This
487        extracts the DRAGON endpoints and vpn preferences from the segments (as
488        attributes) and the capacity of the connection as given by the
489        substrate.  The two endpoints VLAN choices are intersected to get set
490        of VLANs that are acceptable (no VLAN requiremnets means any is
491        acceptable).
492        """
493        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
494
495        print segments
496
497        if len(segments) != 2 or len(top.substrates) != 1:
498            raise service_error(service_error.req,
499                    "Requests to DRAGON must have exactlty two segments " +\
500                            "and one substrate")
501
502        ends = [ ]
503        for s in segments:
504            ep = s.get_attribute('dragon_endpoint')
505            if not ep:
506                raise service_error(service_error.req, 
507                        "Missing DRAGON endpoint for %s" % s.id)
508            v = s.get_attribute('vlans')
509            vlans = None
510            vset = set()
511            # the vlans can be a single integer, a comma separated list or a
512            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
513            # 100,300-305,400
514            if v:
515                if v.count(",") > 0 :
516                    vl = [ x.strip() for x in v.split(",") ]
517                else:
518                    vl = [ v ]
519                for v in vl:
520                    try:
521                        if v.count("-")> 0:
522                            f, t = v.split("-", 1)
523                            for i in range(int(f), int(t)+1):
524                                vset.add(i)
525                        else:
526                            vset.add(int(v))
527                    except ValueError:
528                        raise service_error(service_error.req, 
529                                "VLAN tags must be integers (%s)" %s.name)
530            if len(vset) > 0:
531                if vlans: vlans &= vest
532                else: vlans = vset
533            ends.append(ep)
534
535
536        sub = top.substrates[0]
537        if sub.capacity:
538            cap = int(sub.capacity.rate / 1000.0)
539            if cap < 1: cap = 1
540        else:
541            cap = 100
542
543
544        # DRAGON's command line tool barfs if the source (ends[0]) is not in
545        # the domain controlled by the IDC.  This code ensures this situation.
546        if self.domain and not ends[0].endswith(self.domain):
547            hold = ends[0]
548            for i, e in enumerate(ends):
549                if i == 0: continue
550                if e.endswith(self.domain):
551                    ends[0] = e
552                    ends[i] = hold
553                    break
554            else:
555                raise service_error(service_error.req, 
556                        "No endpoint in my domain")
557
558
559        return cap, ends[0], ends[1], vlans
560
561
562    def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
563            log=None):
564        """
565        Do the actual work of creating the dragon connecton.
566        """
567        if not log: log = self.log
568        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
569        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
570        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
571        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
572        path_re = re.compile("Path:")
573
574        if not vpns:
575            vpns = [ None, None, None, None, None]
576
577        if not start:
578            start = time.time()
579        if not end:
580            end = start + 120 *60
581
582
583        status = None
584        gri = None
585        rv = None
586        vlan_no = None
587        for v in vpns:
588            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
589                'createReservation', '-repo',  repo , '-url', self.idc_url,
590                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
591                '-vlan', "%s" % v, '-desc', 'fedd created connection',
592                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
593                '-end', "%d" % int(end)]
594            log.debug("[start_segment]: %s" % " ".join(cmd))
595            if not self.create_debug: 
596                p = Popen(cmd, cwd=self.cli_dir, 
597                        stdout=PIPE, stderr=STDOUT,
598                        close_fds=True)
599                for line in p.stdout:
600                    m = status_re.match(line)
601                    if m:
602                        status = m.group(1)
603                        continue
604                    m = gri_re.match(line)
605                    if m:
606                        gri = m.group(1)
607                rv = p.wait()
608            else: 
609                rv = 0
610                status = 'ACCEPTED'
611                gri = 'debug_gri'
612
613            # Reservation in progress.  Poll the IDC until we know the outcome
614            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
615                'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
616
617            while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
618                log.debug("[start_segment]: %s" % " ".join(cmd))
619                if not self.create_debug:
620                    p = Popen(cmd, cwd=self.cli_dir, 
621                            stdout=PIPE, stderr=STDOUT,
622                            close_fds=True)
623                    in_path = False
624                    vpn1 = None
625                    vpn2 = None
626                    src = None
627                    dest = None
628                    for line in p.stdout:
629                        if not in_path:
630                            m = status_re.match(line)
631                            if m: 
632                                status = m.group(1)
633                                continue
634                            m = source_re.match(line)
635                            if m:
636                                src = m.group(1)
637                                continue
638                            m = dest_re.match(line)
639                            if m:
640                                dest = m.group(1)
641                                continue
642                            m = path_re.match(line)
643                            if m:
644                                in_path = True
645                                if src and dest:
646                                    vpn1_re = re.compile(
647                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
648                                                    src.replace("*", "\*"))
649                                    vpn2_re = re.compile(
650                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
651                                                    dest.replace("*", "\*"))
652                                else:
653                                    raise service_error(service_error.internal, 
654                                            "Strange output from query")
655                        else:
656                            m = vpn1_re.match(line)
657                            if m:
658                                vpn1 = m.group(1)
659                                continue
660                            m = vpn2_re.match(line)
661                            if m:
662                                vpn2 = m.group(1)
663                                continue
664                    rv = p.wait()
665                    if vpn1 == vpn2:
666                        if v is not None:
667                            if int(vpn1) == v:
668                                vlan_no = int(v)
669                            else:
670                                raise service_error(service_error.federant, 
671                                        "Unexpected vlan assignment")
672                        else:
673                            vlan_no = int(v)
674                    else:
675                        raise service_error(service_error.internal,
676                                "Different VPNs on DRAGON ends")
677                    log.debug("Status: %s" % status or "none")
678                else:
679                    status = 'ACTIVE'
680                    vlan_no = int(v) or 1
681                if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
682                    time.sleep(45)
683            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
684                break
685
686        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
687            return gri, vlan_no
688        else:
689            raise service_error(service_error.federant, 
690                    "Cannot make reservation")
691
692    def stop_segment(self, repo, gri, log=None):
693        if not log: log = self.log
694        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
695            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
696
697
698        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
699        if not self.create_debug:
700            try:
701                f = open("/dev/null", "w")
702                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
703                f.close()
704            except IOError, e:
705                raise service_error(service_error.internal, 
706                        "Failed to open /dev/null: %s" % e)
707
708    def StartSegment(self, req, fid):
709        err = None  # Any service_error generated after tmpdir is created
710        rv = None   # Return value from segment creation
711
712        try:
713            req = req['StartSegmentRequestBody']
714        except KeyError:
715            raise service_error(server_error.req, "Badly formed request")
716
717        auth_attr = req['allocID']['fedid']
718        aid = "%s" % auth_attr
719        attrs = req.get('fedAttr', [])
720        if not self.auth.check_attribute(fid, auth_attr):
721            raise service_error(service_error.access, "Access denied")
722
723        if req.has_key('segmentdescription') and \
724                req['segmentdescription'].has_key('topdldescription'):
725            topo = \
726                topdl.Topology(**req['segmentdescription']['topdldescription'])
727        else:
728            raise service_error(service_error.req, 
729                    "Request missing segmentdescription'")
730
731        cap, src, dest, vlans = self.extract_parameters(topo)
732        ename = aid
733
734        for a in attrs:
735            if a['attribute'] == 'experiment_name':
736                ename = a['value']
737
738        repo = None
739        self.state_lock.acquire()
740        if self.state.has_key(aid):
741            repo = self.state[aid]['user']
742            self.state[aid]['log'] = [ ]
743            # Create a logger that logs to the experiment's state object as
744            # well as to the main log file.
745            alloc_log = logging.getLogger('fedd.access.%s' % ename)
746            h = logging.StreamHandler(
747                    list_log.list_log(self.state[aid]['log']))
748            # XXX: there should be a global one of these rather than
749            # repeating the code.
750            h.setFormatter(logging.Formatter(
751                "%(asctime)s %(name)s %(message)s",
752                        '%d %b %y %H:%M:%S'))
753            alloc_log.addHandler(h)
754            self.write_state()
755        self.state_lock.release()
756
757        if not repo:
758            raise service_error(service_error.internal, 
759                    "Can't find creation user for %s" %aid)
760
761        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
762                log=alloc_log)
763
764        if gri:
765            # Grab the log (this is some anal locking, but better safe than
766            # sorry)
767            self.state_lock.acquire()
768            self.state[aid]['gri'] = gri
769            logv = "".join(self.state[aid]['log'])
770            self.write_state()
771            self.state_lock.release()
772
773            return { 
774                    'allocID': req['allocID'],
775                    'allocationLog': logv,
776                    'fedAttr': [
777                        {'attribute': 'vlan', 'value': '%d' % vlan_no }
778                        ]
779                    }
780        elif err:
781            raise service_error(service_error.federant,
782                    "Swapin failed: %s" % err)
783        else:
784            raise service_error(service_error.federant, "Swapin failed")
785
786    def TerminateSegment(self, req, fid):
787        try:
788            req = req['TerminateSegmentRequestBody']
789        except KeyError:
790            raise service_error(server_error.req, "Badly formed request")
791
792        auth_attr = req['allocID']['fedid']
793        aid = "%s" % auth_attr
794
795        self.log.debug("Terminate request for %s" %aid)
796        attrs = req.get('fedAttr', [])
797        if not self.auth.check_attribute(fid, auth_attr):
798            raise service_error(service_error.access, "Access denied")
799
800        self.state_lock.acquire()
801        if self.state.has_key(aid):
802            gri = self.state[aid].get('gri', None)
803            user = self.state[aid].get('user', None)
804        else:
805            gri = None
806            user = None
807        self.state_lock.release()
808        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
809
810        if not gri:
811            raise service_error(service_error.internal, 
812                    "Can't find gri for %s" % aid)
813
814        if not user:
815            raise service_error(service_error.internal, 
816                    "Can't find creation user for %s" % aid)
817   
818        self.log.debug("Stop segment for GRI: %s" %gri)
819        self.stop_segment(user, gri)
820        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.