source: fedd/federation/dragon_access.py @ 358e0b8

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 358e0b8 was 0f06528, checked in by Ted Faber <faber@…>, 15 years ago

Convert to useing credential strings

  • Property mode set to 100644
File size: 27.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import *
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to dragon_access.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = False
64        self.certdir = config.get("access","certdir")
65        self.create_debug = config.getboolean("access", "create_debug")
66        self.cli_dir = config.get("access", "cli_dir")
67        self.axis2_home = config.get("access", "axis2_home")
68        self.idc_url = config.get("access", "idc")
69        self.domain = config.get("access", "domain")
70
71        self.attrs = { }
72        self.access = { }
73        # State is a dict of dicts indexed by segment fedid that includes the
74        # owners of the segment as fedids (who can manipulate it, key: owners),
75        # the repo dir/user for the allocation (key: user),  Current allocation
76        # log (key: log), and GRI of the reservation once made (key: gri)
77        self.state = { }
78        self.log = logging.getLogger("fedd.access")
79        set_log_level(config, "access", self.log)
80        self.state_lock = Lock()
81        if not (self.cli_dir and self.axis2_home and self.idc_url):
82            self.log.error("Must specify all of cli_dir, axis2_home, " +\
83                    "idc in the [access] section of the configuration")
84
85        if auth: self.auth = auth
86        else:
87            self.log.error(\
88                    "[access]: No authorizer initialized, creating local one.")
89            auth = authorizer()
90
91        tb = config.get('access', 'testbed')
92        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
93        else: self.testbed = [ ]
94
95        if config.has_option("access", "accessdb"):
96            self.read_access(config.get("access", "accessdb"))
97
98        self.state_filename = config.get("access", "access_state")
99        self.read_state()
100
101        # Keep cert_file and cert_pwd coming from the same place
102        self.cert_file = config.get("access", "cert_file")
103        if self.cert_file:
104            self.sert_pwd = config.get("access", "cert_pw")
105        else:
106            self.cert_file = config.get("globals", "cert_file")
107            self.sert_pwd = config.get("globals", "cert_pw")
108
109        self.trusted_certs = config.get("access", "trusted_certs") or \
110                config.get("globals", "trusted_certs")
111
112        self.soap_services = {\
113            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
114            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
115            'StartSegment': soap_handler("StartSegment", self.StartSegment),
116            'TerminateSegment': soap_handler("TerminateSegment", 
117                self.TerminateSegment),
118            }
119        self.xmlrpc_services =  {\
120            'RequestAccess': xmlrpc_handler('RequestAccess',
121                self.RequestAccess),
122            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
123                self.ReleaseAccess),
124            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
125            'TerminateSegment': xmlrpc_handler('TerminateSegment',
126                self.TerminateSegment),
127            }
128
129    def read_access(self, config):
130        """
131        Read a configuration file and set internal parameters.
132
133        There are access lines of the
134        form (tb, proj, user) -> user that map the first tuple of
135        names to the user for for access purposes.  Names in the key (left side)
136        can include "<NONE> or <ANY>" to act as wildcards or to require the
137        fields to be empty.  Similarly aproj or auser can be <SAME> or
138        <DYNAMIC> indicating that either the matching key is to be used or a
139        dynamic user or project will be created.  These names can also be
140        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
141        directory that contains the DRAGON user credentials.
142        Testbed attributes outside the forms above can be given using the
143        format attribute: name value: value.  The name is a single word and the
144        value continues to the end of the line.  Empty lines and lines startin
145        with a # are ignored.
146
147        Parsing errors result in a self.parse_error exception being raised.
148        """
149        lineno=0
150        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
151        fedid_expr = "fedid:[" + string.hexdigits + "]+"
152        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
153
154        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
155                re.IGNORECASE)
156        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
157                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
158
159        def parse_name(n):
160            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
161            else: return n
162       
163        def auth_name(n):
164            if isinstance(n, basestring):
165                if n =='<any>' or n =='<none>': return None
166                else: return unicode(n)
167            else:
168                return n
169
170        f = open(config, "r");
171        for line in f:
172            lineno += 1
173            line = line.strip();
174            if len(line) == 0 or line.startswith('#'):
175                continue
176
177            # Extended (attribute: x value: y) attribute line
178            m = attr_re.match(line)
179            if m != None:
180                attr, val = m.group(1,2)
181                self.attrs[attr] = val
182                continue
183
184            # Access line (t, p, u) -> (a) line
185            # XXX: you are here
186            m = access_re.match(line)
187            if m != None:
188                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
189                auth_key = tuple([ auth_name(x) for x in access_key])
190                user_name = auth_name(parse_name(m.group(4)))
191
192                self.access[access_key] = user_name
193                self.auth.set_attribute(auth_key, "access")
194                continue
195
196            # Nothing matched to here: unknown line - raise exception
197            f.close()
198            raise self.parse_error("Unknown statement at line %d of %s" % \
199                    (lineno, config))
200        f.close()
201
202    def get_users(self, obj):
203        """
204        Return a list of the IDs of the users in dict
205        """
206        if obj.has_key('user'):
207            return [ unpack_id(u['userID']) \
208                    for u in obj['user'] if u.has_key('userID') ]
209        else:
210            return None
211
212    def write_state(self):
213        if self.state_filename:
214            try:
215                f = open(self.state_filename, 'w')
216                pickle.dump(self.state, f)
217            except IOError, e:
218                self.log.error("Can't write file %s: %s" % \
219                        (self.state_filename, e))
220            except pickle.PicklingError, e:
221                self.log.error("Pickling problem: %s" % e)
222            except TypeError, e:
223                self.log.error("Pickling problem (TypeError): %s" % e)
224
225
226    def read_state(self):
227        """
228        Read a new copy of access state.  Old state is overwritten.
229
230        State format is a simple pickling of the state dictionary.
231        """
232        if self.state_filename:
233            try:
234                f = open(self.state_filename, "r")
235                self.state = pickle.load(f)
236                self.log.debug("[read_state]: Read state from %s" % \
237                        self.state_filename)
238            except IOError, e:
239                self.log.warning(("[read_state]: No saved state: " +\
240                        "Can't open %s: %s") % (self.state_filename, e))
241            except EOFError, e:
242                self.log.warning(("[read_state]: " +\
243                        "Empty or damaged state file: %s:") % \
244                        self.state_filename)
245            except pickle.UnpicklingError, e:
246                self.log.warning(("[read_state]: No saved state: " + \
247                        "Unpickling failed: %s") % e)
248
249            # Add the ownership attributes to the authorizer.  Note that the
250            # indices of the allocation dict are strings, but the attributes are
251            # fedids, so there is a conversion.
252            for k in self.state.keys():
253                for o in self.state[k].get('owners', []):
254                    self.auth.set_attribute(o, fedid(hexstr=k))
255                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
256
257
258    def permute_wildcards(self, a, p):
259        """Return a copy of a with various fields wildcarded.
260
261        The bits of p control the wildcards.  A set bit is a wildcard
262        replacement with the lowest bit being user then project then testbed.
263        """
264        if p & 1: user = ["<any>"]
265        else: user = a[2]
266        if p & 2: proj = "<any>"
267        else: proj = a[1]
268        if p & 4: tb = "<any>"
269        else: tb = a[0]
270
271        return (tb, proj, user)
272
273    def find_access(self, search):
274        """
275        Search the access DB for a match on this tuple.  Return the matching
276        user (repo dir).
277       
278        NB, if the initial tuple fails to match we start inserting wildcards in
279        an order determined by self.project_priority.  Try the list of users in
280        order (when wildcarded, there's only one user in the list).
281        """
282        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
283        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
284
285        for p in perm: 
286            s = self.permute_wildcards(search, p)
287            # s[2] is None on an anonymous, unwildcarded request
288            if s[2] != None:
289                for u in s[2]:
290                    if self.access.has_key((s[0], s[1], u)):
291                        return self.access[(s[0], s[1], u)]
292            else:
293                if self.access.has_key(s):
294                    return self.access[s]
295        return None
296
297    def lookup_access(self, req, fid):
298        """
299        Determine the allowed access for this request.  Return the access and
300        which fields are dynamic.
301
302        The fedid is needed to construct the request
303        """
304        # Search keys
305        tb = None
306        project = None
307        user = None
308        # Return values
309        rp = access_project(None, ())
310        ru = None
311        user_re = re.compile("user:\s(.*)")
312        project_re = re.compile("project:\s(.*)")
313
314        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
315                if user_re.match(x)]
316        project = [ project_re.findall(x)[0] \
317                for x in req.get('credential', []) \
318                    if project_re.match(x)]
319
320        if len(project) == 1: project = project[0]
321        elif len(project) == 0: project = None
322        else: 
323            raise service_error(service_error.req, 
324                    "More than one project credential")
325
326
327        user_fedids = [ u for u in user if isinstance(u, fedid)]
328
329        # Determine how the caller is representing itself.  If its fedid shows
330        # up as a project or a singleton user, let that stand.  If neither the
331        # usernames nor the project name is a fedid, the caller is a testbed.
332        if project and isinstance(project, fedid):
333            if project == fid:
334                # The caller is the project (which is already in the tuple
335                # passed in to the authorizer)
336                owners = user_fedids
337                owners.append(project)
338            else:
339                raise service_error(service_error.req,
340                        "Project asserting different fedid")
341        else:
342            if fid not in user_fedids:
343                tb = fid
344                owners = user_fedids
345                owners.append(fid)
346            else:
347                if len(fedids) > 1:
348                    raise service_error(service_error.req,
349                            "User asserting different fedid")
350                else:
351                    # Which is a singleton
352                    owners = user_fedids
353        # Confirm authorization
354
355        for u in user:
356            self.log.debug("[lookup_access] Checking access for %s" % \
357                    ((tb, project, u),))
358            if self.auth.check_attribute((tb, project, u), 'access'):
359                self.log.debug("[lookup_access] Access granted")
360                break
361            else:
362                self.log.debug("[lookup_access] Access Denied")
363        else:
364            raise service_error(service_error.access, "Access denied")
365
366        # This maps a valid user to the Emulab projects and users to use
367        found = self.find_access((tb, project, user))
368       
369        if found == None:
370            raise service_error(service_error.access,
371                    "Access denied - cannot map access")
372        return found, owners
373
374    def RequestAccess(self, req, fid):
375        """
376        Handle the access request.  Proxy if not for us.
377
378        Parse out the fields and make the allocations or rejections if for us,
379        otherwise, assuming we're willing to proxy, proxy the request out.
380        """
381
382        # The dance to get into the request body
383        if req.has_key('RequestAccessRequestBody'):
384            req = req['RequestAccessRequestBody']
385        else:
386            raise service_error(service_error.req, "No request!?")
387
388        if req.has_key('destinationTestbed'):
389            dt = unpack_id(req['destinationTestbed'])
390
391        if dt == None or dt in self.testbed:
392            # Request for this fedd
393            found, owners = self.lookup_access(req, fid)
394            # keep track of what's been added
395            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
396            aid = unicode(allocID)
397
398            self.state_lock.acquire()
399            self.state[aid] = { }
400            self.state[aid]['user'] = found
401            self.state[aid]['owners'] = owners
402            self.write_state()
403            self.state_lock.release()
404            for o in owners:
405                self.auth.set_attribute(o, allocID)
406            self.auth.set_attribute(allocID, allocID)
407
408            try:
409                f = open("%s/%s.pem" % (self.certdir, aid), "w")
410                print >>f, alloc_cert
411                f.close()
412            except IOError, e:
413                raise service_error(service_error.internal, 
414                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
415            return { 'allocID': { 'fedid': allocID } }
416        else:
417            if self.allow_proxy:
418                resp = self.proxy_RequestAccess.call_service(dt, req,
419                            self.cert_file, self.cert_pwd,
420                            self.trusted_certs)
421                if resp.has_key('RequestAccessResponseBody'):
422                    return resp['RequestAccessResponseBody']
423                else:
424                    return None
425            else:
426                raise service_error(service_error.access,
427                        "Access proxying denied")
428
429    def ReleaseAccess(self, req, fid):
430        # The dance to get into the request body
431        if req.has_key('ReleaseAccessRequestBody'):
432            req = req['ReleaseAccessRequestBody']
433        else:
434            raise service_error(service_error.req, "No request!?")
435
436        if req.has_key('destinationTestbed'):
437            dt = unpack_id(req['destinationTestbed'])
438        else:
439            dt = None
440
441        if dt == None or dt in self.testbed:
442            # Local request
443            try:
444                if req['allocID'].has_key('localname'):
445                    auth_attr = aid = req['allocID']['localname']
446                elif req['allocID'].has_key('fedid'):
447                    aid = unicode(req['allocID']['fedid'])
448                    auth_attr = req['allocID']['fedid']
449                else:
450                    raise service_error(service_error.req,
451                            "Only localnames and fedids are understood")
452            except KeyError:
453                raise service_error(service_error.req, "Badly formed request")
454
455            self.log.debug("[access] deallocation requested for %s", aid)
456            if not self.auth.check_attribute(fid, auth_attr):
457                self.log.debug("[access] deallocation denied for %s", aid)
458                raise service_error(service_error.access, "Access Denied")
459
460            self.state_lock.acquire()
461            if self.state.has_key(aid):
462                self.log.debug("Found allocation for %s" %aid)
463                del self.state[aid]
464                self.write_state()
465                self.state_lock.release()
466                # And remove the access cert
467                cf = "%s/%s.pem" % (self.certdir, aid)
468                self.log.debug("Removing %s" % cf)
469                os.remove(cf)
470                return { 'allocID': req['allocID'] } 
471            else:
472                self.state_lock.release()
473                raise service_error(service_error.req, "No such allocation")
474
475        else:
476            if self.allow_proxy:
477                resp = self.proxy_ReleaseAccess.call_service(dt, req,
478                            self.cert_file, self.cert_pwd,
479                            self.trusted_certs)
480                if resp.has_key('ReleaseAccessResponseBody'):
481                    return resp['ReleaseAccessResponseBody']
482                else:
483                    return None
484            else:
485                raise service_error(service_error.access,
486                        "Access proxying denied")
487
488    def extract_parameters(self, top):
489        """
490        DRAGON currently supports a fixed capacity link between two endpoints.
491        Those endpoints may specify a VPN (or list or range) to use.  This
492        extracts the DRAGON endpoints and vpn preferences from the segments (as
493        attributes) and the capacity of the connection as given by the
494        substrate.  The two endpoints VLAN choices are intersected to get set
495        of VLANs that are acceptable (no VLAN requiremnets means any is
496        acceptable).
497        """
498        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
499
500        if len(segments) != 2 or len(top.substrates) != 1:
501            raise service_error(service_error.req,
502                    "Requests to DRAGON must have exactlty two segments " +\
503                            "and one substrate")
504
505        ends = [ ]
506        for s in segments:
507            ep = s.get_attribute('dragon_endpoint')
508            if not ep:
509                raise service_error(service_error.req, 
510                        "Missing DRAGON endpoint for %s" % s.id)
511            v = s.get_attribute('vlans')
512            vlans = None
513            vset = set()
514            # the vlans can be a single integer, a comma separated list or a
515            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
516            # 100,300-305,400
517            if v:
518                if v.count(",") > 0 :
519                    vl = [ x.strip() for x in v.split(",") ]
520                else:
521                    vl = [ v ]
522                for v in vl:
523                    try:
524                        if v.count("-")> 0:
525                            f, t = v.split("-", 1)
526                            for i in range(int(f), int(t)+1):
527                                vset.add(i)
528                        else:
529                            vset.add(int(v))
530                    except ValueError:
531                        raise service_error(service_error.req, 
532                                "VLAN tags must be integers (%s)" %s.name)
533            if len(vset) > 0:
534                if vlans: vlans &= vest
535                else: vlans = vset
536            ends.append(ep)
537
538
539        sub = top.substrates[0]
540        if sub.capacity:
541            cap = int(sub.capacity.rate / 1000.0)
542            if cap < 1: cap = 1
543        else:
544            cap = 100
545
546
547        # DRAGON's command line tool barfs if the source (ends[0]) is not in
548        # the domain controlled by the IDC.  This code ensures this situation.
549        if self.domain and not ends[0].endswith(self.domain):
550            hold = ends[0]
551            for i, e in enumerate(ends):
552                if i == 0: continue
553                if e.endswith(self.domain):
554                    ends[0] = e
555                    ends[i] = hold
556                    break
557            else:
558                raise service_error(service_error.req, 
559                        "No endpoint in my domain")
560
561
562        return cap, ends[0], ends[1], vlans
563
564
565    def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
566            log=None):
567        """
568        Do the actual work of creating the dragon connecton.
569        """
570        if not log: log = self.log
571        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
572        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
573        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
574        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
575        path_re = re.compile("Path:")
576
577        if not vpns:
578            vpns = [ None, None, None, None, None]
579
580        if not start:
581            start = time.time()
582        if not end:
583            end = start + 120 *60
584
585
586        status = None
587        gri = None
588        rv = None
589        vlan_no = None
590        for v in vpns:
591            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
592                'createReservation', '-repo',  repo , '-url', self.idc_url,
593                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
594                '-vlan', "%s" % v, '-desc', 'fedd created connection',
595                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
596                '-end', "%d" % int(end)]
597            log.debug("[start_segment]: %s" % " ".join(cmd))
598            if not self.create_debug: 
599                p = Popen(cmd, cwd=self.cli_dir, 
600                        stdout=PIPE, stderr=STDOUT,
601                        close_fds=True)
602                for line in p.stdout:
603                    m = status_re.match(line)
604                    if m:
605                        status = m.group(1)
606                        continue
607                    m = gri_re.match(line)
608                    if m:
609                        gri = m.group(1)
610                rv = p.wait()
611            else: 
612                rv = 0
613                status = 'ACCEPTED'
614                gri = 'debug_gri'
615
616            # Reservation in progress.  Poll the IDC until we know the outcome
617            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
618                'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
619
620            while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
621                log.debug("[start_segment]: %s" % " ".join(cmd))
622                if not self.create_debug:
623                    p = Popen(cmd, cwd=self.cli_dir, 
624                            stdout=PIPE, stderr=STDOUT,
625                            close_fds=True)
626                    in_path = False
627                    vpn1 = None
628                    vpn2 = None
629                    src = None
630                    dest = None
631                    for line in p.stdout:
632                        if not in_path:
633                            m = status_re.match(line)
634                            if m: 
635                                status = m.group(1)
636                                continue
637                            m = source_re.match(line)
638                            if m:
639                                src = m.group(1)
640                                continue
641                            m = dest_re.match(line)
642                            if m:
643                                dest = m.group(1)
644                                continue
645                            m = path_re.match(line)
646                            if m:
647                                in_path = True
648                                if src and dest:
649                                    vpn1_re = re.compile(
650                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
651                                                    src.replace("*", "\*"))
652                                    vpn2_re = re.compile(
653                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
654                                                    dest.replace("*", "\*"))
655                                else:
656                                    raise service_error(service_error.internal, 
657                                            "Strange output from query")
658                        else:
659                            m = vpn1_re.match(line)
660                            if m:
661                                vpn1 = m.group(1)
662                                continue
663                            m = vpn2_re.match(line)
664                            if m:
665                                vpn2 = m.group(1)
666                                continue
667                    rv = p.wait()
668                    if vpn1 == vpn2:
669                        if v is not None:
670                            if int(vpn1) == v:
671                                vlan_no = int(v)
672                            else:
673                                raise service_error(service_error.federant, 
674                                        "Unexpected vlan assignment")
675                        else:
676                            vlan_no = int(v)
677                    else:
678                        raise service_error(service_error.internal,
679                                "Different VPNs on DRAGON ends")
680                    log.debug("Status: %s" % status or "none")
681                else:
682                    status = 'ACTIVE'
683                    vlan_no = int(v) or 1
684                if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
685                    time.sleep(45)
686            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
687                break
688
689        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
690            return gri, vlan_no
691        else:
692            raise service_error(service_error.federant, 
693                    "Cannot make reservation")
694
695    def stop_segment(self, repo, gri, log=None):
696        if not log: log = self.log
697        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
698            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
699
700
701        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
702        if not self.create_debug:
703            try:
704                f = open("/dev/null", "w")
705                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
706                f.close()
707            except IOError, e:
708                raise service_error(service_error.internal, 
709                        "Failed to open /dev/null: %s" % e)
710
711    def StartSegment(self, req, fid):
712        err = None  # Any service_error generated after tmpdir is created
713        rv = None   # Return value from segment creation
714
715        try:
716            req = req['StartSegmentRequestBody']
717        except KeyError:
718            raise service_error(server_error.req, "Badly formed request")
719
720        auth_attr = req['allocID']['fedid']
721        aid = "%s" % auth_attr
722        attrs = req.get('fedAttr', [])
723        if not self.auth.check_attribute(fid, auth_attr):
724            raise service_error(service_error.access, "Access denied")
725
726        if req.has_key('segmentdescription') and \
727                req['segmentdescription'].has_key('topdldescription'):
728            topo = \
729                topdl.Topology(**req['segmentdescription']['topdldescription'])
730        else:
731            raise service_error(service_error.req, 
732                    "Request missing segmentdescription'")
733
734        cap, src, dest, vlans = self.extract_parameters(topo)
735        ename = aid
736
737        for a in attrs:
738            if a['attribute'] == 'experiment_name':
739                ename = a['value']
740
741        repo = None
742        self.state_lock.acquire()
743        if self.state.has_key(aid):
744            repo = self.state[aid]['user']
745            self.state[aid]['log'] = [ ]
746            # Create a logger that logs to the experiment's state object as
747            # well as to the main log file.
748            alloc_log = logging.getLogger('fedd.access.%s' % ename)
749            h = logging.StreamHandler(
750                    list_log.list_log(self.state[aid]['log']))
751            # XXX: there should be a global one of these rather than
752            # repeating the code.
753            h.setFormatter(logging.Formatter(
754                "%(asctime)s %(name)s %(message)s",
755                        '%d %b %y %H:%M:%S'))
756            alloc_log.addHandler(h)
757            self.write_state()
758        self.state_lock.release()
759
760        if not repo:
761            raise service_error(service_error.internal, 
762                    "Can't find creation user for %s" %aid)
763
764        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
765                log=alloc_log)
766
767        if gri:
768            # Grab the log (this is some anal locking, but better safe than
769            # sorry)
770            self.state_lock.acquire()
771            self.state[aid]['gri'] = gri
772            logv = "".join(self.state[aid]['log'])
773            self.write_state()
774            self.state_lock.release()
775
776            return { 
777                    'allocID': req['allocID'],
778                    'allocationLog': logv,
779                    'fedAttr': [
780                        {'attribute': 'vlan', 'value': '%d' % vlan_no }
781                        ]
782                    }
783        elif err:
784            raise service_error(service_error.federant,
785                    "Swapin failed: %s" % err)
786        else:
787            raise service_error(service_error.federant, "Swapin failed")
788
789    def TerminateSegment(self, req, fid):
790        try:
791            req = req['TerminateSegmentRequestBody']
792        except KeyError:
793            raise service_error(server_error.req, "Badly formed request")
794
795        auth_attr = req['allocID']['fedid']
796        aid = "%s" % auth_attr
797
798        self.log.debug("Terminate request for %s" %aid)
799        attrs = req.get('fedAttr', [])
800        if not self.auth.check_attribute(fid, auth_attr):
801            raise service_error(service_error.access, "Access denied")
802
803        self.state_lock.acquire()
804        if self.state.has_key(aid):
805            gri = self.state[aid].get('gri', None)
806            user = self.state[aid].get('user', None)
807        else:
808            gri = None
809            user = None
810        self.state_lock.release()
811        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
812
813        if not gri:
814            raise service_error(service_error.internal, 
815                    "Can't find gri for %s" % aid)
816
817        if not user:
818            raise service_error(service_error.internal, 
819                    "Can't find creation user for %s" % aid)
820   
821        self.log.debug("Stop segment for GRI: %s" %gri)
822        self.stop_segment(user, gri)
823        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.