source: fedd/federation/dragon_access.py @ d03c991

axis_examplecompt_changesinfo-ops
Last change on this file since d03c991 was d03c991, checked in by Ted Faber <faber@…>, 13 years ago

Dragon works under ABAC

  • Property mode set to 100644
File size: 19.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13from access import access_base
14from legacy_access import legacy_access
15
16from util import *
17from allocate_project import allocate_project_local, allocate_project_remote
18from fedid import fedid, generate_fedid
19from authorizer import authorizer, abac_authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_emulab_segment
30import local_emulab_segment
31
32
33# Make log messages disappear if noone configures a fedd logger
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.access")
38fl.addHandler(nullHandler())
39
40class access(access_base, legacy_access):
41    """
42    The implementation of access control based on mapping users to projects.
43
44    Users can be mapped to existing projects or have projects created
45    dynamically.  This implements both direct requests and proxies.
46    """
47
48    def __init__(self, config=None, auth=None):
49        """
50        Initializer.  Pulls parameters out of the ConfigParser's access section.
51        """
52        access_base.__init__(self, config, auth)
53
54        self.cli_dir = config.get("access", "cli_dir")
55        self.axis2_home = config.get("access", "axis2_home")
56        self.idc_url = config.get("access", "idc")
57        self.domain = config.get("access", "domain")
58        self.duration = config.getint("access", "duration", 120)
59
60        self.access = { }
61        if not (self.cli_dir and self.axis2_home and self.idc_url):
62            self.log.error("Must specify all of cli_dir, axis2_home, " +\
63                    "idc in the [access] section of the configuration")
64
65        if config.has_option("access", "accessdb"):
66            self.read_access(config.get("access", "accessdb"), self.make_repo)
67
68
69        # authorization information
70        self.auth_type = config.get('access', 'auth_type') \
71                or 'legacy'
72        self.auth_dir = config.get('access', 'auth_dir')
73        accessdb = config.get("access", "accessdb")
74        # initialize the authorization system
75        if self.auth_type == 'legacy':
76            self.access = { }
77            if accessdb:
78                self.legacy_read_access(accessdb, self.make_repo)
79            # Add the ownership attributes to the authorizer.  Note that the
80            # indices of the allocation dict are strings, but the attributes are
81            # fedids, so there is a conversion.
82            self.state_lock.acquire()
83            for k in self.state.keys():
84                for o in self.state[k].get('owners', []):
85                    self.auth.set_attribute(o, fedid(hexstr=k))
86                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
87            self.state_lock.release()
88            self.lookup_access = self.legacy_lookup_access_base
89        elif self.auth_type == 'abac':
90            self.auth = abac_authorizer(load=self.auth_dir)
91            self.access = [ ]
92            if accessdb:
93                self.read_access(accessdb, self.make_repo)
94        else:
95            raise service_error(service_error.internal, 
96                    "Unknown auth_type: %s" % self.auth_type)
97
98        self.call_GetValue= service_caller('GetValue')
99        self.call_SetValue= service_caller('SetValue')
100
101        self.soap_services = {\
102            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
103            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
104            'StartSegment': soap_handler("StartSegment", self.StartSegment),
105            'TerminateSegment': soap_handler("TerminateSegment", 
106                self.TerminateSegment),
107            }
108        self.xmlrpc_services =  {\
109            'RequestAccess': xmlrpc_handler('RequestAccess',
110                self.RequestAccess),
111            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
112                self.ReleaseAccess),
113            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
114            'TerminateSegment': xmlrpc_handler('TerminateSegment',
115                self.TerminateSegment),
116            }
117
118    @staticmethod
119    def make_repo(s):
120        """
121        Get the repo directory from an access line.  This is removing the ()
122        from the string.
123        """
124        rv = s.strip()
125        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
126        else: raise self.parse_error("Repo should be in parens");
127
128    def RequestAccess(self, req, fid):
129        """
130        Handle the access request.
131
132        Parse out the fields and make the allocations or rejections if for us,
133        otherwise, assuming we're willing to proxy, proxy the request out.
134        """
135
136        # The dance to get into the request body
137        if req.has_key('RequestAccessRequestBody'):
138            req = req['RequestAccessRequestBody']
139        else:
140            raise service_error(service_error.req, "No request!?")
141
142        if req.has_key('destinationTestbed'):
143            dt = unpack_id(req['destinationTestbed'])
144
145        # Request for this fedd
146        found, match, owners = self.lookup_access(req, fid)
147        # keep track of what's been added
148        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
149        aid = unicode(allocID)
150
151        self.state_lock.acquire()
152        self.state[aid] = { }
153        self.state[aid]['user'] = found
154        self.state[aid]['owners'] = owners
155        self.write_state()
156        self.state_lock.release()
157        self.auth.set_attribute(fid, allocID)
158        self.auth.set_attribute(allocID, allocID)
159        self.auth.save()
160
161        try:
162            f = open("%s/%s.pem" % (self.certdir, aid), "w")
163            print >>f, alloc_cert
164            f.close()
165        except EnvironmentError, e:
166            raise service_error(service_error.internal, 
167                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
168        return { 'allocID': { 'fedid': allocID } }
169
170    def ReleaseAccess(self, req, fid):
171        # The dance to get into the request body
172        if req.has_key('ReleaseAccessRequestBody'):
173            req = req['ReleaseAccessRequestBody']
174        else:
175            raise service_error(service_error.req, "No request!?")
176
177        try:
178            if req['allocID'].has_key('localname'):
179                auth_attr = aid = req['allocID']['localname']
180            elif req['allocID'].has_key('fedid'):
181                aid = unicode(req['allocID']['fedid'])
182                auth_attr = req['allocID']['fedid']
183            else:
184                raise service_error(service_error.req,
185                        "Only localnames and fedids are understood")
186        except KeyError:
187            raise service_error(service_error.req, "Badly formed request")
188
189        self.log.debug("[access] deallocation requested for %s", aid)
190        if not self.auth.check_attribute(fid, auth_attr):
191            self.log.debug("[access] deallocation denied for %s", aid)
192            raise service_error(service_error.access, "Access Denied")
193
194        self.state_lock.acquire()
195        if self.state.has_key(aid):
196            self.log.debug("Found allocation for %s" %aid)
197            del self.state[aid]
198            self.write_state()
199            self.state_lock.release()
200            # And remove the access cert
201            cf = "%s/%s.pem" % (self.certdir, aid)
202            self.log.debug("Removing %s" % cf)
203            os.remove(cf)
204            return { 'allocID': req['allocID'] } 
205        else:
206            self.state_lock.release()
207            raise service_error(service_error.req, "No such allocation")
208
209    def extract_parameters(self, top):
210        """
211        DRAGON currently supports a fixed capacity link between two endpoints.
212        Those endpoints may specify a VPN (or list or range) to use.  This
213        extracts the DRAGON endpoints and vpn preferences from the segments (as
214        attributes) and the capacity of the connection as given by the
215        substrate.  The two endpoints VLAN choices are intersected to get set
216        of VLANs that are acceptable (no VLAN requiremnets means any is
217        acceptable).
218        """
219        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
220
221        if len(segments) != 2 or len(top.substrates) != 1:
222            raise service_error(service_error.req,
223                    "Requests to DRAGON must have exactlty two segments " +\
224                            "and one substrate")
225
226        ends = [ ]
227        for s in segments:
228            ep = s.get_attribute('dragon_endpoint')
229            if not ep:
230                raise service_error(service_error.req, 
231                        "Missing DRAGON endpoint for %s" % s.id)
232            v = s.get_attribute('vlans')
233            vlans = None
234            vset = set()
235            # the vlans can be a single integer, a comma separated list or a
236            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
237            # 100,300-305,400
238            if v:
239                if v.count(",") > 0 :
240                    vl = [ x.strip() for x in v.split(",") ]
241                else:
242                    vl = [ v ]
243                for v in vl:
244                    try:
245                        if v.count("-")> 0:
246                            f, t = v.split("-", 1)
247                            for i in range(int(f), int(t)+1):
248                                vset.add(i)
249                        else:
250                            vset.add(int(v))
251                    except ValueError:
252                        raise service_error(service_error.req, 
253                                "VLAN tags must be integers (%s)" %s.name)
254            if len(vset) > 0:
255                if vlans: vlans &= vest
256                else: vlans = vset
257            ends.append(ep)
258
259
260        sub = top.substrates[0]
261        if sub.capacity:
262            cap = int(sub.capacity.rate / 1000.0)
263            if cap < 1: cap = 1
264        else:
265            cap = 100
266
267
268        # DRAGON's command line tool barfs if the source (ends[0]) is not in
269        # the domain controlled by the IDC.  This code ensures this situation.
270        if self.domain and not ends[0].endswith(self.domain):
271            hold = ends[0]
272            for i, e in enumerate(ends):
273                if i == 0: continue
274                if e.endswith(self.domain):
275                    ends[0] = e
276                    ends[i] = hold
277                    break
278            else:
279                raise service_error(service_error.req, 
280                        "No endpoint in my domain")
281
282
283        return cap, ends[0], ends[1], vlans
284
285    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
286
287        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
288        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
289
290        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
291            'createReservation', '-repo',  repo , '-url', self.idc_url,
292            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
293            '-vlan', "%s" % v, '-desc', 'fedd created connection',
294            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
295            '-end', "%d" % int(end)]
296        log.debug("[start_segment]: %s" % " ".join(cmd))
297        if not self.create_debug: 
298            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
299                    close_fds=True)
300            for line in p.stdout:
301                m = status_re.match(line)
302                if m:
303                    status = m.group(1)
304                    continue
305                m = gri_re.match(line)
306                if m:
307                    gri = m.group(1)
308                    continue
309            rv = p.wait()
310        else: 
311            rv = 0
312            status = 'ACCEPTED'
313            gri = 'debug_gri'
314
315        return (rv, status, gri)
316
317    def oscars_query_vpn(self, repo, gri, v, log):
318        """
319        Call the oscars query command from the command line and parse out the
320        data to see if the current request succeeded.  This is a lot of fiddly
321        code to do  a pretty simple thing.
322        """
323        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
324        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
325        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
326        path_re = re.compile("Path:")
327
328        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
329            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
330        log.debug("[start_segment]: %s" % " ".join(cmd))
331        if not self.create_debug:
332            # Really do the query
333            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
334                    close_fds=True)
335            in_path = False
336            vpn1 = None
337            vpn2 = None
338            src = None
339            dest = None
340            for line in p.stdout:
341                if not in_path:
342                    m = status_re.match(line)
343                    if m: 
344                        status = m.group(1)
345                        continue
346                    m = source_re.match(line)
347                    if m:
348                        src = m.group(1)
349                        continue
350                    m = dest_re.match(line)
351                    if m:
352                        dest = m.group(1)
353                        continue
354                    m = path_re.match(line)
355                    if m:
356                        in_path = True
357                        if src and dest:
358                            vpn1_re = re.compile(
359                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
360                                            src.replace("*", "\*"))
361                            vpn2_re = re.compile(
362                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
363                                            dest.replace("*", "\*"))
364                        else:
365                            raise service_error(service_error.internal, 
366                                    "Strange output from query")
367                else:
368                    m = vpn1_re.match(line)
369                    if m:
370                        vpn1 = m.group(1)
371                        continue
372                    m = vpn2_re.match(line)
373                    if m:
374                        vpn2 = m.group(1)
375                        continue
376            rv = p.wait()
377            # Make sure that OSCARS did what we expected.
378            if vpn1 == vpn2:
379                if v is not None:
380                    if int(vpn1) == v:
381                        vlan_no = int(v)
382                    else:
383                        raise service_error(service_error.federant, 
384                                "Unexpected vlan assignment")
385                else:
386                    vlan_no = int(v or 0)
387            else:
388                raise service_error(service_error.internal,
389                        "Different VPNs on DRAGON ends")
390            log.debug("Status: %s" % status or "none")
391        else:
392            rv = 0
393            status = 'ACTIVE'
394            vlan_no = int(v or 1)
395
396        return (rv, status, vlan_no)
397
398
399
400    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
401            log=None):
402        """
403        Do the actual work of creating the dragon connecton.
404        """
405        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
406        if not log: log = self.log
407
408        if not vpns:
409            vpns = [ None, None, None, None, None]
410
411        if not start:
412            start = time.time()
413        if not end:
414            end = start + self.duration *60
415
416
417        status = None
418        gri = None
419        rv = None
420        vlan_no = None
421        for v in vpns:
422            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
423                    start, end, log)
424            # Reservation in progress.  Poll the IDC until we know the outcome
425            while status in waiting_states:
426                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
427                if status in waiting_states:
428                    time.sleep(45)
429            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
430                break
431
432        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
433            self.log.debug("made reservation %s %s" % (gri, vlan_no))
434            return gri, vlan_no
435        else:
436            raise service_error(service_error.federant, 
437                    "Cannot make reservation")
438
439    def stop_segment(self, repo, gri, log=None):
440        """
441        Terminate the reservation.
442        """
443        if not log: log = self.log
444        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
445            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
446
447
448        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
449        if not self.create_debug:
450            try:
451                f = open("/dev/null", "w")
452                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
453                f.close()
454            except EnvironmentError, e:
455                raise service_error(service_error.internal, 
456                        "Failed to open /dev/null: %s" % e)
457
458    def export_store_info(self, cf, vlan, connInfo):
459        """
460        For the export requests in the connection info, install the peer names
461        at the experiment controller via SetValue calls.
462        """
463
464        for c in connInfo:
465            for p in [ p for p in c.get('parameter', []) \
466                    if p.get('type', '') == 'output']:
467
468                if p.get('name', '') != 'vlan_id':
469                    self.log.error("Unknown export parameter: %s" % \
470                            p.get('name'))
471                    continue
472
473                k = p.get('key', None)
474                surl = p.get('store', None)
475                if surl and k:
476                    value = "%s" % vlan
477                    req = { 'name': k, 'value': value }
478                    print "calling SetValue %s %s" % (surl, req)
479                    self.call_SetValue(surl, req, cf)
480                else:
481                    self.log.error("Bad export request: %s" % p)
482
483    def initialize_experiment_info(self, aid, ename):
484        repo = None
485        self.state_lock.acquire()
486        if aid in self.state:
487            repo = self.state[aid].get('user', None)
488            self.state[aid]['log'] = [ ]
489            # Create a logger that logs to the experiment's state object as
490            # well as to the main log file.
491            alloc_log = logging.getLogger('fedd.access.%s' % ename)
492            h = logging.StreamHandler(
493                    list_log.list_log(self.state[aid]['log']))
494            # XXX: there should be a global one of these rather than
495            # repeating the code.
496            h.setFormatter(logging.Formatter(
497                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
498            alloc_log.addHandler(h)
499            self.write_state()
500        self.state_lock.release()
501        return (repo, alloc_log)
502
503    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id):
504        """
505        Place the relevant information in the global state block, and prepare
506        the response.
507        """
508        rtopo = topo.clone()
509        for s in rtopo.substrates:
510            s.set_attribute('vlan', vlan_no)
511            s.set_attribute('gri', gri)
512
513        # Grab the log (this is some anal locking, but better safe than
514        # sorry)
515        self.state_lock.acquire()
516        self.state[aid]['gri'] = gri
517        logv = "".join(self.state[aid]['log'])
518        # It's possible that the StartSegment call gets retried (!).
519        # if the 'started' key is in the allocation, we'll return it rather
520        # than redo the setup.
521        self.state[aid]['started'] = { 
522                'allocID': alloc_id,
523                'allocationLog': logv,
524                'segmentdescription': {
525                    'topdldescription': rtopo.to_dict()
526                    },
527                }
528        retval = copy.deepcopy(self.state[aid]['started'])
529        self.write_state()
530        self.state_lock.release()
531
532        return retval
533
534    def StartSegment(self, req, fid):
535        err = None  # Any service_error generated after tmpdir is created
536        rv = None   # Return value from segment creation
537
538        try:
539            req = req['StartSegmentRequestBody']
540            topref = req['segmentdescription']['topdldescription']
541        except KeyError:
542            raise service_error(server_error.req, "Badly formed request")
543
544        auth_attr = req['allocID']['fedid']
545        aid = "%s" % auth_attr
546        attrs = req.get('fedAttr', [])
547        if not self.auth.check_attribute(fid, auth_attr):
548            raise service_error(service_error.access, "Access denied")
549        else:
550            # See if this is a replay of an earlier succeeded StartSegment -
551            # sometimes SSL kills 'em.  If so, replay the response rather than
552            # redoing the allocation.
553            self.state_lock.acquire()
554            retval = self.state[aid].get('started', None)
555            self.state_lock.release()
556            if retval:
557                self.log.warning("Duplicate StartSegment for %s: " % aid + \
558                        "replaying response")
559                return retval
560
561        certfile = "%s/%s.pem" % (self.certdir, aid)
562
563        if topref:
564            topo = topdl.Topology(**topref)
565        else:
566            raise service_error(service_error.req, 
567                    "Request missing segmentdescription'")
568
569        connInfo = req.get('connection', [])
570
571        cap, src, dest, vlans = self.extract_parameters(topo)
572
573        for a in attrs:
574            if a['attribute'] == 'experiment_name':
575                ename = a['value']
576                break
577        else: ename = aid
578       
579        repo, alloc_log = self.initialize_experiment_info(aid, ename)
580
581        if not repo:
582            raise service_error(service_error.internal, 
583                    "Can't find creation user for %s" % aid)
584
585        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
586                log=alloc_log)
587
588        self.export_store_info(certfile, vlan_no, connInfo)
589
590
591        if gri:
592            return self.finalize_experiment(topo, vlan_no, gri, aid, 
593                    req['allocID'])
594        elif err:
595            raise service_error(service_error.federant,
596                    "Swapin failed: %s" % err)
597        else:
598            raise service_error(service_error.federant, "Swapin failed")
599
600    def TerminateSegment(self, req, fid):
601        try:
602            req = req['TerminateSegmentRequestBody']
603        except KeyError:
604            raise service_error(server_error.req, "Badly formed request")
605
606        auth_attr = req['allocID']['fedid']
607        aid = "%s" % auth_attr
608
609        self.log.debug("Terminate request for %s" %aid)
610        attrs = req.get('fedAttr', [])
611        if not self.auth.check_attribute(fid, auth_attr):
612            raise service_error(service_error.access, "Access denied")
613
614        self.state_lock.acquire()
615        if self.state.has_key(aid):
616            gri = self.state[aid].get('gri', None)
617            user = self.state[aid].get('user', None)
618        else:
619            gri = None
620            user = None
621        self.state_lock.release()
622        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
623
624        if not gri:
625            raise service_error(service_error.internal, 
626                    "Can't find gri for %s" % aid)
627
628        if not user:
629            raise service_error(service_error.internal, 
630                    "Can't find creation user for %s" % aid)
631   
632        self.log.debug("Stop segment for GRI: %s" %gri)
633        self.stop_segment(user, gri)
634        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.