source: fedd/federation/dragon_access.py @ e14f472

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since e14f472 was 9fea17b, checked in by Ted Faber <faber@…>, 15 years ago

remove extra DRAGON sleep (commented out in an earlier mis-aimed commit)

  • Property mode set to 100644
File size: 27.4 KB
RevLine 
[23dec62]1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import *
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to dragon_access.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = False
64        self.certdir = config.get("access","certdir")
65        self.create_debug = config.getboolean("access", "create_debug")
66        self.cli_dir = config.get("access", "cli_dir")
67        self.axis2_home = config.get("access", "axis2_home")
68        self.idc_url = config.get("access", "idc")
[36fec1b]69        self.domain = config.get("access", "domain")
[23dec62]70
71        self.attrs = { }
72        self.access = { }
73        # State is a dict of dicts indexed by segment fedid that includes the
74        # owners of the segment as fedids (who can manipulate it, key: owners),
75        # the repo dir/user for the allocation (key: user),  Current allocation
76        # log (key: log), and GRI of the reservation once made (key: gri)
77        self.state = { }
78        self.log = logging.getLogger("fedd.access")
79        set_log_level(config, "access", self.log)
80        self.state_lock = Lock()
81        if not (self.cli_dir and self.axis2_home and self.idc_url):
82            self.log.error("Must specify all of cli_dir, axis2_home, " +\
83                    "idc in the [access] section of the configuration")
84
85        if auth: self.auth = auth
86        else:
87            self.log.error(\
88                    "[access]: No authorizer initialized, creating local one.")
89            auth = authorizer()
90
91        tb = config.get('access', 'testbed')
92        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
93        else: self.testbed = [ ]
94
95        if config.has_option("access", "accessdb"):
96            self.read_access(config.get("access", "accessdb"))
97
98        self.state_filename = config.get("access", "access_state")
99        self.read_state()
100
101        # Keep cert_file and cert_pwd coming from the same place
102        self.cert_file = config.get("access", "cert_file")
103        if self.cert_file:
104            self.sert_pwd = config.get("access", "cert_pw")
105        else:
106            self.cert_file = config.get("globals", "cert_file")
107            self.sert_pwd = config.get("globals", "cert_pw")
108
109        self.trusted_certs = config.get("access", "trusted_certs") or \
110                config.get("globals", "trusted_certs")
111
112        self.soap_services = {\
113            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
114            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
115            'StartSegment': soap_handler("StartSegment", self.StartSegment),
116            'TerminateSegment': soap_handler("TerminateSegment", 
117                self.TerminateSegment),
118            }
119        self.xmlrpc_services =  {\
120            'RequestAccess': xmlrpc_handler('RequestAccess',
121                self.RequestAccess),
122            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
123                self.ReleaseAccess),
124            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
125            'TerminateSegment': xmlrpc_handler('TerminateSegment',
126                self.TerminateSegment),
127            }
128
129    def read_access(self, config):
130        """
131        Read a configuration file and set internal parameters.
132
133        There are access lines of the
134        form (tb, proj, user) -> user that map the first tuple of
135        names to the user for for access purposes.  Names in the key (left side)
136        can include "<NONE> or <ANY>" to act as wildcards or to require the
137        fields to be empty.  Similarly aproj or auser can be <SAME> or
138        <DYNAMIC> indicating that either the matching key is to be used or a
139        dynamic user or project will be created.  These names can also be
140        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
141        directory that contains the DRAGON user credentials.
142        Testbed attributes outside the forms above can be given using the
143        format attribute: name value: value.  The name is a single word and the
144        value continues to the end of the line.  Empty lines and lines startin
145        with a # are ignored.
146
147        Parsing errors result in a self.parse_error exception being raised.
148        """
149        lineno=0
150        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
151        fedid_expr = "fedid:[" + string.hexdigits + "]+"
152        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
153
154        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
155                re.IGNORECASE)
156        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
157                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
158
159        def parse_name(n):
160            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
161            else: return n
162       
163        def auth_name(n):
164            if isinstance(n, basestring):
165                if n =='<any>' or n =='<none>': return None
166                else: return unicode(n)
167            else:
168                return n
169
170        f = open(config, "r");
171        for line in f:
172            lineno += 1
173            line = line.strip();
174            if len(line) == 0 or line.startswith('#'):
175                continue
176
177            # Extended (attribute: x value: y) attribute line
178            m = attr_re.match(line)
179            if m != None:
180                attr, val = m.group(1,2)
181                self.attrs[attr] = val
182                continue
183
184            # Access line (t, p, u) -> (a) line
185            # XXX: you are here
186            m = access_re.match(line)
187            if m != None:
188                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
189                auth_key = tuple([ auth_name(x) for x in access_key])
190                user_name = auth_name(parse_name(m.group(4)))
191
192                self.access[access_key] = user_name
193                self.auth.set_attribute(auth_key, "access")
194                continue
195
196            # Nothing matched to here: unknown line - raise exception
197            f.close()
198            raise self.parse_error("Unknown statement at line %d of %s" % \
199                    (lineno, config))
200        f.close()
201
202    def get_users(self, obj):
203        """
204        Return a list of the IDs of the users in dict
205        """
206        if obj.has_key('user'):
207            return [ unpack_id(u['userID']) \
208                    for u in obj['user'] if u.has_key('userID') ]
209        else:
210            return None
211
212    def write_state(self):
213        if self.state_filename:
214            try:
215                f = open(self.state_filename, 'w')
216                pickle.dump(self.state, f)
217            except IOError, e:
218                self.log.error("Can't write file %s: %s" % \
219                        (self.state_filename, e))
220            except pickle.PicklingError, e:
221                self.log.error("Pickling problem: %s" % e)
222            except TypeError, e:
223                self.log.error("Pickling problem (TypeError): %s" % e)
224
225
226    def read_state(self):
227        """
228        Read a new copy of access state.  Old state is overwritten.
229
230        State format is a simple pickling of the state dictionary.
231        """
232        if self.state_filename:
233            try:
234                f = open(self.state_filename, "r")
235                self.state = pickle.load(f)
236                self.log.debug("[read_state]: Read state from %s" % \
237                        self.state_filename)
238            except IOError, e:
239                self.log.warning(("[read_state]: No saved state: " +\
240                        "Can't open %s: %s") % (self.state_filename, e))
241            except EOFError, e:
242                self.log.warning(("[read_state]: " +\
243                        "Empty or damaged state file: %s:") % \
244                        self.state_filename)
245            except pickle.UnpicklingError, e:
246                self.log.warning(("[read_state]: No saved state: " + \
247                        "Unpickling failed: %s") % e)
248
249            # Add the ownership attributes to the authorizer.  Note that the
250            # indices of the allocation dict are strings, but the attributes are
251            # fedids, so there is a conversion.
252            for k in self.state.keys():
253                for o in self.state[k].get('owners', []):
254                    self.auth.set_attribute(o, fedid(hexstr=k))
255                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
256
257
258    def permute_wildcards(self, a, p):
259        """Return a copy of a with various fields wildcarded.
260
261        The bits of p control the wildcards.  A set bit is a wildcard
262        replacement with the lowest bit being user then project then testbed.
263        """
264        if p & 1: user = ["<any>"]
265        else: user = a[2]
266        if p & 2: proj = "<any>"
267        else: proj = a[1]
268        if p & 4: tb = "<any>"
269        else: tb = a[0]
270
271        return (tb, proj, user)
272
273    def find_access(self, search):
274        """
275        Search the access DB for a match on this tuple.  Return the matching
276        user (repo dir).
277       
278        NB, if the initial tuple fails to match we start inserting wildcards in
279        an order determined by self.project_priority.  Try the list of users in
280        order (when wildcarded, there's only one user in the list).
281        """
282        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
283        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
284
285        for p in perm: 
286            s = self.permute_wildcards(search, p)
287            # s[2] is None on an anonymous, unwildcarded request
288            if s[2] != None:
289                for u in s[2]:
290                    if self.access.has_key((s[0], s[1], u)):
291                        return self.access[(s[0], s[1], u)]
292            else:
293                if self.access.has_key(s):
294                    return self.access[s]
295        return None
296
297    def lookup_access(self, req, fid):
298        """
299        Determine the allowed access for this request.  Return the access and
300        which fields are dynamic.
301
302        The fedid is needed to construct the request
303        """
304        # Search keys
305        tb = None
306        project = None
307        user = None
308        # Return values
309        rp = access_project(None, ())
310        ru = None
311
312        if req.has_key('project'):
313            p = req['project']
314            if p.has_key('name'):
315                project = unpack_id(p['name'])
316            user = self.get_users(p)
317        else:
318            user = self.get_users(req)
319
320        user_fedids = [ u for u in user if isinstance(u, fedid)]
321        # Determine how the caller is representing itself.  If its fedid shows
322        # up as a project or a singleton user, let that stand.  If neither the
323        # usernames nor the project name is a fedid, the caller is a testbed.
324        if project and isinstance(project, fedid):
325            if project == fid:
326                # The caller is the project (which is already in the tuple
327                # passed in to the authorizer)
328                owners = user_fedids
329                owners.append(project)
330            else:
331                raise service_error(service_error.req,
332                        "Project asserting different fedid")
333        else:
334            if fid not in user_fedids:
335                tb = fid
336                owners = user_fedids
337                owners.append(fid)
338            else:
339                if len(fedids) > 1:
340                    raise service_error(service_error.req,
341                            "User asserting different fedid")
342                else:
343                    # Which is a singleton
344                    owners = user_fedids
345        # Confirm authorization
346
347        for u in user:
348            self.log.debug("[lookup_access] Checking access for %s" % \
349                    ((tb, project, u),))
350            if self.auth.check_attribute((tb, project, u), 'access'):
351                self.log.debug("[lookup_access] Access granted")
352                break
353            else:
354                self.log.debug("[lookup_access] Access Denied")
355        else:
356            raise service_error(service_error.access, "Access denied")
357
358        # This maps a valid user to the Emulab projects and users to use
359        found = self.find_access((tb, project, user))
360       
361        if found == None:
362            raise service_error(service_error.access,
363                    "Access denied - cannot map access")
364        return found, owners
365
366    def RequestAccess(self, req, fid):
367        """
368        Handle the access request.  Proxy if not for us.
369
370        Parse out the fields and make the allocations or rejections if for us,
371        otherwise, assuming we're willing to proxy, proxy the request out.
372        """
373
374        # The dance to get into the request body
375        if req.has_key('RequestAccessRequestBody'):
376            req = req['RequestAccessRequestBody']
377        else:
378            raise service_error(service_error.req, "No request!?")
379
380        if req.has_key('destinationTestbed'):
381            dt = unpack_id(req['destinationTestbed'])
382
383        if dt == None or dt in self.testbed:
384            # Request for this fedd
385            found, owners = self.lookup_access(req, fid)
386            # keep track of what's been added
387            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
388            aid = unicode(allocID)
389
390            self.state_lock.acquire()
391            self.state[aid] = { }
392            self.state[aid]['user'] = found
393            self.state[aid]['owners'] = owners
394            self.write_state()
395            self.state_lock.release()
396            for o in owners:
397                self.auth.set_attribute(o, allocID)
398            self.auth.set_attribute(allocID, allocID)
399
400            try:
401                f = open("%s/%s.pem" % (self.certdir, aid), "w")
402                print >>f, alloc_cert
403                f.close()
404            except IOError, e:
405                raise service_error(service_error.internal, 
406                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
407            return { 'allocID': { 'fedid': allocID } }
408        else:
409            if self.allow_proxy:
410                resp = self.proxy_RequestAccess.call_service(dt, req,
411                            self.cert_file, self.cert_pwd,
412                            self.trusted_certs)
413                if resp.has_key('RequestAccessResponseBody'):
414                    return resp['RequestAccessResponseBody']
415                else:
416                    return None
417            else:
418                raise service_error(service_error.access,
419                        "Access proxying denied")
420
421    def ReleaseAccess(self, req, fid):
422        # The dance to get into the request body
423        if req.has_key('ReleaseAccessRequestBody'):
424            req = req['ReleaseAccessRequestBody']
425        else:
426            raise service_error(service_error.req, "No request!?")
427
428        if req.has_key('destinationTestbed'):
429            dt = unpack_id(req['destinationTestbed'])
430        else:
431            dt = None
432
433        if dt == None or dt in self.testbed:
434            # Local request
435            try:
436                if req['allocID'].has_key('localname'):
437                    auth_attr = aid = req['allocID']['localname']
438                elif req['allocID'].has_key('fedid'):
439                    aid = unicode(req['allocID']['fedid'])
440                    auth_attr = req['allocID']['fedid']
441                else:
442                    raise service_error(service_error.req,
443                            "Only localnames and fedids are understood")
444            except KeyError:
445                raise service_error(service_error.req, "Badly formed request")
446
447            self.log.debug("[access] deallocation requested for %s", aid)
448            if not self.auth.check_attribute(fid, auth_attr):
449                self.log.debug("[access] deallocation denied for %s", aid)
450                raise service_error(service_error.access, "Access Denied")
451
452            self.state_lock.acquire()
453            if self.state.has_key(aid):
454                self.log.debug("Found allocation for %s" %aid)
455                del self.state[aid]
456                self.write_state()
457                self.state_lock.release()
458                # And remove the access cert
459                cf = "%s/%s.pem" % (self.certdir, aid)
460                self.log.debug("Removing %s" % cf)
461                os.remove(cf)
462                return { 'allocID': req['allocID'] } 
463            else:
464                self.state_lock.release()
465                raise service_error(service_error.req, "No such allocation")
466
467        else:
468            if self.allow_proxy:
469                resp = self.proxy_ReleaseAccess.call_service(dt, req,
470                            self.cert_file, self.cert_pwd,
471                            self.trusted_certs)
472                if resp.has_key('ReleaseAccessResponseBody'):
473                    return resp['ReleaseAccessResponseBody']
474                else:
475                    return None
476            else:
477                raise service_error(service_error.access,
478                        "Access proxying denied")
479
480    def extract_parameters(self, top):
481        """
482        DRAGON currently supports a fixed capacity link between two endpoints.
483        Those endpoints may specify a VPN (or list or range) to use.  This
484        extracts the DRAGON endpoints and vpn preferences from the segments (as
485        attributes) and the capacity of the connection as given by the
486        substrate.  The two endpoints VLAN choices are intersected to get set
487        of VLANs that are acceptable (no VLAN requiremnets means any is
488        acceptable).
489        """
490        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
491
492        if len(segments) != 2 or len(top.substrates) != 1:
493            raise service_error(service_error.req,
494                    "Requests to DRAGON must have exactlty two segments " +\
495                            "and one substrate")
496
497        ends = [ ]
498        for s in segments:
499            ep = s.get_attribute('dragon_endpoint')
500            if not ep:
501                raise service_error(service_error.req, 
[69692a9]502                        "Missing DRAGON endpoint for %s" % s.id)
[23dec62]503            v = s.get_attribute('vlans')
504            vlans = None
505            vset = set()
506            # the vlans can be a single integer, a comma separated list or a
507            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
508            # 100,300-305,400
509            if v:
510                if v.count(",") > 0 :
511                    vl = [ x.strip() for x in v.split(",") ]
512                else:
513                    vl = [ v ]
514                for v in vl:
515                    try:
516                        if v.count("-")> 0:
517                            f, t = v.split("-", 1)
518                            for i in range(int(f), int(t)+1):
519                                vset.add(i)
520                        else:
521                            vset.add(int(v))
522                    except ValueError:
523                        raise service_error(service_error.req, 
524                                "VLAN tags must be integers (%s)" %s.name)
525            if len(vset) > 0:
526                if vlans: vlans &= vest
527                else: vlans = vset
528            ends.append(ep)
529
530
531        sub = top.substrates[0]
532        if sub.capacity:
533            cap = int(sub.capacity.rate / 1000.0)
534            if cap < 1: cap = 1
535        else:
536            cap = 100
537
[36fec1b]538
539        # DRAGON's command line tool barfs if the source (ends[0]) is not in
540        # the domain controlled by the IDC.  This code ensures this situation.
541        if self.domain and not ends[0].endswith(self.domain):
542            hold = ends[0]
543            for i, e in enumerate(ends):
544                if i == 0: continue
545                if e.endswith(self.domain):
546                    ends[0] = e
547                    ends[i] = hold
548                    break
549            else:
550                raise service_error(service_error.req, 
551                        "No endpoint in my domain")
552
553
[23dec62]554        return cap, ends[0], ends[1], vlans
555
556
557    def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
558            log=None):
559        """
560        Do the actual work of creating the dragon connecton.
561        """
562        if not log: log = self.log
563        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
564        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
565        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
566        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
567        path_re = re.compile("Path:")
568
569        if not vpns:
570            vpns = [ None, None, None, None, None]
571
572        if not start:
573            start = time.time()
574        if not end:
[ab33158]575            end = start + 120 *60
[23dec62]576
577
578        status = None
579        gri = None
580        rv = None
581        vlan_no = None
582        for v in vpns:
583            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
584                'createReservation', '-repo',  repo , '-url', self.idc_url,
585                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
586                '-vlan', "%s" % v, '-desc', 'fedd created connection',
587                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
588                '-end', "%d" % int(end)]
589            log.debug("[start_segment]: %s" % " ".join(cmd))
590            if not self.create_debug: 
591                p = Popen(cmd, cwd=self.cli_dir, 
592                        stdout=PIPE, stderr=STDOUT,
593                        close_fds=True)
594                for line in p.stdout:
595                    m = status_re.match(line)
596                    if m:
597                        status = m.group(1)
598                        continue
599                    m = gri_re.match(line)
600                    if m:
601                        gri = m.group(1)
602                rv = p.wait()
603            else: 
604                rv = 0
605                status = 'ACCEPTED'
606                gri = 'debug_gri'
607
608            # Reservation in progress.  Poll the IDC until we know the outcome
609            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
610                'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
611
[ab33158]612            while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
[23dec62]613                log.debug("[start_segment]: %s" % " ".join(cmd))
614                if not self.create_debug:
615                    p = Popen(cmd, cwd=self.cli_dir, 
616                            stdout=PIPE, stderr=STDOUT,
617                            close_fds=True)
618                    in_path = False
619                    vpn1 = None
620                    vpn2 = None
621                    src = None
622                    dest = None
623                    for line in p.stdout:
624                        if not in_path:
625                            m = status_re.match(line)
626                            if m: 
627                                status = m.group(1)
628                                continue
629                            m = source_re.match(line)
630                            if m:
631                                src = m.group(1)
632                                continue
633                            m = dest_re.match(line)
634                            if m:
635                                dest = m.group(1)
636                                continue
637                            m = path_re.match(line)
638                            if m:
639                                in_path = True
640                                if src and dest:
641                                    vpn1_re = re.compile(
642                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
643                                                    src.replace("*", "\*"))
644                                    vpn2_re = re.compile(
645                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
646                                                    dest.replace("*", "\*"))
647                                else:
648                                    raise service_error(service_error.internal, 
649                                            "Strange output from query")
650                        else:
651                            m = vpn1_re.match(line)
652                            if m:
653                                vpn1 = m.group(1)
654                                continue
655                            m = vpn2_re.match(line)
656                            if m:
657                                vpn2 = m.group(1)
658                                continue
659                    rv = p.wait()
660                    if vpn1 == vpn2:
661                        if v is not None:
662                            if int(vpn1) == v:
663                                vlan_no = int(v)
664                            else:
665                                raise service_error(service_error.federant, 
666                                        "Unexpected vlan assignment")
667                        else:
668                            vlan_no = int(v)
669                    else:
670                        raise service_error(service_error.internal,
671                                "Different VPNs on DRAGON ends")
[68bb551]672                    log.debug("Status: %s" % status or "none")
[23dec62]673                else:
674                    status = 'ACTIVE'
675                    vlan_no = int(v) or 1
[ab33158]676                if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
677                    time.sleep(45)
[23dec62]678            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
679                break
680
681        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
682            return gri, vlan_no
683        else:
[69692a9]684            raise service_error(service_error.federant, 
685                    "Cannot make reservation")
[23dec62]686
687    def stop_segment(self, repo, gri, log=None):
688        if not log: log = self.log
689        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
690            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
691
692
693        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
694        if not self.create_debug:
695            try:
696                f = open("/dev/null", "w")
697                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
698                f.close()
699            except IOError, e:
700                raise service_error(service_error.internal, 
701                        "Failed to open /dev/null: %s" % e)
702
703    def StartSegment(self, req, fid):
704        err = None  # Any service_error generated after tmpdir is created
705        rv = None   # Return value from segment creation
706
707        try:
708            req = req['StartSegmentRequestBody']
709        except KeyError:
710            raise service_error(server_error.req, "Badly formed request")
711
712        auth_attr = req['allocID']['fedid']
713        aid = "%s" % auth_attr
714        attrs = req.get('fedAttr', [])
715        if not self.auth.check_attribute(fid, auth_attr):
716            raise service_error(service_error.access, "Access denied")
717
718        if req.has_key('segmentdescription') and \
719                req['segmentdescription'].has_key('topdldescription'):
720            topo = \
721                topdl.Topology(**req['segmentdescription']['topdldescription'])
722        else:
723            raise service_error(service_error.req, 
724                    "Request missing segmentdescription'")
725
726        cap, src, dest, vlans = self.extract_parameters(topo)
727        ename = aid
728
729        for a in attrs:
730            if a['attribute'] == 'experiment_name':
731                ename = a['value']
732
733        repo = None
734        self.state_lock.acquire()
735        if self.state.has_key(aid):
736            repo = self.state[aid]['user']
737            self.state[aid]['log'] = [ ]
738            # Create a logger that logs to the experiment's state object as
739            # well as to the main log file.
740            alloc_log = logging.getLogger('fedd.access.%s' % ename)
741            h = logging.StreamHandler(
742                    list_log.list_log(self.state[aid]['log']))
743            # XXX: there should be a global one of these rather than
744            # repeating the code.
745            h.setFormatter(logging.Formatter(
746                "%(asctime)s %(name)s %(message)s",
747                        '%d %b %y %H:%M:%S'))
748            alloc_log.addHandler(h)
749            self.write_state()
750        self.state_lock.release()
751
752        if not repo:
753            raise service_error(service_error.internal, 
754                    "Can't find creation user for %s" %aid)
755
[68bb551]756        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
757                log=alloc_log)
[23dec62]758
759        if gri:
760            # Grab the log (this is some anal locking, but better safe than
761            # sorry)
762            self.state_lock.acquire()
763            self.state[aid]['gri'] = gri
764            logv = "".join(self.state[aid]['log'])
765            self.write_state()
766            self.state_lock.release()
767
[68bb551]768            return { 
769                    'allocID': req['allocID'],
770                    'allocationLog': logv,
771                    'fedAttr': [
772                        {'attribute': 'vlan', 'value': '%d' % vlan_no }
773                        ]
774                    }
[23dec62]775        elif err:
776            raise service_error(service_error.federant,
777                    "Swapin failed: %s" % err)
778        else:
779            raise service_error(service_error.federant, "Swapin failed")
780
781    def TerminateSegment(self, req, fid):
782        try:
783            req = req['TerminateSegmentRequestBody']
784        except KeyError:
785            raise service_error(server_error.req, "Badly formed request")
786
787        auth_attr = req['allocID']['fedid']
788        aid = "%s" % auth_attr
[ab33158]789
790        self.log.debug("Terminate request for %s" %aid)
[23dec62]791        attrs = req.get('fedAttr', [])
792        if not self.auth.check_attribute(fid, auth_attr):
793            raise service_error(service_error.access, "Access denied")
794
795        self.state_lock.acquire()
796        if self.state.has_key(aid):
797            gri = self.state[aid].get('gri', None)
798            user = self.state[aid].get('user', None)
799        else:
800            gri = None
801            user = None
802        self.state_lock.release()
[ab33158]803        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
[23dec62]804
805        if not gri:
806            raise service_error(service_error.internal, 
807                    "Can't find gri for %s" % aid)
808
809        if not user:
810            raise service_error(service_error.internal, 
811                    "Can't find creation user for %s" % aid)
[ab33158]812   
813        self.log.debug("Stop segment for GRI: %s" %gri)
[23dec62]814        self.stop_segment(user, gri)
815        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.