source: fedd/federation/dragon_access.py @ 814b5e5

axis_examplecompt_changesinfo-ops
Last change on this file since 814b5e5 was 6abed7b, checked in by Ted Faber <faber@…>, 15 years ago

derive from access_base and refactor a little.

  • Property mode set to 100644
File size: 18.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13from access import access_base
14
15from util import *
16from allocate_project import allocate_project_local, allocate_project_remote
17from access_project import access_project
18from fedid import fedid, generate_fedid
19from authorizer import authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_emulab_segment
30import local_emulab_segment
31
32
33# Make log messages disappear if noone configures a fedd logger
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.access")
38fl.addHandler(nullHandler())
39
40class access(access_base):
41    """
42    The implementation of access control based on mapping users to projects.
43
44    Users can be mapped to existing projects or have projects created
45    dynamically.  This implements both direct requests and proxies.
46    """
47
48    def __init__(self, config=None, auth=None):
49        """
50        Initializer.  Pulls parameters out of the ConfigParser's access section.
51        """
52        access_base.__init__(self, config, auth)
53
54        self.cli_dir = config.get("access", "cli_dir")
55        self.axis2_home = config.get("access", "axis2_home")
56        self.idc_url = config.get("access", "idc")
57        self.domain = config.get("access", "domain")
58        self.duration = config.getint("access", "duration", 120)
59
60        self.access = { }
61        if not (self.cli_dir and self.axis2_home and self.idc_url):
62            self.log.error("Must specify all of cli_dir, axis2_home, " +\
63                    "idc in the [access] section of the configuration")
64
65        if config.has_option("access", "accessdb"):
66            self.read_access(config.get("access", "accessdb"), self.make_repo)
67
68        # Add the ownership attributes to the authorizer.  Note that the
69        # indices of the allocation dict are strings, but the attributes are
70        # fedids, so there is a conversion.
71        self.state_lock.acquire()
72        for k in self.state.keys():
73            for o in self.state[k].get('owners', []):
74                self.auth.set_attribute(o, fedid(hexstr=k))
75            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
76        self.state_lock.release()
77
78        self.lookup_access = self.lookup_access_base
79
80        self.call_GetValue= service_caller('GetValue')
81        self.call_SetValue= service_caller('SetValue')
82
83        self.soap_services = {\
84            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
85            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
86            'StartSegment': soap_handler("StartSegment", self.StartSegment),
87            'TerminateSegment': soap_handler("TerminateSegment", 
88                self.TerminateSegment),
89            }
90        self.xmlrpc_services =  {\
91            'RequestAccess': xmlrpc_handler('RequestAccess',
92                self.RequestAccess),
93            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
94                self.ReleaseAccess),
95            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
96            'TerminateSegment': xmlrpc_handler('TerminateSegment',
97                self.TerminateSegment),
98            }
99
100    @staticmethod
101    def make_repo(s):
102        """
103        Get the repo directory from an access line.  This is removing the ()
104        from the string.
105        """
106        rv = s.strip()
107        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
108        else: raise self.parse_error("Repo should be in parens");
109
110    def RequestAccess(self, req, fid):
111        """
112        Handle the access request.
113
114        Parse out the fields and make the allocations or rejections if for us,
115        otherwise, assuming we're willing to proxy, proxy the request out.
116        """
117
118        # The dance to get into the request body
119        if req.has_key('RequestAccessRequestBody'):
120            req = req['RequestAccessRequestBody']
121        else:
122            raise service_error(service_error.req, "No request!?")
123
124        if req.has_key('destinationTestbed'):
125            dt = unpack_id(req['destinationTestbed'])
126
127        # Request for this fedd
128        found, match = self.lookup_access(req, fid)
129        # keep track of what's been added
130        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
131        aid = unicode(allocID)
132
133        self.state_lock.acquire()
134        self.state[aid] = { }
135        self.state[aid]['user'] = found
136        self.state[aid]['owners'] = [ fid ]
137        self.write_state()
138        self.state_lock.release()
139        self.auth.set_attribute(fid, allocID)
140        self.auth.set_attribute(allocID, allocID)
141
142        try:
143            f = open("%s/%s.pem" % (self.certdir, aid), "w")
144            print >>f, alloc_cert
145            f.close()
146        except EnvironmentError, e:
147            raise service_error(service_error.internal, 
148                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
149        return { 'allocID': { 'fedid': allocID } }
150
151    def ReleaseAccess(self, req, fid):
152        # The dance to get into the request body
153        if req.has_key('ReleaseAccessRequestBody'):
154            req = req['ReleaseAccessRequestBody']
155        else:
156            raise service_error(service_error.req, "No request!?")
157
158        try:
159            if req['allocID'].has_key('localname'):
160                auth_attr = aid = req['allocID']['localname']
161            elif req['allocID'].has_key('fedid'):
162                aid = unicode(req['allocID']['fedid'])
163                auth_attr = req['allocID']['fedid']
164            else:
165                raise service_error(service_error.req,
166                        "Only localnames and fedids are understood")
167        except KeyError:
168            raise service_error(service_error.req, "Badly formed request")
169
170        self.log.debug("[access] deallocation requested for %s", aid)
171        if not self.auth.check_attribute(fid, auth_attr):
172            self.log.debug("[access] deallocation denied for %s", aid)
173            raise service_error(service_error.access, "Access Denied")
174
175        self.state_lock.acquire()
176        if self.state.has_key(aid):
177            self.log.debug("Found allocation for %s" %aid)
178            del self.state[aid]
179            self.write_state()
180            self.state_lock.release()
181            # And remove the access cert
182            cf = "%s/%s.pem" % (self.certdir, aid)
183            self.log.debug("Removing %s" % cf)
184            os.remove(cf)
185            return { 'allocID': req['allocID'] } 
186        else:
187            self.state_lock.release()
188            raise service_error(service_error.req, "No such allocation")
189
190    def extract_parameters(self, top):
191        """
192        DRAGON currently supports a fixed capacity link between two endpoints.
193        Those endpoints may specify a VPN (or list or range) to use.  This
194        extracts the DRAGON endpoints and vpn preferences from the segments (as
195        attributes) and the capacity of the connection as given by the
196        substrate.  The two endpoints VLAN choices are intersected to get set
197        of VLANs that are acceptable (no VLAN requiremnets means any is
198        acceptable).
199        """
200        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
201
202        if len(segments) != 2 or len(top.substrates) != 1:
203            raise service_error(service_error.req,
204                    "Requests to DRAGON must have exactlty two segments " +\
205                            "and one substrate")
206
207        ends = [ ]
208        for s in segments:
209            ep = s.get_attribute('dragon_endpoint')
210            if not ep:
211                raise service_error(service_error.req, 
212                        "Missing DRAGON endpoint for %s" % s.id)
213            v = s.get_attribute('vlans')
214            vlans = None
215            vset = set()
216            # the vlans can be a single integer, a comma separated list or a
217            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
218            # 100,300-305,400
219            if v:
220                if v.count(",") > 0 :
221                    vl = [ x.strip() for x in v.split(",") ]
222                else:
223                    vl = [ v ]
224                for v in vl:
225                    try:
226                        if v.count("-")> 0:
227                            f, t = v.split("-", 1)
228                            for i in range(int(f), int(t)+1):
229                                vset.add(i)
230                        else:
231                            vset.add(int(v))
232                    except ValueError:
233                        raise service_error(service_error.req, 
234                                "VLAN tags must be integers (%s)" %s.name)
235            if len(vset) > 0:
236                if vlans: vlans &= vest
237                else: vlans = vset
238            ends.append(ep)
239
240
241        sub = top.substrates[0]
242        if sub.capacity:
243            cap = int(sub.capacity.rate / 1000.0)
244            if cap < 1: cap = 1
245        else:
246            cap = 100
247
248
249        # DRAGON's command line tool barfs if the source (ends[0]) is not in
250        # the domain controlled by the IDC.  This code ensures this situation.
251        if self.domain and not ends[0].endswith(self.domain):
252            hold = ends[0]
253            for i, e in enumerate(ends):
254                if i == 0: continue
255                if e.endswith(self.domain):
256                    ends[0] = e
257                    ends[i] = hold
258                    break
259            else:
260                raise service_error(service_error.req, 
261                        "No endpoint in my domain")
262
263
264        return cap, ends[0], ends[1], vlans
265
266    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
267
268        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
269        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
270
271        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
272            'createReservation', '-repo',  repo , '-url', self.idc_url,
273            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
274            '-vlan', "%s" % v, '-desc', 'fedd created connection',
275            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
276            '-end', "%d" % int(end)]
277        log.debug("[start_segment]: %s" % " ".join(cmd))
278        if not self.create_debug: 
279            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
280                    close_fds=True)
281            for line in p.stdout:
282                m = status_re.match(line)
283                if m:
284                    status = m.group(1)
285                    continue
286                m = gri_re.match(line)
287                if m:
288                    gri = m.group(1)
289                    continue
290            rv = p.wait()
291        else: 
292            rv = 0
293            status = 'ACCEPTED'
294            gri = 'debug_gri'
295
296        return (rv, status, gri)
297
298    def oscars_query_vpn(self, repo, gri, v, log):
299        """
300        Call the oscars query command from the command line and parse out the
301        data to see if the current request succeeded.  This is a lot of fiddly
302        code to do  a pretty simple thing.
303        """
304        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
305        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
306        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
307        path_re = re.compile("Path:")
308
309        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
310            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
311        log.debug("[start_segment]: %s" % " ".join(cmd))
312        if not self.create_debug:
313            # Really do the query
314            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
315                    close_fds=True)
316            in_path = False
317            vpn1 = None
318            vpn2 = None
319            src = None
320            dest = None
321            for line in p.stdout:
322                if not in_path:
323                    m = status_re.match(line)
324                    if m: 
325                        status = m.group(1)
326                        continue
327                    m = source_re.match(line)
328                    if m:
329                        src = m.group(1)
330                        continue
331                    m = dest_re.match(line)
332                    if m:
333                        dest = m.group(1)
334                        continue
335                    m = path_re.match(line)
336                    if m:
337                        in_path = True
338                        if src and dest:
339                            vpn1_re = re.compile(
340                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
341                                            src.replace("*", "\*"))
342                            vpn2_re = re.compile(
343                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
344                                            dest.replace("*", "\*"))
345                        else:
346                            raise service_error(service_error.internal, 
347                                    "Strange output from query")
348                else:
349                    m = vpn1_re.match(line)
350                    if m:
351                        vpn1 = m.group(1)
352                        continue
353                    m = vpn2_re.match(line)
354                    if m:
355                        vpn2 = m.group(1)
356                        continue
357            rv = p.wait()
358            # Make sure that OSCARS did what we expected.
359            if vpn1 == vpn2:
360                if v is not None:
361                    if int(vpn1) == v:
362                        vlan_no = int(v)
363                    else:
364                        raise service_error(service_error.federant, 
365                                "Unexpected vlan assignment")
366                else:
367                    vlan_no = int(v or 0)
368            else:
369                raise service_error(service_error.internal,
370                        "Different VPNs on DRAGON ends")
371            log.debug("Status: %s" % status or "none")
372        else:
373            rv = 0
374            status = 'ACTIVE'
375            vlan_no = int(v or 1)
376
377        return (rv, status, vlan_no)
378
379
380
381    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
382            log=None):
383        """
384        Do the actual work of creating the dragon connecton.
385        """
386        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
387        if not log: log = self.log
388
389        if not vpns:
390            vpns = [ None, None, None, None, None]
391
392        if not start:
393            start = time.time()
394        if not end:
395            end = start + self.duration *60
396
397
398        status = None
399        gri = None
400        rv = None
401        vlan_no = None
402        for v in vpns:
403            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
404                    start, end, log)
405            # Reservation in progress.  Poll the IDC until we know the outcome
406            while status in waiting_states:
407                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
408                if status in waiting_states:
409                    time.sleep(45)
410            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
411                break
412
413        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
414            self.log.debug("made reservation %s %s" % (gri, vlan_no))
415            return gri, vlan_no
416        else:
417            raise service_error(service_error.federant, 
418                    "Cannot make reservation")
419
420    def stop_segment(self, repo, gri, log=None):
421        """
422        Terminate the reservation.
423        """
424        if not log: log = self.log
425        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
426            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
427
428
429        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
430        if not self.create_debug:
431            try:
432                f = open("/dev/null", "w")
433                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
434                f.close()
435            except EnvironmentError, e:
436                raise service_error(service_error.internal, 
437                        "Failed to open /dev/null: %s" % e)
438
439    def export_store_info(self, cf, vlan, connInfo):
440        """
441        For the export requests in the connection info, install the peer names
442        at the experiment controller via SetValue calls.
443        """
444
445        for c in connInfo:
446            for p in [ p for p in c.get('parameter', []) \
447                    if p.get('type', '') == 'output']:
448
449                if p.get('name', '') != 'vlan_id':
450                    self.log.error("Unknown export parameter: %s" % \
451                            p.get('name'))
452                    continue
453
454                k = p.get('key', None)
455                surl = p.get('store', None)
456                if surl and k:
457                    value = "%s" % vlan
458                    req = { 'name': k, 'value': value }
459                    print "calling SetValue %s %s" % (surl, req)
460                    self.call_SetValue(surl, req, cf)
461                else:
462                    self.log.error("Bad export request: %s" % p)
463
464    def initialize_experiment_info(self, aid, ename):
465        repo = None
466        self.state_lock.acquire()
467        if aid in self.state:
468            repo = self.state[aid].get('user', None)
469            self.state[aid]['log'] = [ ]
470            # Create a logger that logs to the experiment's state object as
471            # well as to the main log file.
472            alloc_log = logging.getLogger('fedd.access.%s' % ename)
473            h = logging.StreamHandler(
474                    list_log.list_log(self.state[aid]['log']))
475            # XXX: there should be a global one of these rather than
476            # repeating the code.
477            h.setFormatter(logging.Formatter(
478                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
479            alloc_log.addHandler(h)
480            self.write_state()
481        self.state_lock.release()
482        return (repo, alloc_log)
483
484    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id):
485        """
486        Place the relevant information in the global state block, and prepare
487        the response.
488        """
489        rtopo = topo.clone()
490        for s in rtopo.substrates:
491            s.set_attribute('vlan', vlan_no)
492            s.set_attribute('gri', gri)
493
494        # Grab the log (this is some anal locking, but better safe than
495        # sorry)
496        self.state_lock.acquire()
497        self.state[aid]['gri'] = gri
498        logv = "".join(self.state[aid]['log'])
499        # It's possible that the StartSegment call gets retried (!).
500        # if the 'started' key is in the allocation, we'll return it rather
501        # than redo the setup.
502        self.state[aid]['started'] = { 
503                'allocID': alloc_id,
504                'allocationLog': logv,
505                'segmentdescription': {
506                    'topdldescription': rtopo.to_dict()
507                    },
508                }
509        retval = copy.deepcopy(self.state[aid]['started'])
510        self.write_state()
511        self.state_lock.release()
512
513        return retval
514
515    def StartSegment(self, req, fid):
516        err = None  # Any service_error generated after tmpdir is created
517        rv = None   # Return value from segment creation
518
519        try:
520            req = req['StartSegmentRequestBody']
521            topref = req['segmentdescription']['topdldescription']
522        except KeyError:
523            raise service_error(server_error.req, "Badly formed request")
524
525        auth_attr = req['allocID']['fedid']
526        aid = "%s" % auth_attr
527        attrs = req.get('fedAttr', [])
528        if not self.auth.check_attribute(fid, auth_attr):
529            raise service_error(service_error.access, "Access denied")
530        else:
531            # See if this is a replay of an earlier succeeded StartSegment -
532            # sometimes SSL kills 'em.  If so, replay the response rather than
533            # redoing the allocation.
534            self.state_lock.acquire()
535            retval = self.state[aid].get('started', None)
536            self.state_lock.release()
537            if retval:
538                self.log.warning("Duplicate StartSegment for %s: " % aid + \
539                        "replaying response")
540                return retval
541
542        certfile = "%s/%s.pem" % (self.certdir, aid)
543
544        if topref:
545            topo = topdl.Topology(**topref)
546        else:
547            raise service_error(service_error.req, 
548                    "Request missing segmentdescription'")
549
550        connInfo = req.get('connection', [])
551
552        cap, src, dest, vlans = self.extract_parameters(topo)
553
554        for a in attrs:
555            if a['attribute'] == 'experiment_name':
556                ename = a['value']
557                break
558        else: ename = aid
559       
560        repo, alloc_log = self.initialize_experiment_info(aid, ename)
561
562        if not repo:
563            raise service_error(service_error.internal, 
564                    "Can't find creation user for %s" % aid)
565
566        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
567                log=alloc_log)
568
569        self.export_store_info(certfile, vlan_no, connInfo)
570
571
572        if gri:
573            return self.finalize_experiment(topo, vlan_no, gri, aid, 
574                    req['allocID'])
575        elif err:
576            raise service_error(service_error.federant,
577                    "Swapin failed: %s" % err)
578        else:
579            raise service_error(service_error.federant, "Swapin failed")
580
581    def TerminateSegment(self, req, fid):
582        try:
583            req = req['TerminateSegmentRequestBody']
584        except KeyError:
585            raise service_error(server_error.req, "Badly formed request")
586
587        auth_attr = req['allocID']['fedid']
588        aid = "%s" % auth_attr
589
590        self.log.debug("Terminate request for %s" %aid)
591        attrs = req.get('fedAttr', [])
592        if not self.auth.check_attribute(fid, auth_attr):
593            raise service_error(service_error.access, "Access denied")
594
595        self.state_lock.acquire()
596        if self.state.has_key(aid):
597            gri = self.state[aid].get('gri', None)
598            user = self.state[aid].get('user', None)
599        else:
600            gri = None
601            user = None
602        self.state_lock.release()
603        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
604
605        if not gri:
606            raise service_error(service_error.internal, 
607                    "Can't find gri for %s" % aid)
608
609        if not user:
610            raise service_error(service_error.internal, 
611                    "Can't find creation user for %s" % aid)
612   
613        self.log.debug("Stop segment for GRI: %s" %gri)
614        self.stop_segment(user, gri)
615        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.