source: fedd/federation/dragon_access.py @ f432e51

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f432e51 was d3c8759, checked in by Ted Faber <faber@…>, 15 years ago

Wholesale change of IOError to EnvironmentError? for file operations. Lots of
uncaught EnvironmentErrors? were causing spurious error conditions, e.g. on disk
full.

  • Property mode set to 100644
File size: 30.8 KB
RevLine 
[23dec62]1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
[ab847bc]11from threading import Thread, Lock
[23dec62]12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    class parse_error(RuntimeError): pass
48
49
50    proxy_RequestAccess= service_caller('RequestAccess')
51    proxy_ReleaseAccess= service_caller('ReleaseAccess')
52
53    def __init__(self, config=None, auth=None):
54        """
55        Initializer.  Pulls parameters out of the ConfigParser's access section.
56        """
57
58        # Make sure that the configuration is in place
59        if not config: 
60            raise RunTimeError("No config to dragon_access.access")
61
62        self.project_priority = config.getboolean("access", "project_priority")
63        self.allow_proxy = False
64        self.certdir = config.get("access","certdir")
65        self.create_debug = config.getboolean("access", "create_debug")
66        self.cli_dir = config.get("access", "cli_dir")
67        self.axis2_home = config.get("access", "axis2_home")
68        self.idc_url = config.get("access", "idc")
[36fec1b]69        self.domain = config.get("access", "domain")
[6f82229]70        self.duration = config.getint("access", "duration", 120)
[23dec62]71
72        self.attrs = { }
73        self.access = { }
74        # State is a dict of dicts indexed by segment fedid that includes the
75        # owners of the segment as fedids (who can manipulate it, key: owners),
76        # the repo dir/user for the allocation (key: user),  Current allocation
77        # log (key: log), and GRI of the reservation once made (key: gri)
78        self.state = { }
79        self.log = logging.getLogger("fedd.access")
80        set_log_level(config, "access", self.log)
81        self.state_lock = Lock()
82        if not (self.cli_dir and self.axis2_home and self.idc_url):
83            self.log.error("Must specify all of cli_dir, axis2_home, " +\
84                    "idc in the [access] section of the configuration")
85
86        if auth: self.auth = auth
87        else:
88            self.log.error(\
89                    "[access]: No authorizer initialized, creating local one.")
90            auth = authorizer()
91
92        tb = config.get('access', 'testbed')
93        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
94        else: self.testbed = [ ]
95
96        if config.has_option("access", "accessdb"):
97            self.read_access(config.get("access", "accessdb"))
98
99        self.state_filename = config.get("access", "access_state")
100        self.read_state()
101
102        # Keep cert_file and cert_pwd coming from the same place
103        self.cert_file = config.get("access", "cert_file")
104        if self.cert_file:
105            self.sert_pwd = config.get("access", "cert_pw")
106        else:
107            self.cert_file = config.get("globals", "cert_file")
108            self.sert_pwd = config.get("globals", "cert_pw")
109
110        self.trusted_certs = config.get("access", "trusted_certs") or \
111                config.get("globals", "trusted_certs")
112
[7a011e9]113        self.call_GetValue= service_caller('GetValue')
114        self.call_SetValue= service_caller('SetValue')
115
[23dec62]116        self.soap_services = {\
117            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
118            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
119            'StartSegment': soap_handler("StartSegment", self.StartSegment),
120            'TerminateSegment': soap_handler("TerminateSegment", 
121                self.TerminateSegment),
122            }
123        self.xmlrpc_services =  {\
124            'RequestAccess': xmlrpc_handler('RequestAccess',
125                self.RequestAccess),
126            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
127                self.ReleaseAccess),
128            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
129            'TerminateSegment': xmlrpc_handler('TerminateSegment',
130                self.TerminateSegment),
131            }
132
133    def read_access(self, config):
134        """
135        Read a configuration file and set internal parameters.
136
137        There are access lines of the
138        form (tb, proj, user) -> user that map the first tuple of
139        names to the user for for access purposes.  Names in the key (left side)
140        can include "<NONE> or <ANY>" to act as wildcards or to require the
141        fields to be empty.  Similarly aproj or auser can be <SAME> or
142        <DYNAMIC> indicating that either the matching key is to be used or a
143        dynamic user or project will be created.  These names can also be
144        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
145        directory that contains the DRAGON user credentials.
146        Testbed attributes outside the forms above can be given using the
147        format attribute: name value: value.  The name is a single word and the
148        value continues to the end of the line.  Empty lines and lines startin
149        with a # are ignored.
150
151        Parsing errors result in a self.parse_error exception being raised.
152        """
153        lineno=0
154        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
155        fedid_expr = "fedid:[" + string.hexdigits + "]+"
156        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
157
158        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
159                re.IGNORECASE)
160        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
161                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
162
163        def parse_name(n):
164            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
165            else: return n
166       
167        def auth_name(n):
168            if isinstance(n, basestring):
169                if n =='<any>' or n =='<none>': return None
170                else: return unicode(n)
171            else:
172                return n
173
174        f = open(config, "r");
175        for line in f:
176            lineno += 1
177            line = line.strip();
178            if len(line) == 0 or line.startswith('#'):
179                continue
180
181            # Extended (attribute: x value: y) attribute line
182            m = attr_re.match(line)
183            if m != None:
184                attr, val = m.group(1,2)
185                self.attrs[attr] = val
186                continue
187
188            # Access line (t, p, u) -> (a) line
189            # XXX: you are here
190            m = access_re.match(line)
191            if m != None:
192                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
193                auth_key = tuple([ auth_name(x) for x in access_key])
194                user_name = auth_name(parse_name(m.group(4)))
195
196                self.access[access_key] = user_name
197                self.auth.set_attribute(auth_key, "access")
198                continue
199
200            # Nothing matched to here: unknown line - raise exception
201            f.close()
202            raise self.parse_error("Unknown statement at line %d of %s" % \
203                    (lineno, config))
204        f.close()
205
206    def get_users(self, obj):
207        """
208        Return a list of the IDs of the users in dict
209        """
210        if obj.has_key('user'):
211            return [ unpack_id(u['userID']) \
212                    for u in obj['user'] if u.has_key('userID') ]
213        else:
214            return None
215
216    def write_state(self):
217        if self.state_filename:
218            try:
219                f = open(self.state_filename, 'w')
220                pickle.dump(self.state, f)
[d3c8759]221            except EnvironmentError, e:
[23dec62]222                self.log.error("Can't write file %s: %s" % \
223                        (self.state_filename, e))
224            except pickle.PicklingError, e:
225                self.log.error("Pickling problem: %s" % e)
226            except TypeError, e:
227                self.log.error("Pickling problem (TypeError): %s" % e)
228
229
230    def read_state(self):
231        """
232        Read a new copy of access state.  Old state is overwritten.
233
234        State format is a simple pickling of the state dictionary.
235        """
236        if self.state_filename:
237            try:
238                f = open(self.state_filename, "r")
239                self.state = pickle.load(f)
240                self.log.debug("[read_state]: Read state from %s" % \
241                        self.state_filename)
[d3c8759]242            except EnvironmentError, e:
[23dec62]243                self.log.warning(("[read_state]: No saved state: " +\
244                        "Can't open %s: %s") % (self.state_filename, e))
245            except EOFError, e:
246                self.log.warning(("[read_state]: " +\
247                        "Empty or damaged state file: %s:") % \
248                        self.state_filename)
249            except pickle.UnpicklingError, e:
250                self.log.warning(("[read_state]: No saved state: " + \
251                        "Unpickling failed: %s") % e)
252
253            # Add the ownership attributes to the authorizer.  Note that the
254            # indices of the allocation dict are strings, but the attributes are
255            # fedids, so there is a conversion.
256            for k in self.state.keys():
257                for o in self.state[k].get('owners', []):
258                    self.auth.set_attribute(o, fedid(hexstr=k))
259                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
260
261
262    def permute_wildcards(self, a, p):
263        """Return a copy of a with various fields wildcarded.
264
265        The bits of p control the wildcards.  A set bit is a wildcard
266        replacement with the lowest bit being user then project then testbed.
267        """
268        if p & 1: user = ["<any>"]
269        else: user = a[2]
270        if p & 2: proj = "<any>"
271        else: proj = a[1]
272        if p & 4: tb = "<any>"
273        else: tb = a[0]
274
275        return (tb, proj, user)
276
277    def find_access(self, search):
278        """
279        Search the access DB for a match on this tuple.  Return the matching
280        user (repo dir).
281       
282        NB, if the initial tuple fails to match we start inserting wildcards in
283        an order determined by self.project_priority.  Try the list of users in
284        order (when wildcarded, there's only one user in the list).
285        """
286        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
287        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
288
289        for p in perm: 
290            s = self.permute_wildcards(search, p)
291            # s[2] is None on an anonymous, unwildcarded request
292            if s[2] != None:
293                for u in s[2]:
294                    if self.access.has_key((s[0], s[1], u)):
295                        return self.access[(s[0], s[1], u)]
296            else:
297                if self.access.has_key(s):
298                    return self.access[s]
299        return None
300
301    def lookup_access(self, req, fid):
302        """
303        Determine the allowed access for this request.  Return the access and
304        which fields are dynamic.
305
306        The fedid is needed to construct the request
307        """
308        # Search keys
309        tb = None
310        project = None
311        user = None
312        # Return values
313        rp = access_project(None, ())
314        ru = None
[0f06528]315        user_re = re.compile("user:\s(.*)")
316        project_re = re.compile("project:\s(.*)")
317
318        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
319                if user_re.match(x)]
320        project = [ project_re.findall(x)[0] \
321                for x in req.get('credential', []) \
322                    if project_re.match(x)]
323
324        if len(project) == 1: project = project[0]
325        elif len(project) == 0: project = None
326        else: 
327            raise service_error(service_error.req, 
328                    "More than one project credential")
[23dec62]329
330
331        user_fedids = [ u for u in user if isinstance(u, fedid)]
[0f06528]332
[23dec62]333        # Determine how the caller is representing itself.  If its fedid shows
334        # up as a project or a singleton user, let that stand.  If neither the
335        # usernames nor the project name is a fedid, the caller is a testbed.
336        if project and isinstance(project, fedid):
337            if project == fid:
338                # The caller is the project (which is already in the tuple
339                # passed in to the authorizer)
340                owners = user_fedids
341                owners.append(project)
342            else:
343                raise service_error(service_error.req,
344                        "Project asserting different fedid")
345        else:
346            if fid not in user_fedids:
347                tb = fid
348                owners = user_fedids
349                owners.append(fid)
350            else:
351                if len(fedids) > 1:
352                    raise service_error(service_error.req,
353                            "User asserting different fedid")
354                else:
355                    # Which is a singleton
356                    owners = user_fedids
357        # Confirm authorization
358
359        for u in user:
360            self.log.debug("[lookup_access] Checking access for %s" % \
361                    ((tb, project, u),))
362            if self.auth.check_attribute((tb, project, u), 'access'):
363                self.log.debug("[lookup_access] Access granted")
364                break
365            else:
366                self.log.debug("[lookup_access] Access Denied")
367        else:
368            raise service_error(service_error.access, "Access denied")
369
370        # This maps a valid user to the Emulab projects and users to use
371        found = self.find_access((tb, project, user))
372       
373        if found == None:
374            raise service_error(service_error.access,
375                    "Access denied - cannot map access")
376        return found, owners
377
378    def RequestAccess(self, req, fid):
379        """
380        Handle the access request.  Proxy if not for us.
381
382        Parse out the fields and make the allocations or rejections if for us,
383        otherwise, assuming we're willing to proxy, proxy the request out.
384        """
385
386        # The dance to get into the request body
387        if req.has_key('RequestAccessRequestBody'):
388            req = req['RequestAccessRequestBody']
389        else:
390            raise service_error(service_error.req, "No request!?")
391
392        if req.has_key('destinationTestbed'):
393            dt = unpack_id(req['destinationTestbed'])
394
395        if dt == None or dt in self.testbed:
396            # Request for this fedd
397            found, owners = self.lookup_access(req, fid)
398            # keep track of what's been added
399            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
400            aid = unicode(allocID)
401
402            self.state_lock.acquire()
403            self.state[aid] = { }
404            self.state[aid]['user'] = found
405            self.state[aid]['owners'] = owners
406            self.write_state()
407            self.state_lock.release()
408            for o in owners:
409                self.auth.set_attribute(o, allocID)
410            self.auth.set_attribute(allocID, allocID)
411
412            try:
413                f = open("%s/%s.pem" % (self.certdir, aid), "w")
414                print >>f, alloc_cert
415                f.close()
[d3c8759]416            except EnvironmentError, e:
[23dec62]417                raise service_error(service_error.internal, 
418                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
419            return { 'allocID': { 'fedid': allocID } }
420        else:
421            if self.allow_proxy:
422                resp = self.proxy_RequestAccess.call_service(dt, req,
423                            self.cert_file, self.cert_pwd,
424                            self.trusted_certs)
425                if resp.has_key('RequestAccessResponseBody'):
426                    return resp['RequestAccessResponseBody']
427                else:
428                    return None
429            else:
430                raise service_error(service_error.access,
431                        "Access proxying denied")
432
433    def ReleaseAccess(self, req, fid):
434        # The dance to get into the request body
435        if req.has_key('ReleaseAccessRequestBody'):
436            req = req['ReleaseAccessRequestBody']
437        else:
438            raise service_error(service_error.req, "No request!?")
439
440        if req.has_key('destinationTestbed'):
441            dt = unpack_id(req['destinationTestbed'])
442        else:
443            dt = None
444
445        if dt == None or dt in self.testbed:
446            # Local request
447            try:
448                if req['allocID'].has_key('localname'):
449                    auth_attr = aid = req['allocID']['localname']
450                elif req['allocID'].has_key('fedid'):
451                    aid = unicode(req['allocID']['fedid'])
452                    auth_attr = req['allocID']['fedid']
453                else:
454                    raise service_error(service_error.req,
455                            "Only localnames and fedids are understood")
456            except KeyError:
457                raise service_error(service_error.req, "Badly formed request")
458
459            self.log.debug("[access] deallocation requested for %s", aid)
460            if not self.auth.check_attribute(fid, auth_attr):
461                self.log.debug("[access] deallocation denied for %s", aid)
462                raise service_error(service_error.access, "Access Denied")
463
464            self.state_lock.acquire()
465            if self.state.has_key(aid):
466                self.log.debug("Found allocation for %s" %aid)
467                del self.state[aid]
468                self.write_state()
469                self.state_lock.release()
470                # And remove the access cert
471                cf = "%s/%s.pem" % (self.certdir, aid)
472                self.log.debug("Removing %s" % cf)
473                os.remove(cf)
474                return { 'allocID': req['allocID'] } 
475            else:
476                self.state_lock.release()
477                raise service_error(service_error.req, "No such allocation")
478
479        else:
480            if self.allow_proxy:
481                resp = self.proxy_ReleaseAccess.call_service(dt, req,
482                            self.cert_file, self.cert_pwd,
483                            self.trusted_certs)
484                if resp.has_key('ReleaseAccessResponseBody'):
485                    return resp['ReleaseAccessResponseBody']
486                else:
487                    return None
488            else:
489                raise service_error(service_error.access,
490                        "Access proxying denied")
491
492    def extract_parameters(self, top):
493        """
494        DRAGON currently supports a fixed capacity link between two endpoints.
495        Those endpoints may specify a VPN (or list or range) to use.  This
496        extracts the DRAGON endpoints and vpn preferences from the segments (as
497        attributes) and the capacity of the connection as given by the
498        substrate.  The two endpoints VLAN choices are intersected to get set
499        of VLANs that are acceptable (no VLAN requiremnets means any is
500        acceptable).
501        """
502        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
503
504        if len(segments) != 2 or len(top.substrates) != 1:
505            raise service_error(service_error.req,
506                    "Requests to DRAGON must have exactlty two segments " +\
507                            "and one substrate")
508
509        ends = [ ]
510        for s in segments:
511            ep = s.get_attribute('dragon_endpoint')
512            if not ep:
513                raise service_error(service_error.req, 
[69692a9]514                        "Missing DRAGON endpoint for %s" % s.id)
[23dec62]515            v = s.get_attribute('vlans')
516            vlans = None
517            vset = set()
518            # the vlans can be a single integer, a comma separated list or a
519            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
520            # 100,300-305,400
521            if v:
522                if v.count(",") > 0 :
523                    vl = [ x.strip() for x in v.split(",") ]
524                else:
525                    vl = [ v ]
526                for v in vl:
527                    try:
528                        if v.count("-")> 0:
529                            f, t = v.split("-", 1)
530                            for i in range(int(f), int(t)+1):
531                                vset.add(i)
532                        else:
533                            vset.add(int(v))
534                    except ValueError:
535                        raise service_error(service_error.req, 
536                                "VLAN tags must be integers (%s)" %s.name)
537            if len(vset) > 0:
538                if vlans: vlans &= vest
539                else: vlans = vset
540            ends.append(ep)
541
542
543        sub = top.substrates[0]
544        if sub.capacity:
545            cap = int(sub.capacity.rate / 1000.0)
546            if cap < 1: cap = 1
547        else:
548            cap = 100
549
[36fec1b]550
551        # DRAGON's command line tool barfs if the source (ends[0]) is not in
552        # the domain controlled by the IDC.  This code ensures this situation.
553        if self.domain and not ends[0].endswith(self.domain):
554            hold = ends[0]
555            for i, e in enumerate(ends):
556                if i == 0: continue
557                if e.endswith(self.domain):
558                    ends[0] = e
559                    ends[i] = hold
560                    break
561            else:
562                raise service_error(service_error.req, 
563                        "No endpoint in my domain")
564
565
[23dec62]566        return cap, ends[0], ends[1], vlans
567
568
569    def start_segment(self, repo, fr, to, cap, vpns=[], start=None, end=None,
570            log=None):
571        """
572        Do the actual work of creating the dragon connecton.
573        """
574        if not log: log = self.log
575        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
576        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
577        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
578        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
579        path_re = re.compile("Path:")
580
581        if not vpns:
582            vpns = [ None, None, None, None, None]
583
584        if not start:
585            start = time.time()
586        if not end:
[6f82229]587            end = start + self.duration *60
[23dec62]588
589
590        status = None
591        gri = None
592        rv = None
593        vlan_no = None
594        for v in vpns:
595            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
596                'createReservation', '-repo',  repo , '-url', self.idc_url,
597                '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
598                '-vlan', "%s" % v, '-desc', 'fedd created connection',
599                '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
600                '-end', "%d" % int(end)]
601            log.debug("[start_segment]: %s" % " ".join(cmd))
602            if not self.create_debug: 
603                p = Popen(cmd, cwd=self.cli_dir, 
604                        stdout=PIPE, stderr=STDOUT,
605                        close_fds=True)
606                for line in p.stdout:
607                    m = status_re.match(line)
608                    if m:
609                        status = m.group(1)
610                        continue
611                    m = gri_re.match(line)
612                    if m:
613                        gri = m.group(1)
614                rv = p.wait()
615            else: 
616                rv = 0
617                status = 'ACCEPTED'
618                gri = 'debug_gri'
619
620            # Reservation in progress.  Poll the IDC until we know the outcome
621            cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
622                'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
623
[ab33158]624            while status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
[23dec62]625                log.debug("[start_segment]: %s" % " ".join(cmd))
626                if not self.create_debug:
627                    p = Popen(cmd, cwd=self.cli_dir, 
628                            stdout=PIPE, stderr=STDOUT,
629                            close_fds=True)
630                    in_path = False
631                    vpn1 = None
632                    vpn2 = None
633                    src = None
634                    dest = None
635                    for line in p.stdout:
636                        if not in_path:
637                            m = status_re.match(line)
638                            if m: 
639                                status = m.group(1)
640                                continue
641                            m = source_re.match(line)
642                            if m:
643                                src = m.group(1)
644                                continue
645                            m = dest_re.match(line)
646                            if m:
647                                dest = m.group(1)
648                                continue
649                            m = path_re.match(line)
650                            if m:
651                                in_path = True
652                                if src and dest:
653                                    vpn1_re = re.compile(
654                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
655                                                    src.replace("*", "\*"))
656                                    vpn2_re = re.compile(
657                                            "\s*%s,\s*\w+\s*,\s*(\d+)" % \
658                                                    dest.replace("*", "\*"))
659                                else:
660                                    raise service_error(service_error.internal, 
661                                            "Strange output from query")
662                        else:
663                            m = vpn1_re.match(line)
664                            if m:
665                                vpn1 = m.group(1)
666                                continue
667                            m = vpn2_re.match(line)
668                            if m:
669                                vpn2 = m.group(1)
670                                continue
671                    rv = p.wait()
672                    if vpn1 == vpn2:
673                        if v is not None:
674                            if int(vpn1) == v:
675                                vlan_no = int(v)
676                            else:
677                                raise service_error(service_error.federant, 
678                                        "Unexpected vlan assignment")
679                        else:
680                            vlan_no = int(v)
681                    else:
682                        raise service_error(service_error.internal,
683                                "Different VPNs on DRAGON ends")
[68bb551]684                    log.debug("Status: %s" % status or "none")
[23dec62]685                else:
686                    status = 'ACTIVE'
687                    vlan_no = int(v) or 1
[ab33158]688                if status in ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING'):
689                    time.sleep(45)
[23dec62]690            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
691                break
692
[7a011e9]693        self.log.debug("made reservation %s %s" % (gri, vlan_no))
694
[23dec62]695        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
696            return gri, vlan_no
697        else:
[69692a9]698            raise service_error(service_error.federant, 
699                    "Cannot make reservation")
[23dec62]700
701    def stop_segment(self, repo, gri, log=None):
702        if not log: log = self.log
703        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
704            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
705
706
707        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
708        if not self.create_debug:
709            try:
710                f = open("/dev/null", "w")
711                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
712                f.close()
[d3c8759]713            except EnvironmentError, e:
[23dec62]714                raise service_error(service_error.internal, 
715                        "Failed to open /dev/null: %s" % e)
716
[7a011e9]717    def export_store_info(self, cf, vlan, connInfo):
718        """
719        For the export requests in the connection info, install the peer names
720        at the experiment controller via SetValue calls.
721        """
722
723        for c in connInfo:
724            for p in [ p for p in c.get('parameter', []) \
725                    if p.get('type', '') == 'output']:
726
727                if p.get('name', '') != 'vlan_id':
728                    self.log.error("Unknown export parameter: %s" % \
729                            p.get('name'))
730                    continue
731
732                k = p.get('key', None)
733                surl = p.get('store', None)
734                if surl and k:
735                    value = "%s" % vlan
736                    req = { 'name': k, 'value': value }
737                    print "calling SetValue %s %s" % (surl, req)
738                    self.call_SetValue(surl, req, cf)
739                else:
740                    self.log.error("Bad export request: %s" % p)
741
742    def import_store_info(self, cf, connInfo):
743        """
744        Pull any import parameters in connInfo in.  We translate them either
745        into known member names or fedAddrs.
746        """
747
748        for c in connInfo:
749            for p in [ p for p in c.get('parameter', []) \
750                    if p.get('type', '') == 'input']:
751                name = p.get('name', None)
752                key = p.get('key', None)
753                store = p.get('store', None)
754
755                if name and key and store :
756                    req = { 'name': key, 'wait': True }
757                    r = self.call_GetValue(store, req, cf)
758                    r = r.get('GetValueResponseBody', None)
759                    if r :
760                        if r.get('name', '') == key:
761                            v = r.get('value', None)
762                            if v is not None:
763                                if name == 'peer':
764                                    c['peer'] = v
765                                else:
766                                    if c.has_key('fedAttr'):
767                                        c['fedAttr'].append({
768                                            'attribute': name, 'value': value})
769                                    else:
770                                        c['fedAttr']= [{
771                                            'attribute': name, 'value': value}]
772                            else:
773                                raise service_error(service_error.internal, 
774                                        'None value exported for %s'  % key)
775                        else:
776                            raise service_error(service_error.internal, 
777                                    'Different name returned for %s: %s' \
778                                            % (key, r.get('name','')))
779                    else:
780                        raise service_error(service_error.internal, 
781                            'Badly formatted response: no GetValueResponseBody')
782                else:
783                    raise service_error(service_error.internal, 
784                        'Bad Services missing info for import %s' % c)
785
786
[23dec62]787    def StartSegment(self, req, fid):
788        err = None  # Any service_error generated after tmpdir is created
789        rv = None   # Return value from segment creation
790
791        try:
792            req = req['StartSegmentRequestBody']
793        except KeyError:
794            raise service_error(server_error.req, "Badly formed request")
795
796        auth_attr = req['allocID']['fedid']
797        aid = "%s" % auth_attr
798        attrs = req.get('fedAttr', [])
799        if not self.auth.check_attribute(fid, auth_attr):
800            raise service_error(service_error.access, "Access denied")
[7a011e9]801        else:
[1627c3d]802            # See if this is a replay of an earlier succeeded StartSegment -
803            # sometimes SSL kills 'em.  If so, replay the response rather than
804            # redoing the allocation.
805            self.state_lock.acquire()
[ab847bc]806            retval = self.state[aid].get('started', None)
[1627c3d]807            self.state_lock.release()
808            if retval:
809                self.log.warning("Duplicate StartSegment for %s: " % aid + \
810                        "replaying response")
811                return retval
812
813        certfile = "%s/%s.pem" % (self.certdir, aid)
[23dec62]814
815        if req.has_key('segmentdescription') and \
816                req['segmentdescription'].has_key('topdldescription'):
817            topo = \
818                topdl.Topology(**req['segmentdescription']['topdldescription'])
819        else:
820            raise service_error(service_error.req, 
821                    "Request missing segmentdescription'")
822
[7a011e9]823        connInfo = req.get('connection', [])
824
[23dec62]825        cap, src, dest, vlans = self.extract_parameters(topo)
826        ename = aid
827
828        for a in attrs:
829            if a['attribute'] == 'experiment_name':
830                ename = a['value']
831
832        repo = None
833        self.state_lock.acquire()
834        if self.state.has_key(aid):
835            repo = self.state[aid]['user']
836            self.state[aid]['log'] = [ ]
837            # Create a logger that logs to the experiment's state object as
838            # well as to the main log file.
839            alloc_log = logging.getLogger('fedd.access.%s' % ename)
840            h = logging.StreamHandler(
841                    list_log.list_log(self.state[aid]['log']))
842            # XXX: there should be a global one of these rather than
843            # repeating the code.
844            h.setFormatter(logging.Formatter(
845                "%(asctime)s %(name)s %(message)s",
846                        '%d %b %y %H:%M:%S'))
847            alloc_log.addHandler(h)
848            self.write_state()
849        self.state_lock.release()
850
851        if not repo:
852            raise service_error(service_error.internal, 
853                    "Can't find creation user for %s" %aid)
854
[68bb551]855        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
856                log=alloc_log)
[23dec62]857
[7a011e9]858        self.export_store_info(certfile, vlan_no, connInfo)
859
[ab847bc]860
[23dec62]861        if gri:
[ab847bc]862            rtopo = topo.clone()
863            for s in rtopo.substrates:
864                s.set_attribute('vlan', vlan_no)
865                s.set_attribute('gri', gri)
866
[23dec62]867            # Grab the log (this is some anal locking, but better safe than
868            # sorry)
869            self.state_lock.acquire()
870            self.state[aid]['gri'] = gri
871            logv = "".join(self.state[aid]['log'])
[1627c3d]872            # It's possible that the StartSegment call gets retried (!).
873            # if the 'started' key is in the allocation, we'll return it rather
874            # than redo the setup.
[ab847bc]875            self.state[aid]['started'] = { 
[68bb551]876                    'allocID': req['allocID'],
877                    'allocationLog': logv,
[ab847bc]878                    'segmentdescription': {
879                        'topdldescription': rtopo.to_dict()
880                        },
[68bb551]881                    }
[ab847bc]882            retval = self.state[aid]['started']
[1627c3d]883            self.write_state()
884            self.state_lock.release()
885
886            return retval
[23dec62]887        elif err:
888            raise service_error(service_error.federant,
889                    "Swapin failed: %s" % err)
890        else:
891            raise service_error(service_error.federant, "Swapin failed")
892
893    def TerminateSegment(self, req, fid):
894        try:
895            req = req['TerminateSegmentRequestBody']
896        except KeyError:
897            raise service_error(server_error.req, "Badly formed request")
898
899        auth_attr = req['allocID']['fedid']
900        aid = "%s" % auth_attr
[ab33158]901
902        self.log.debug("Terminate request for %s" %aid)
[23dec62]903        attrs = req.get('fedAttr', [])
904        if not self.auth.check_attribute(fid, auth_attr):
905            raise service_error(service_error.access, "Access denied")
906
907        self.state_lock.acquire()
908        if self.state.has_key(aid):
909            gri = self.state[aid].get('gri', None)
910            user = self.state[aid].get('user', None)
911        else:
912            gri = None
913            user = None
914        self.state_lock.release()
[ab33158]915        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
[23dec62]916
917        if not gri:
918            raise service_error(service_error.internal, 
919                    "Can't find gri for %s" % aid)
920
921        if not user:
922            raise service_error(service_error.internal, 
923                    "Can't find creation user for %s" % aid)
[ab33158]924   
925        self.log.debug("Stop segment for GRI: %s" %gri)
[23dec62]926        self.stop_segment(user, gri)
927        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.