source: fedd/federation/dragon_access.py @ f52f5df

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f52f5df was 7a011e9, checked in by Ted Faber <faber@…>, 15 years ago

Use self organizing store

  • Property mode set to 100644
File size: 29.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import *
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to dragon_access.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = False
64        self.certdir = config.get("access","certdir")
65        self.create_debug = config.getboolean("access", "create_debug")
66        self.cli_dir = config.get("access", "cli_dir")
67        self.axis2_home = config.get("access", "axis2_home")
68        self.idc_url = config.get("access", "idc")
69        self.domain = config.get("access", "domain")
70
71        self.attrs = { }
72        self.access = { }
73        # State is a dict of dicts indexed by segment fedid that includes the
74        # owners of the segment as fedids (who can manipulate it, key: owners),
75        # the repo dir/user for the allocation (key: user),  Current allocation
76        # log (key: log), and GRI of the reservation once made (key: gri)
77        self.state = { }
78        self.log = logging.getLogger("fedd.access")
79        set_log_level(config, "access", self.log)
80        self.state_lock = Lock()
81        if not (self.cli_dir and self.axis2_home and self.idc_url):
82            self.log.error("Must specify all of cli_dir, axis2_home, " +\
83                    "idc in the [access] section of the configuration")
84
85        if auth: self.auth = auth
86        else:
87            self.log.error(\
88                    "[access]: No authorizer initialized, creating local one.")
89            auth = authorizer()
90
91        tb = config.get('access', 'testbed')
92        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
93        else: self.testbed = [ ]
94
95        if config.has_option("access", "accessdb"):
96            self.read_access(config.get("access", "accessdb"))
97
98        self.state_filename = config.get("access", "access_state")
99        self.read_state()
100
101        # Keep cert_file and cert_pwd coming from the same place
102        self.cert_file = config.get("access", "cert_file")
103        if self.cert_file:
104            self.sert_pwd = config.get("access", "cert_pw")
105        else:
106            self.cert_file = config.get("globals", "cert_file")
107            self.sert_pwd = config.get("globals", "cert_pw")
108
109        self.trusted_certs = config.get("access", "trusted_certs") or \
110                config.get("globals", "trusted_certs")
111
112        self.call_GetValue= service_caller('GetValue')
113        self.call_SetValue= service_caller('SetValue')
114
115        self.soap_services = {\
116            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
117            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
118            'StartSegment': soap_handler("StartSegment", self.StartSegment),
119            'TerminateSegment': soap_handler("TerminateSegment", 
120                self.TerminateSegment),
121            }
122        self.xmlrpc_services =  {\
123            'RequestAccess': xmlrpc_handler('RequestAccess',
124                self.RequestAccess),
125            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
126                self.ReleaseAccess),
127            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
128            'TerminateSegment': xmlrpc_handler('TerminateSegment',
129                self.TerminateSegment),
130            }
131
132    def read_access(self, config):
133        """
134        Read a configuration file and set internal parameters.
135
136        There are access lines of the
137        form (tb, proj, user) -> user that map the first tuple of
138        names to the user for for access purposes.  Names in the key (left side)
139        can include "<NONE> or <ANY>" to act as wildcards or to require the
140        fields to be empty.  Similarly aproj or auser can be <SAME> or
141        <DYNAMIC> indicating that either the matching key is to be used or a
142        dynamic user or project will be created.  These names can also be
143        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
144        directory that contains the DRAGON user credentials.
145        Testbed attributes outside the forms above can be given using the
146        format attribute: name value: value.  The name is a single word and the
147        value continues to the end of the line.  Empty lines and lines startin
148        with a # are ignored.
149
150        Parsing errors result in a self.parse_error exception being raised.
151        """
152        lineno=0
153        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
154        fedid_expr = "fedid:[" + string.hexdigits + "]+"
155        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
156
157        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
158                re.IGNORECASE)
159        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
160                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
161
162        def parse_name(n):
163            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
164            else: return n
165       
166        def auth_name(n):
167            if isinstance(n, basestring):
168                if n =='<any>' or n =='<none>': return None
169                else: return unicode(n)
170            else:
171                return n
172
173        f = open(config, "r");
174        for line in f:
175            lineno += 1
176            line = line.strip();
177            if len(line) == 0 or line.startswith('#'):
178                continue
179
180            # Extended (attribute: x value: y) attribute line
181            m = attr_re.match(line)
182            if m != None:
183                attr, val = m.group(1,2)
184                self.attrs[attr] = val
185                continue
186
187            # Access line (t, p, u) -> (a) line
188            # XXX: you are here
189            m = access_re.match(line)
190            if m != None:
191                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
192                auth_key = tuple([ auth_name(x) for x in access_key])
193                user_name = auth_name(parse_name(m.group(4)))
194
195                self.access[access_key] = user_name
196                self.auth.set_attribute(auth_key, "access")
197                continue
198
199            # Nothing matched to here: unknown line - raise exception
200            f.close()
201            raise self.parse_error("Unknown statement at line %d of %s" % \
202                    (lineno, config))
203        f.close()
204
205    def get_users(self, obj):
206        """
207        Return a list of the IDs of the users in dict
208        """
209        if obj.has_key('user'):
210            return [ unpack_id(u['userID']) \
211                    for u in obj['user'] if u.has_key('userID') ]
212        else:
213            return None
214
215    def write_state(self):
216        if self.state_filename:
217            try:
218                f = open(self.state_filename, 'w')
219                pickle.dump(self.state, f)
220            except IOError, e:
221                self.log.error("Can't write file %s: %s" % \
222                        (self.state_filename, e))
223            except pickle.PicklingError, e:
224                self.log.error("Pickling problem: %s" % e)
225            except TypeError, e:
226                self.log.error("Pickling problem (TypeError): %s" % e)
227
228
229    def read_state(self):
230        """
231        Read a new copy of access state.  Old state is overwritten.
232
233        State format is a simple pickling of the state dictionary.
234        """
235        if self.state_filename:
236            try:
237                f = open(self.state_filename, "r")
238                self.state = pickle.load(f)
239                self.log.debug("[read_state]: Read state from %s" % \
240                        self.state_filename)
241            except IOError, e:
242                self.log.warning(("[read_state]: No saved state: " +\
243                        "Can't open %s: %s") % (self.state_filename, e))
244            except EOFError, e:
245                self.log.warning(("[read_state]: " +\
246                        "Empty or damaged state file: %s:") % \
247                        self.state_filename)
248            except pickle.UnpicklingError, e:
249                self.log.warning(("[read_state]: No saved state: " + \
250                        "Unpickling failed: %s") % e)
251
252            # Add the ownership attributes to the authorizer.  Note that the
253            # indices of the allocation dict are strings, but the attributes are
254            # fedids, so there is a conversion.
255            for k in self.state.keys():
256                for o in self.state[k].get('owners', []):
257                    self.auth.set_attribute(o, fedid(hexstr=k))
258                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
259
260
261    def permute_wildcards(self, a, p):
262        """Return a copy of a with various fields wildcarded.
263
264        The bits of p control the wildcards.  A set bit is a wildcard
265        replacement with the lowest bit being user then project then testbed.
266        """
267        if p & 1: user = ["<any>"]
268        else: user = a[2]
269        if p & 2: proj = "<any>"
270        else: proj = a[1]
271        if p & 4: tb = "<any>"
272        else: tb = a[0]
273
274        return (tb, proj, user)
275
276    def find_access(self, search):
277        """
278        Search the access DB for a match on this tuple.  Return the matching
279        user (repo dir).
280       
281        NB, if the initial tuple fails to match we start inserting wildcards in
282        an order determined by self.project_priority.  Try the list of users in
283        order (when wildcarded, there's only one user in the list).
284        """
285        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
286        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
287
288        for p in perm: 
289            s = self.permute_wildcards(search, p)
290            # s[2] is None on an anonymous, unwildcarded request
291            if s[2] != None:
292                for u in s[2]:
293                    if self.access.has_key((s[0], s[1], u)):
294                        return self.access[(s[0], s[1], u)]
295            else:
296                if self.access.has_key(s):
297                    return self.access[s]
298        return None
299
300    def lookup_access(self, req, fid):
301        """
302        Determine the allowed access for this request.  Return the access and
303        which fields are dynamic.
304
305        The fedid is needed to construct the request
306        """
307        # Search keys
308        tb = None
309        project = None
310        user = None
311        # Return values
312        rp = access_project(None, ())
313        ru = None
314        user_re = re.compile("user:\s(.*)")
315        project_re = re.compile("project:\s(.*)")
316
317        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
318                if user_re.match(x)]
319        project = [ project_re.findall(x)[0] \
320                for x in req.get('credential', []) \
321                    if project_re.match(x)]
322
323        if len(project) == 1: project = project[0]
324        elif len(project) == 0: project = None
325        else: 
326            raise service_error(service_error.req, 
327                    "More than one project credential")
328
329
330        user_fedids = [ u for u in user if isinstance(u, fedid)]
331
332        # Determine how the caller is representing itself.  If its fedid shows
333        # up as a project or a singleton user, let that stand.  If neither the
334        # usernames nor the project name is a fedid, the caller is a testbed.
335        if project and isinstance(project, fedid):
336            if project == fid:
337                # The caller is the project (which is already in the tuple
338                # passed in to the authorizer)
339                owners = user_fedids
340                owners.append(project)
341            else:
342                raise service_error(service_error.req,
343                        "Project asserting different fedid")
344        else:
345            if fid not in user_fedids:
346                tb = fid
347                owners = user_fedids
348                owners.append(fid)
349            else:
350                if len(fedids) > 1:
351                    raise service_error(service_error.req,
352                            "User asserting different fedid")
353                else:
354                    # Which is a singleton
355                    owners = user_fedids
356        # Confirm authorization
357
358        for u in user:
359            self.log.debug("[lookup_access] Checking access for %s" % \
360                    ((tb, project, u),))
361            if self.auth.check_attribute((tb, project, u), 'access'):
362                self.log.debug("[lookup_access] Access granted")
363                break
364            else:
365                self.log.debug("[lookup_access] Access Denied")
366        else:
367            raise service_error(service_error.access, "Access denied")
368
369        # This maps a valid user to the Emulab projects and users to use
370        found = self.find_access((tb, project, user))
371       
372        if found == None:
373            raise service_error(service_error.access,
374                    "Access denied - cannot map access")
375        return found, owners
376
377    def RequestAccess(self, req, fid):
378        """
379        Handle the access request.  Proxy if not for us.
380
381        Parse out the fields and make the allocations or rejections if for us,
382        otherwise, assuming we're willing to proxy, proxy the request out.
383        """
384
385        # The dance to get into the request body
386        if req.has_key('RequestAccessRequestBody'):
387            req = req['RequestAccessRequestBody']
388        else:
389            raise service_error(service_error.req, "No request!?")
390
391        if req.has_key('destinationTestbed'):
392            dt = unpack_id(req['destinationTestbed'])
393
394        if dt == None or dt in self.testbed:
395            # Request for this fedd
396            found, owners = self.lookup_access(req, fid)
397            # keep track of what's been added
398            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
399            aid = unicode(allocID)
400
401            self.state_lock.acquire()
402            self.state[aid] = { }
403            self.state[aid]['user'] = found
404            self.state[aid]['owners'] = owners
405            self.write_state()
406            self.state_lock.release()
407            for o in owners:
408                self.auth.set_attribute(o, allocID)
409            self.auth.set_attribute(allocID, allocID)
410
411            try:
412                f = open("%s/%s.pem" % (self.certdir, aid), "w")
413                print >>f, alloc_cert
414                f.close()
415            except IOError, e:
416                raise service_error(service_error.internal, 
417                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
418            return { 'allocID': { 'fedid': allocID } }
419        else:
420            if self.allow_proxy:
421                resp = self.proxy_RequestAccess.call_service(dt, req,
422                            self.cert_file, self.cert_pwd,
423                            self.trusted_certs)
424                if resp.has_key('RequestAccessResponseBody'):
425                    return resp['RequestAccessResponseBody']
426                else:
427                    return None
428            else:
429                raise service_error(service_error.access,
430                        "Access proxying denied")
431
432    def ReleaseAccess(self, req, fid):
433        # The dance to get into the request body
434        if req.has_key('ReleaseAccessRequestBody'):
435            req = req['ReleaseAccessRequestBody']
436        else:
437            raise service_error(service_error.req, "No request!?")
438
439        if req.has_key('destinationTestbed'):
440            dt = unpack_id(req['destinationTestbed'])
441        else:
442            dt = None
443
444        if dt == None or dt in self.testbed:
445            # Local request
446            try:
447                if req['allocID'].has_key('localname'):
448                    auth_attr = aid = req['allocID']['localname']
449                elif req['allocID'].has_key('fedid'):
450                    aid = unicode(req['allocID']['fedid'])
451                    auth_attr = req['allocID']['fedid']
452                else:
453                    raise service_error(service_error.req,
454                            "Only localnames and fedids are understood")
455            except KeyError:
456                raise service_error(service_error.req, "Badly formed request")
457
458            self.log.debug("[access] deallocation requested for %s", aid)
459            if not self.auth.check_attribute(fid, auth_attr):
460                self.log.debug("[access] deallocation denied for %s", aid)
461                raise service_error(service_error.access, "Access Denied")
462
463            self.state_lock.acquire()
464            if self.state.has_key(aid):
465                self.log.debug("Found allocation for %s" %aid)
466                del self.state[aid]
467                self.write_state()
468                self.state_lock.release()
469                # And remove the access cert
470                cf = "%s/%s.pem" % (self.certdir, aid)
471                self.log.debug("Removing %s" % cf)
472                os.remove(cf)
473                return { 'allocID': req['allocID'] } 
474            else:
475                self.state_lock.release()
476                raise service_error(service_error.req, "No such allocation")
477
478        else:
479            if self.allow_proxy:
480                resp = self.proxy_ReleaseAccess.call_service(dt, req,
481                            self.cert_file, self.cert_pwd,
482                            self.trusted_certs)
483                if resp.has_key('ReleaseAccessResponseBody'):
484                    return resp['ReleaseAccessResponseBody']
485                else:
486                    return None
487            else:
488                raise service_error(service_error.access,
489                        "Access proxying denied")
490
491    def extract_parameters(self, top):
492        """
493        DRAGON currently supports a fixed capacity link between two endpoints.
494        Those endpoints may specify a VPN (or list or range) to use.  This
495        extracts the DRAGON endpoints and vpn preferences from the segments (as
496        attributes) and the capacity of the connection as given by the
497        substrate.  The two endpoints VLAN choices are intersected to get set
498        of VLANs that are acceptable (no VLAN requiremnets means any is
499        acceptable).
500        """
501        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
502
503        if len(segments) != 2 or len(top.substrates) != 1:
504            raise service_error(service_error.req,
505                    "Requests to DRAGON must have exactlty two segments " +\
506                            "and one substrate")
507
508        ends = [ ]
509        for s in segments:
510            ep = s.get_attribute('dragon_endpoint')
511            if not ep:
512                raise service_error(service_error.req, 
513                        "Missing DRAGON endpoint for %s" % s.id)
514            v = s.get_attribute('vlans')
515            vlans = None
516            vset = set()
517            # the vlans can be a single integer, a comma separated list or a
518            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
519            # 100,300-305,400
520            if v:
521                if v.count(",") > 0 :
522                    vl = [ x.strip() for x in v.split(",") ]
523                else:
524                    vl = [ v ]
525                for v in vl:
526                    try:
527                        if v.count("-")> 0:
528                            f, t = v.split("-", 1)
529                            for i in range(int(f), int(t)+1):
530                                vset.add(i)
531                        else:
532                            vset.add(int(v))
533                    except ValueError:
534                        raise service_error(service_error.req, 
535                                "VLAN tags must be integers (%s)" %s.name)
536            if len(vset) > 0:
537                if vlans: vlans &= vest
538                else: vlans = vset
539            ends.append(ep)
540
541
542        sub = top.substrates[0]
543        if sub.capacity:
544            cap = int(sub.capacity.rate / 1000.0)
545            if cap < 1: cap = 1
546        else:
547            cap = 100
548
549
550        # DRAGON's command line tool barfs if the source (ends[0]) is not in
551        # the domain controlled by the IDC.  This code ensures this situation.
552        if self.domain and not ends[0].endswith(self.domain):
553            hold = ends[0]
554            for i, e in enumerate(ends):
555                if i == 0: continue
556                if e.endswith(self.domain):
557                    ends[0] = e
558                    ends[i] = hold
559                    break
560            else:
561                raise service_error(service_error.req, 
562                        "No endpoint in my domain")
563
564
565        return cap, ends[0], ends[1], vlans
566
567
568    def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
569            log=None):
570        """
571        Do the actual work of creating the dragon connecton.
572        """
573        if not log: log = self.log
574        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
575        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
576        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
577        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
578        path_re = re.compile("Path:")
579
580        if not vpns:
581            vpns = [ None, None, None, None, None]
582
583        if not start:
584            start = time.time()
585        if not end:
586            end = start + 120 *60
587
588
589        status = None
590        gri = None
591        rv = None
592        vlan_no = None
593        for v in vpns:
594            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
595                'createReservation', '-repo',  repo , '-url', self.idc_url,
596                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
597                '-vlan', "%s" % v, '-desc', 'fedd created connection',
598                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
599                '-end', "%d" % int(end)]
600            log.debug("[start_segment]: %s" % " ".join(cmd))
601            if not self.create_debug: 
602                p = Popen(cmd, cwd=self.cli_dir, 
603                        stdout=PIPE, stderr=STDOUT,
604                        close_fds=True)
605                for line in p.stdout:
606                    m = status_re.match(line)
607                    if m:
608                        status = m.group(1)
609                        continue
610                    m = gri_re.match(line)
611                    if m:
612                        gri = m.group(1)
613                rv = p.wait()
614            else: 
615                rv = 0
616                status = 'ACCEPTED'
617                gri = 'debug_gri'
618
619            # Reservation in progress.  Poll the IDC until we know the outcome
620            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
621                'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
622
623            while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
624                log.debug("[start_segment]: %s" % " ".join(cmd))
625                if not self.create_debug:
626                    p = Popen(cmd, cwd=self.cli_dir, 
627                            stdout=PIPE, stderr=STDOUT,
628                            close_fds=True)
629                    in_path = False
630                    vpn1 = None
631                    vpn2 = None
632                    src = None
633                    dest = None
634                    for line in p.stdout:
635                        if not in_path:
636                            m = status_re.match(line)
637                            if m: 
638                                status = m.group(1)
639                                continue
640                            m = source_re.match(line)
641                            if m:
642                                src = m.group(1)
643                                continue
644                            m = dest_re.match(line)
645                            if m:
646                                dest = m.group(1)
647                                continue
648                            m = path_re.match(line)
649                            if m:
650                                in_path = True
651                                if src and dest:
652                                    vpn1_re = re.compile(
653                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
654                                                    src.replace("*", "\*"))
655                                    vpn2_re = re.compile(
656                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
657                                                    dest.replace("*", "\*"))
658                                else:
659                                    raise service_error(service_error.internal, 
660                                            "Strange output from query")
661                        else:
662                            m = vpn1_re.match(line)
663                            if m:
664                                vpn1 = m.group(1)
665                                continue
666                            m = vpn2_re.match(line)
667                            if m:
668                                vpn2 = m.group(1)
669                                continue
670                    rv = p.wait()
671                    if vpn1 == vpn2:
672                        if v is not None:
673                            if int(vpn1) == v:
674                                vlan_no = int(v)
675                            else:
676                                raise service_error(service_error.federant, 
677                                        "Unexpected vlan assignment")
678                        else:
679                            vlan_no = int(v)
680                    else:
681                        raise service_error(service_error.internal,
682                                "Different VPNs on DRAGON ends")
683                    log.debug("Status: %s" % status or "none")
684                else:
685                    status = 'ACTIVE'
686                    vlan_no = int(v) or 1
687                if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
688                    time.sleep(45)
689            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
690                break
691
692        self.log.debug("made reservation %s %s" % (gri, vlan_no))
693
694        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
695            return gri, vlan_no
696        else:
697            raise service_error(service_error.federant, 
698                    "Cannot make reservation")
699
700    def stop_segment(self, repo, gri, log=None):
701        if not log: log = self.log
702        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
703            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
704
705
706        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
707        if not self.create_debug:
708            try:
709                f = open("/dev/null", "w")
710                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
711                f.close()
712            except IOError, e:
713                raise service_error(service_error.internal, 
714                        "Failed to open /dev/null: %s" % e)
715
716    def export_store_info(self, cf, vlan, connInfo):
717        """
718        For the export requests in the connection info, install the peer names
719        at the experiment controller via SetValue calls.
720        """
721
722        for c in connInfo:
723            for p in [ p for p in c.get('parameter', []) \
724                    if p.get('type', '') == 'output']:
725
726                if p.get('name', '') != 'vlan_id':
727                    self.log.error("Unknown export parameter: %s" % \
728                            p.get('name'))
729                    continue
730
731                k = p.get('key', None)
732                surl = p.get('store', None)
733                if surl and k:
734                    value = "%s" % vlan
735                    req = { 'name': k, 'value': value }
736                    print "calling SetValue %s %s" % (surl, req)
737                    self.call_SetValue(surl, req, cf)
738                else:
739                    self.log.error("Bad export request: %s" % p)
740
741    def import_store_info(self, cf, connInfo):
742        """
743        Pull any import parameters in connInfo in.  We translate them either
744        into known member names or fedAddrs.
745        """
746
747        for c in connInfo:
748            for p in [ p for p in c.get('parameter', []) \
749                    if p.get('type', '') == 'input']:
750                name = p.get('name', None)
751                key = p.get('key', None)
752                store = p.get('store', None)
753
754                if name and key and store :
755                    req = { 'name': key, 'wait': True }
756                    r = self.call_GetValue(store, req, cf)
757                    r = r.get('GetValueResponseBody', None)
758                    if r :
759                        if r.get('name', '') == key:
760                            v = r.get('value', None)
761                            if v is not None:
762                                if name == 'peer':
763                                    c['peer'] = v
764                                else:
765                                    if c.has_key('fedAttr'):
766                                        c['fedAttr'].append({
767                                            'attribute': name, 'value': value})
768                                    else:
769                                        c['fedAttr']= [{
770                                            'attribute': name, 'value': value}]
771                            else:
772                                raise service_error(service_error.internal, 
773                                        'None value exported for %s'  % key)
774                        else:
775                            raise service_error(service_error.internal, 
776                                    'Different name returned for %s: %s' \
777                                            % (key, r.get('name','')))
778                    else:
779                        raise service_error(service_error.internal, 
780                            'Badly formatted response: no GetValueResponseBody')
781                else:
782                    raise service_error(service_error.internal, 
783                        'Bad Services missing info for import %s' % c)
784
785
786    def StartSegment(self, req, fid):
787        err = None  # Any service_error generated after tmpdir is created
788        rv = None   # Return value from segment creation
789
790        try:
791            req = req['StartSegmentRequestBody']
792        except KeyError:
793            raise service_error(server_error.req, "Badly formed request")
794
795        auth_attr = req['allocID']['fedid']
796        aid = "%s" % auth_attr
797        attrs = req.get('fedAttr', [])
798        if not self.auth.check_attribute(fid, auth_attr):
799            raise service_error(service_error.access, "Access denied")
800        else:
801            certfile = "%s/%s.pem" % (self.certdir, aid)
802
803        if req.has_key('segmentdescription') and \
804                req['segmentdescription'].has_key('topdldescription'):
805            topo = \
806                topdl.Topology(**req['segmentdescription']['topdldescription'])
807        else:
808            raise service_error(service_error.req, 
809                    "Request missing segmentdescription'")
810
811        connInfo = req.get('connection', [])
812
813        cap, src, dest, vlans = self.extract_parameters(topo)
814        ename = aid
815
816        for a in attrs:
817            if a['attribute'] == 'experiment_name':
818                ename = a['value']
819
820        repo = None
821        self.state_lock.acquire()
822        if self.state.has_key(aid):
823            repo = self.state[aid]['user']
824            self.state[aid]['log'] = [ ]
825            # Create a logger that logs to the experiment's state object as
826            # well as to the main log file.
827            alloc_log = logging.getLogger('fedd.access.%s' % ename)
828            h = logging.StreamHandler(
829                    list_log.list_log(self.state[aid]['log']))
830            # XXX: there should be a global one of these rather than
831            # repeating the code.
832            h.setFormatter(logging.Formatter(
833                "%(asctime)s %(name)s %(message)s",
834                        '%d %b %y %H:%M:%S'))
835            alloc_log.addHandler(h)
836            self.write_state()
837        self.state_lock.release()
838
839        if not repo:
840            raise service_error(service_error.internal, 
841                    "Can't find creation user for %s" %aid)
842
843        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
844                log=alloc_log)
845
846        self.export_store_info(certfile, vlan_no, connInfo)
847
848        if gri:
849            # Grab the log (this is some anal locking, but better safe than
850            # sorry)
851            self.state_lock.acquire()
852            self.state[aid]['gri'] = gri
853            logv = "".join(self.state[aid]['log'])
854            self.write_state()
855            self.state_lock.release()
856
857            return { 
858                    'allocID': req['allocID'],
859                    'allocationLog': logv,
860                    }
861        elif err:
862            raise service_error(service_error.federant,
863                    "Swapin failed: %s" % err)
864        else:
865            raise service_error(service_error.federant, "Swapin failed")
866
867    def TerminateSegment(self, req, fid):
868        try:
869            req = req['TerminateSegmentRequestBody']
870        except KeyError:
871            raise service_error(server_error.req, "Badly formed request")
872
873        auth_attr = req['allocID']['fedid']
874        aid = "%s" % auth_attr
875
876        self.log.debug("Terminate request for %s" %aid)
877        attrs = req.get('fedAttr', [])
878        if not self.auth.check_attribute(fid, auth_attr):
879            raise service_error(service_error.access, "Access denied")
880
881        self.state_lock.acquire()
882        if self.state.has_key(aid):
883            gri = self.state[aid].get('gri', None)
884            user = self.state[aid].get('user', None)
885        else:
886            gri = None
887            user = None
888        self.state_lock.release()
889        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
890
891        if not gri:
892            raise service_error(service_error.internal, 
893                    "Can't find gri for %s" % aid)
894
895        if not user:
896            raise service_error(service_error.internal, 
897                    "Can't find creation user for %s" % aid)
898   
899        self.log.debug("Stop segment for GRI: %s" %gri)
900        self.stop_segment(user, gri)
901        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.