source: fedd/federation/dragon_access.py @ c65b7e4

axis_examplecompt_changesinfo-ops
Last change on this file since c65b7e4 was c65b7e4, checked in by Ted Faber <faber@…>, 13 years ago

Access controllers delete (some) unused ABAC attrs.

  • Property mode set to 100644
File size: 19.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13from access import access_base
14from legacy_access import legacy_access
15
16from util import *
17from allocate_project import allocate_project_local, allocate_project_remote
18from fedid import fedid, generate_fedid
19from authorizer import authorizer, abac_authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_emulab_segment
30import local_emulab_segment
31
32
33# Make log messages disappear if noone configures a fedd logger
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.access")
38fl.addHandler(nullHandler())
39
40class access(access_base, legacy_access):
41    """
42    The implementation of access control based on mapping users to projects.
43
44    Users can be mapped to existing projects or have projects created
45    dynamically.  This implements both direct requests and proxies.
46    """
47
48    def __init__(self, config=None, auth=None):
49        """
50        Initializer.  Pulls parameters out of the ConfigParser's access section.
51        """
52        access_base.__init__(self, config, auth)
53
54        self.cli_dir = config.get("access", "cli_dir")
55        self.axis2_home = config.get("access", "axis2_home")
56        self.idc_url = config.get("access", "idc")
57        self.domain = config.get("access", "domain")
58        self.duration = config.getint("access", "duration", 120)
59
60        self.access = { }
61        if not (self.cli_dir and self.axis2_home and self.idc_url):
62            self.log.error("Must specify all of cli_dir, axis2_home, " +\
63                    "idc in the [access] section of the configuration")
64
65        # authorization information
66        self.auth_type = config.get('access', 'auth_type') \
67                or 'legacy'
68        self.auth_dir = config.get('access', 'auth_dir')
69        accessdb = config.get("access", "accessdb")
70        # initialize the authorization system
71        if self.auth_type == 'legacy':
72            self.access = { }
73            if accessdb:
74                self.legacy_read_access(accessdb, self.make_repo)
75            # Add the ownership attributes to the authorizer.  Note that the
76            # indices of the allocation dict are strings, but the attributes are
77            # fedids, so there is a conversion.
78            self.state_lock.acquire()
79            for k in self.state.keys():
80                for o in self.state[k].get('owners', []):
81                    self.auth.set_attribute(o, fedid(hexstr=k))
82                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
83            self.state_lock.release()
84            self.lookup_access = self.legacy_lookup_access_base
85        elif self.auth_type == 'abac':
86            self.auth = abac_authorizer(load=self.auth_dir)
87            self.access = [ ]
88            if accessdb:
89                self.read_access(accessdb, self.make_repo)
90        else:
91            raise service_error(service_error.internal, 
92                    "Unknown auth_type: %s" % self.auth_type)
93
94        self.call_GetValue= service_caller('GetValue')
95        self.call_SetValue= service_caller('SetValue')
96
97        self.soap_services = {\
98            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
99            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
100            'StartSegment': soap_handler("StartSegment", self.StartSegment),
101            'TerminateSegment': soap_handler("TerminateSegment", 
102                self.TerminateSegment),
103            }
104        self.xmlrpc_services =  {\
105            'RequestAccess': xmlrpc_handler('RequestAccess',
106                self.RequestAccess),
107            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
108                self.ReleaseAccess),
109            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
110            'TerminateSegment': xmlrpc_handler('TerminateSegment',
111                self.TerminateSegment),
112            }
113
114    @staticmethod
115    def make_repo(s):
116        """
117        Get the repo directory from an access line.  This is removing the ()
118        from the string.
119        """
120        rv = s.strip()
121        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
122        else: raise self.parse_error("Repo should be in parens");
123
124    def RequestAccess(self, req, fid):
125        """
126        Handle the access request.
127
128        Parse out the fields and make the allocations or rejections if for us,
129        otherwise, assuming we're willing to proxy, proxy the request out.
130        """
131
132        # The dance to get into the request body
133        if req.has_key('RequestAccessRequestBody'):
134            req = req['RequestAccessRequestBody']
135        else:
136            raise service_error(service_error.req, "No request!?")
137
138        if req.has_key('destinationTestbed'):
139            dt = unpack_id(req['destinationTestbed'])
140
141        # Request for this fedd
142        found, match, owners = self.lookup_access(req, fid)
143        # keep track of what's been added
144        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
145        aid = unicode(allocID)
146
147        self.state_lock.acquire()
148        self.state[aid] = { }
149        self.state[aid]['user'] = found
150        self.state[aid]['owners'] = owners
151        self.state[aid]['auth'] = set()
152        self.append_allocation_authorization(aid, 
153                ((fid, allocID),(allocID, allocID)))
154        self.write_state()
155        self.state_lock.release()
156
157        try:
158            f = open("%s/%s.pem" % (self.certdir, aid), "w")
159            print >>f, alloc_cert
160            f.close()
161        except EnvironmentError, e:
162            raise service_error(service_error.internal, 
163                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
164        return { 'allocID': { 'fedid': allocID } }
165
166    def ReleaseAccess(self, req, fid):
167        # The dance to get into the request body
168        if req.has_key('ReleaseAccessRequestBody'):
169            req = req['ReleaseAccessRequestBody']
170        else:
171            raise service_error(service_error.req, "No request!?")
172
173        try:
174            if req['allocID'].has_key('localname'):
175                auth_attr = aid = req['allocID']['localname']
176            elif req['allocID'].has_key('fedid'):
177                aid = unicode(req['allocID']['fedid'])
178                auth_attr = req['allocID']['fedid']
179            else:
180                raise service_error(service_error.req,
181                        "Only localnames and fedids are understood")
182        except KeyError:
183            raise service_error(service_error.req, "Badly formed request")
184
185        self.log.debug("[access] deallocation requested for %s", aid)
186        if not self.auth.check_attribute(fid, auth_attr):
187            self.log.debug("[access] deallocation denied for %s", aid)
188            raise service_error(service_error.access, "Access Denied")
189
190        self.state_lock.acquire()
191        if self.state.has_key(aid):
192            self.log.debug("Found allocation for %s" %aid)
193            self.clear_allocation_authorization(aid)
194            del self.state[aid]
195            self.write_state()
196            self.state_lock.release()
197            # And remove the access cert
198            cf = "%s/%s.pem" % (self.certdir, aid)
199            self.log.debug("Removing %s" % cf)
200            os.remove(cf)
201            return { 'allocID': req['allocID'] } 
202        else:
203            self.state_lock.release()
204            raise service_error(service_error.req, "No such allocation")
205
206    def extract_parameters(self, top):
207        """
208        DRAGON currently supports a fixed capacity link between two endpoints.
209        Those endpoints may specify a VPN (or list or range) to use.  This
210        extracts the DRAGON endpoints and vpn preferences from the segments (as
211        attributes) and the capacity of the connection as given by the
212        substrate.  The two endpoints VLAN choices are intersected to get set
213        of VLANs that are acceptable (no VLAN requiremnets means any is
214        acceptable).
215        """
216        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
217
218        if len(segments) != 2 or len(top.substrates) != 1:
219            raise service_error(service_error.req,
220                    "Requests to DRAGON must have exactlty two segments " +\
221                            "and one substrate")
222
223        ends = [ ]
224        for s in segments:
225            ep = s.get_attribute('dragon_endpoint')
226            if not ep:
227                raise service_error(service_error.req, 
228                        "Missing DRAGON endpoint for %s" % s.id)
229            v = s.get_attribute('vlans')
230            vlans = None
231            vset = set()
232            # the vlans can be a single integer, a comma separated list or a
233            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
234            # 100,300-305,400
235            if v:
236                if v.count(",") > 0 :
237                    vl = [ x.strip() for x in v.split(",") ]
238                else:
239                    vl = [ v ]
240                for v in vl:
241                    try:
242                        if v.count("-")> 0:
243                            f, t = v.split("-", 1)
244                            for i in range(int(f), int(t)+1):
245                                vset.add(i)
246                        else:
247                            vset.add(int(v))
248                    except ValueError:
249                        raise service_error(service_error.req, 
250                                "VLAN tags must be integers (%s)" %s.name)
251            if len(vset) > 0:
252                if vlans: vlans &= vest
253                else: vlans = vset
254            ends.append(ep)
255
256
257        sub = top.substrates[0]
258        if sub.capacity:
259            cap = int(sub.capacity.rate / 1000.0)
260            if cap < 1: cap = 1
261        else:
262            cap = 100
263
264
265        # DRAGON's command line tool barfs if the source (ends[0]) is not in
266        # the domain controlled by the IDC.  This code ensures this situation.
267        if self.domain and not ends[0].endswith(self.domain):
268            hold = ends[0]
269            for i, e in enumerate(ends):
270                if i == 0: continue
271                if e.endswith(self.domain):
272                    ends[0] = e
273                    ends[i] = hold
274                    break
275            else:
276                raise service_error(service_error.req, 
277                        "No endpoint in my domain")
278
279
280        return cap, ends[0], ends[1], vlans
281
282    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
283
284        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
285        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
286
287        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
288            'createReservation', '-repo',  repo , '-url', self.idc_url,
289            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
290            '-vlan', "%s" % v, '-desc', 'fedd created connection',
291            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
292            '-end', "%d" % int(end)]
293        log.debug("[start_segment]: %s" % " ".join(cmd))
294        if not self.create_debug: 
295            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
296                    close_fds=True)
297            for line in p.stdout:
298                m = status_re.match(line)
299                if m:
300                    status = m.group(1)
301                    continue
302                m = gri_re.match(line)
303                if m:
304                    gri = m.group(1)
305                    continue
306            rv = p.wait()
307        else: 
308            rv = 0
309            status = 'ACCEPTED'
310            gri = 'debug_gri'
311
312        return (rv, status, gri)
313
314    def oscars_query_vpn(self, repo, gri, v, log):
315        """
316        Call the oscars query command from the command line and parse out the
317        data to see if the current request succeeded.  This is a lot of fiddly
318        code to do  a pretty simple thing.
319        """
320        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
321        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
322        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
323        path_re = re.compile("Path:")
324
325        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
326            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
327        log.debug("[start_segment]: %s" % " ".join(cmd))
328        if not self.create_debug:
329            # Really do the query
330            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
331                    close_fds=True)
332            in_path = False
333            vpn1 = None
334            vpn2 = None
335            src = None
336            dest = None
337            for line in p.stdout:
338                if not in_path:
339                    m = status_re.match(line)
340                    if m: 
341                        status = m.group(1)
342                        continue
343                    m = source_re.match(line)
344                    if m:
345                        src = m.group(1)
346                        continue
347                    m = dest_re.match(line)
348                    if m:
349                        dest = m.group(1)
350                        continue
351                    m = path_re.match(line)
352                    if m:
353                        in_path = True
354                        if src and dest:
355                            vpn1_re = re.compile(
356                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
357                                            src.replace("*", "\*"))
358                            vpn2_re = re.compile(
359                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
360                                            dest.replace("*", "\*"))
361                        else:
362                            raise service_error(service_error.internal, 
363                                    "Strange output from query")
364                else:
365                    m = vpn1_re.match(line)
366                    if m:
367                        vpn1 = m.group(1)
368                        continue
369                    m = vpn2_re.match(line)
370                    if m:
371                        vpn2 = m.group(1)
372                        continue
373            rv = p.wait()
374            # Make sure that OSCARS did what we expected.
375            if vpn1 == vpn2:
376                if v is not None:
377                    if int(vpn1) == v:
378                        vlan_no = int(v)
379                    else:
380                        raise service_error(service_error.federant, 
381                                "Unexpected vlan assignment")
382                else:
383                    vlan_no = int(v or 0)
384            else:
385                raise service_error(service_error.internal,
386                        "Different VPNs on DRAGON ends")
387            log.debug("Status: %s" % status or "none")
388        else:
389            rv = 0
390            status = 'ACTIVE'
391            vlan_no = int(v or 1)
392
393        return (rv, status, vlan_no)
394
395
396
397    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
398            log=None):
399        """
400        Do the actual work of creating the dragon connecton.
401        """
402        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
403        if not log: log = self.log
404
405        if not vpns:
406            vpns = [ None, None, None, None, None]
407
408        if not start:
409            start = time.time()
410        if not end:
411            end = start + self.duration *60
412
413
414        status = None
415        gri = None
416        rv = None
417        vlan_no = None
418        for v in vpns:
419            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
420                    start, end, log)
421            # Reservation in progress.  Poll the IDC until we know the outcome
422            while status in waiting_states:
423                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
424                if status in waiting_states:
425                    time.sleep(45)
426            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
427                break
428
429        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
430            self.log.debug("made reservation %s %s" % (gri, vlan_no))
431            return gri, vlan_no
432        else:
433            raise service_error(service_error.federant, 
434                    "Cannot make reservation")
435
436    def stop_segment(self, repo, gri, log=None):
437        """
438        Terminate the reservation.
439        """
440        if not log: log = self.log
441        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
442            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
443
444
445        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
446        if not self.create_debug:
447            try:
448                f = open("/dev/null", "w")
449                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
450                f.close()
451            except EnvironmentError, e:
452                raise service_error(service_error.internal, 
453                        "Failed to open /dev/null: %s" % e)
454
455    def export_store_info(self, cf, vlan, connInfo):
456        """
457        For the export requests in the connection info, install the peer names
458        at the experiment controller via SetValue calls.
459        """
460
461        for c in connInfo:
462            for p in [ p for p in c.get('parameter', []) \
463                    if p.get('type', '') == 'output']:
464
465                if p.get('name', '') != 'vlan_id':
466                    self.log.error("Unknown export parameter: %s" % \
467                            p.get('name'))
468                    continue
469
470                k = p.get('key', None)
471                surl = p.get('store', None)
472                if surl and k:
473                    value = "%s" % vlan
474                    req = { 'name': k, 'value': value }
475                    print "calling SetValue %s %s" % (surl, req)
476                    self.call_SetValue(surl, req, cf)
477                else:
478                    self.log.error("Bad export request: %s" % p)
479
480    def initialize_experiment_info(self, aid, ename):
481        repo = None
482        self.state_lock.acquire()
483        if aid in self.state:
484            repo = self.state[aid].get('user', None)
485            self.state[aid]['log'] = [ ]
486            # Create a logger that logs to the experiment's state object as
487            # well as to the main log file.
488            alloc_log = logging.getLogger('fedd.access.%s' % ename)
489            h = logging.StreamHandler(
490                    list_log.list_log(self.state[aid]['log']))
491            # XXX: there should be a global one of these rather than
492            # repeating the code.
493            h.setFormatter(logging.Formatter(
494                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
495            alloc_log.addHandler(h)
496            self.write_state()
497        self.state_lock.release()
498        return (repo, alloc_log)
499
500    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id):
501        """
502        Place the relevant information in the global state block, and prepare
503        the response.
504        """
505        rtopo = topo.clone()
506        for s in rtopo.substrates:
507            s.set_attribute('vlan', vlan_no)
508            s.set_attribute('gri', gri)
509
510        # Grab the log (this is some anal locking, but better safe than
511        # sorry)
512        self.state_lock.acquire()
513        self.state[aid]['gri'] = gri
514        logv = "".join(self.state[aid]['log'])
515        # It's possible that the StartSegment call gets retried (!).
516        # if the 'started' key is in the allocation, we'll return it rather
517        # than redo the setup.
518        self.state[aid]['started'] = { 
519                'allocID': alloc_id,
520                'allocationLog': logv,
521                'segmentdescription': {
522                    'topdldescription': rtopo.to_dict()
523                    },
524                }
525        retval = copy.deepcopy(self.state[aid]['started'])
526        self.write_state()
527        self.state_lock.release()
528
529        return retval
530
531    def StartSegment(self, req, fid):
532        err = None  # Any service_error generated after tmpdir is created
533        rv = None   # Return value from segment creation
534
535        try:
536            req = req['StartSegmentRequestBody']
537            topref = req['segmentdescription']['topdldescription']
538        except KeyError:
539            raise service_error(server_error.req, "Badly formed request")
540
541        auth_attr = req['allocID']['fedid']
542        aid = "%s" % auth_attr
543        attrs = req.get('fedAttr', [])
544        if not self.auth.check_attribute(fid, auth_attr):
545            raise service_error(service_error.access, "Access denied")
546        else:
547            # See if this is a replay of an earlier succeeded StartSegment -
548            # sometimes SSL kills 'em.  If so, replay the response rather than
549            # redoing the allocation.
550            self.state_lock.acquire()
551            retval = self.state[aid].get('started', None)
552            self.state_lock.release()
553            if retval:
554                self.log.warning("Duplicate StartSegment for %s: " % aid + \
555                        "replaying response")
556                return retval
557
558        certfile = "%s/%s.pem" % (self.certdir, aid)
559
560        if topref:
561            topo = topdl.Topology(**topref)
562        else:
563            raise service_error(service_error.req, 
564                    "Request missing segmentdescription'")
565
566        connInfo = req.get('connection', [])
567
568        cap, src, dest, vlans = self.extract_parameters(topo)
569
570        for a in attrs:
571            if a['attribute'] == 'experiment_name':
572                ename = a['value']
573                break
574        else: ename = aid
575       
576        repo, alloc_log = self.initialize_experiment_info(aid, ename)
577
578        if not repo:
579            raise service_error(service_error.internal, 
580                    "Can't find creation user for %s" % aid)
581
582        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
583                log=alloc_log)
584
585        self.export_store_info(certfile, vlan_no, connInfo)
586
587
588        if gri:
589            return self.finalize_experiment(topo, vlan_no, gri, aid, 
590                    req['allocID'])
591        elif err:
592            raise service_error(service_error.federant,
593                    "Swapin failed: %s" % err)
594        else:
595            raise service_error(service_error.federant, "Swapin failed")
596
597    def TerminateSegment(self, req, fid):
598        try:
599            req = req['TerminateSegmentRequestBody']
600        except KeyError:
601            raise service_error(server_error.req, "Badly formed request")
602
603        auth_attr = req['allocID']['fedid']
604        aid = "%s" % auth_attr
605
606        self.log.debug("Terminate request for %s" %aid)
607        attrs = req.get('fedAttr', [])
608        if not self.auth.check_attribute(fid, auth_attr):
609            raise service_error(service_error.access, "Access denied")
610
611        self.state_lock.acquire()
612        if self.state.has_key(aid):
613            gri = self.state[aid].get('gri', None)
614            user = self.state[aid].get('user', None)
615        else:
616            gri = None
617            user = None
618        self.state_lock.release()
619        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
620
621        if not gri:
622            raise service_error(service_error.internal, 
623                    "Can't find gri for %s" % aid)
624
625        if not user:
626            raise service_error(service_error.internal, 
627                    "Can't find creation user for %s" % aid)
628   
629        self.log.debug("Stop segment for GRI: %s" %gri)
630        self.stop_segment(user, gri)
631        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.