source: fedd/federation/dragon_access.py @ ac15159

axis_examplecompt_changesinfo-ops
Last change on this file since ac15159 was 61a634d, checked in by Ted Faber <faber@…>, 14 years ago

Whoops. Only read the access DB once

  • Property mode set to 100644
File size: 18.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13from access import access_base
14from legacy_access import legacy_access
15
16from util import *
17from allocate_project import allocate_project_local, allocate_project_remote
18from fedid import fedid, generate_fedid
19from authorizer import authorizer, abac_authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_emulab_segment
30import local_emulab_segment
31
32
33# Make log messages disappear if noone configures a fedd logger
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.access")
38fl.addHandler(nullHandler())
39
40class access(access_base, legacy_access):
41    """
42    The implementation of access control based on mapping users to projects.
43
44    Users can be mapped to existing projects or have projects created
45    dynamically.  This implements both direct requests and proxies.
46    """
47
48    def __init__(self, config=None, auth=None):
49        """
50        Initializer.  Pulls parameters out of the ConfigParser's access section.
51        """
52        access_base.__init__(self, config, auth)
53
54        self.cli_dir = config.get("access", "cli_dir")
55        self.axis2_home = config.get("access", "axis2_home")
56        self.idc_url = config.get("access", "idc")
57        self.domain = config.get("access", "domain")
58        self.duration = config.getint("access", "duration", 120)
59
60        self.access = { }
61        if not (self.cli_dir and self.axis2_home and self.idc_url):
62            self.log.error("Must specify all of cli_dir, axis2_home, " +\
63                    "idc in the [access] section of the configuration")
64
65        # authorization information
66        self.auth_type = config.get('access', 'auth_type') \
67                or 'legacy'
68        self.auth_dir = config.get('access', 'auth_dir')
69        accessdb = config.get("access", "accessdb")
70        # initialize the authorization system
71        if self.auth_type == 'legacy':
72            self.access = { }
73            if accessdb:
74                self.legacy_read_access(accessdb, self.make_repo)
75            # Add the ownership attributes to the authorizer.  Note that the
76            # indices of the allocation dict are strings, but the attributes are
77            # fedids, so there is a conversion.
78            self.state_lock.acquire()
79            for k in self.state.keys():
80                for o in self.state[k].get('owners', []):
81                    self.auth.set_attribute(o, fedid(hexstr=k))
82                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
83            self.state_lock.release()
84            self.lookup_access = self.legacy_lookup_access_base
85        elif self.auth_type == 'abac':
86            self.auth = abac_authorizer(load=self.auth_dir)
87            self.access = [ ]
88            if accessdb:
89                self.read_access(accessdb, self.make_repo)
90        else:
91            raise service_error(service_error.internal, 
92                    "Unknown auth_type: %s" % self.auth_type)
93
94        self.call_GetValue= service_caller('GetValue')
95        self.call_SetValue= service_caller('SetValue')
96
97        self.soap_services = {\
98            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
99            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
100            'StartSegment': soap_handler("StartSegment", self.StartSegment),
101            'TerminateSegment': soap_handler("TerminateSegment", 
102                self.TerminateSegment),
103            }
104        self.xmlrpc_services =  {\
105            'RequestAccess': xmlrpc_handler('RequestAccess',
106                self.RequestAccess),
107            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
108                self.ReleaseAccess),
109            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
110            'TerminateSegment': xmlrpc_handler('TerminateSegment',
111                self.TerminateSegment),
112            }
113
114    @staticmethod
115    def make_repo(s):
116        """
117        Get the repo directory from an access line.  This is removing the ()
118        from the string.
119        """
120        rv = s.strip()
121        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
122        else: raise self.parse_error("Repo should be in parens");
123
124    def RequestAccess(self, req, fid):
125        """
126        Handle the access request.
127
128        Parse out the fields and make the allocations or rejections if for us,
129        otherwise, assuming we're willing to proxy, proxy the request out.
130        """
131
132        # The dance to get into the request body
133        if req.has_key('RequestAccessRequestBody'):
134            req = req['RequestAccessRequestBody']
135        else:
136            raise service_error(service_error.req, "No request!?")
137
138        if req.has_key('destinationTestbed'):
139            dt = unpack_id(req['destinationTestbed'])
140
141        # Request for this fedd
142        found, match, owners = self.lookup_access(req, fid)
143        # keep track of what's been added
144        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
145        aid = unicode(allocID)
146
147        self.state_lock.acquire()
148        self.state[aid] = { }
149        self.state[aid]['user'] = found
150        self.state[aid]['owners'] = owners
151        self.write_state()
152        self.state_lock.release()
153        self.auth.set_attribute(fid, allocID)
154        self.auth.set_attribute(allocID, allocID)
155        self.auth.save()
156
157        try:
158            f = open("%s/%s.pem" % (self.certdir, aid), "w")
159            print >>f, alloc_cert
160            f.close()
161        except EnvironmentError, e:
162            raise service_error(service_error.internal, 
163                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
164        return { 'allocID': { 'fedid': allocID } }
165
166    def ReleaseAccess(self, req, fid):
167        # The dance to get into the request body
168        if req.has_key('ReleaseAccessRequestBody'):
169            req = req['ReleaseAccessRequestBody']
170        else:
171            raise service_error(service_error.req, "No request!?")
172
173        try:
174            if req['allocID'].has_key('localname'):
175                auth_attr = aid = req['allocID']['localname']
176            elif req['allocID'].has_key('fedid'):
177                aid = unicode(req['allocID']['fedid'])
178                auth_attr = req['allocID']['fedid']
179            else:
180                raise service_error(service_error.req,
181                        "Only localnames and fedids are understood")
182        except KeyError:
183            raise service_error(service_error.req, "Badly formed request")
184
185        self.log.debug("[access] deallocation requested for %s", aid)
186        if not self.auth.check_attribute(fid, auth_attr):
187            self.log.debug("[access] deallocation denied for %s", aid)
188            raise service_error(service_error.access, "Access Denied")
189
190        self.state_lock.acquire()
191        if self.state.has_key(aid):
192            self.log.debug("Found allocation for %s" %aid)
193            del self.state[aid]
194            self.write_state()
195            self.state_lock.release()
196            # And remove the access cert
197            cf = "%s/%s.pem" % (self.certdir, aid)
198            self.log.debug("Removing %s" % cf)
199            os.remove(cf)
200            return { 'allocID': req['allocID'] } 
201        else:
202            self.state_lock.release()
203            raise service_error(service_error.req, "No such allocation")
204
205    def extract_parameters(self, top):
206        """
207        DRAGON currently supports a fixed capacity link between two endpoints.
208        Those endpoints may specify a VPN (or list or range) to use.  This
209        extracts the DRAGON endpoints and vpn preferences from the segments (as
210        attributes) and the capacity of the connection as given by the
211        substrate.  The two endpoints VLAN choices are intersected to get set
212        of VLANs that are acceptable (no VLAN requiremnets means any is
213        acceptable).
214        """
215        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
216
217        if len(segments) != 2 or len(top.substrates) != 1:
218            raise service_error(service_error.req,
219                    "Requests to DRAGON must have exactlty two segments " +\
220                            "and one substrate")
221
222        ends = [ ]
223        for s in segments:
224            ep = s.get_attribute('dragon_endpoint')
225            if not ep:
226                raise service_error(service_error.req, 
227                        "Missing DRAGON endpoint for %s" % s.id)
228            v = s.get_attribute('vlans')
229            vlans = None
230            vset = set()
231            # the vlans can be a single integer, a comma separated list or a
232            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
233            # 100,300-305,400
234            if v:
235                if v.count(",") > 0 :
236                    vl = [ x.strip() for x in v.split(",") ]
237                else:
238                    vl = [ v ]
239                for v in vl:
240                    try:
241                        if v.count("-")> 0:
242                            f, t = v.split("-", 1)
243                            for i in range(int(f), int(t)+1):
244                                vset.add(i)
245                        else:
246                            vset.add(int(v))
247                    except ValueError:
248                        raise service_error(service_error.req, 
249                                "VLAN tags must be integers (%s)" %s.name)
250            if len(vset) > 0:
251                if vlans: vlans &= vest
252                else: vlans = vset
253            ends.append(ep)
254
255
256        sub = top.substrates[0]
257        if sub.capacity:
258            cap = int(sub.capacity.rate / 1000.0)
259            if cap < 1: cap = 1
260        else:
261            cap = 100
262
263
264        # DRAGON's command line tool barfs if the source (ends[0]) is not in
265        # the domain controlled by the IDC.  This code ensures this situation.
266        if self.domain and not ends[0].endswith(self.domain):
267            hold = ends[0]
268            for i, e in enumerate(ends):
269                if i == 0: continue
270                if e.endswith(self.domain):
271                    ends[0] = e
272                    ends[i] = hold
273                    break
274            else:
275                raise service_error(service_error.req, 
276                        "No endpoint in my domain")
277
278
279        return cap, ends[0], ends[1], vlans
280
281    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
282
283        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
284        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
285
286        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
287            'createReservation', '-repo',  repo , '-url', self.idc_url,
288            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
289            '-vlan', "%s" % v, '-desc', 'fedd created connection',
290            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
291            '-end', "%d" % int(end)]
292        log.debug("[start_segment]: %s" % " ".join(cmd))
293        if not self.create_debug: 
294            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
295                    close_fds=True)
296            for line in p.stdout:
297                m = status_re.match(line)
298                if m:
299                    status = m.group(1)
300                    continue
301                m = gri_re.match(line)
302                if m:
303                    gri = m.group(1)
304                    continue
305            rv = p.wait()
306        else: 
307            rv = 0
308            status = 'ACCEPTED'
309            gri = 'debug_gri'
310
311        return (rv, status, gri)
312
313    def oscars_query_vpn(self, repo, gri, v, log):
314        """
315        Call the oscars query command from the command line and parse out the
316        data to see if the current request succeeded.  This is a lot of fiddly
317        code to do  a pretty simple thing.
318        """
319        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
320        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
321        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
322        path_re = re.compile("Path:")
323
324        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
325            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
326        log.debug("[start_segment]: %s" % " ".join(cmd))
327        if not self.create_debug:
328            # Really do the query
329            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
330                    close_fds=True)
331            in_path = False
332            vpn1 = None
333            vpn2 = None
334            src = None
335            dest = None
336            for line in p.stdout:
337                if not in_path:
338                    m = status_re.match(line)
339                    if m: 
340                        status = m.group(1)
341                        continue
342                    m = source_re.match(line)
343                    if m:
344                        src = m.group(1)
345                        continue
346                    m = dest_re.match(line)
347                    if m:
348                        dest = m.group(1)
349                        continue
350                    m = path_re.match(line)
351                    if m:
352                        in_path = True
353                        if src and dest:
354                            vpn1_re = re.compile(
355                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
356                                            src.replace("*", "\*"))
357                            vpn2_re = re.compile(
358                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
359                                            dest.replace("*", "\*"))
360                        else:
361                            raise service_error(service_error.internal, 
362                                    "Strange output from query")
363                else:
364                    m = vpn1_re.match(line)
365                    if m:
366                        vpn1 = m.group(1)
367                        continue
368                    m = vpn2_re.match(line)
369                    if m:
370                        vpn2 = m.group(1)
371                        continue
372            rv = p.wait()
373            # Make sure that OSCARS did what we expected.
374            if vpn1 == vpn2:
375                if v is not None:
376                    if int(vpn1) == v:
377                        vlan_no = int(v)
378                    else:
379                        raise service_error(service_error.federant, 
380                                "Unexpected vlan assignment")
381                else:
382                    vlan_no = int(v or 0)
383            else:
384                raise service_error(service_error.internal,
385                        "Different VPNs on DRAGON ends")
386            log.debug("Status: %s" % status or "none")
387        else:
388            rv = 0
389            status = 'ACTIVE'
390            vlan_no = int(v or 1)
391
392        return (rv, status, vlan_no)
393
394
395
396    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
397            log=None):
398        """
399        Do the actual work of creating the dragon connecton.
400        """
401        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
402        if not log: log = self.log
403
404        if not vpns:
405            vpns = [ None, None, None, None, None]
406
407        if not start:
408            start = time.time()
409        if not end:
410            end = start + self.duration *60
411
412
413        status = None
414        gri = None
415        rv = None
416        vlan_no = None
417        for v in vpns:
418            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
419                    start, end, log)
420            # Reservation in progress.  Poll the IDC until we know the outcome
421            while status in waiting_states:
422                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
423                if status in waiting_states:
424                    time.sleep(45)
425            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
426                break
427
428        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
429            self.log.debug("made reservation %s %s" % (gri, vlan_no))
430            return gri, vlan_no
431        else:
432            raise service_error(service_error.federant, 
433                    "Cannot make reservation")
434
435    def stop_segment(self, repo, gri, log=None):
436        """
437        Terminate the reservation.
438        """
439        if not log: log = self.log
440        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
441            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
442
443
444        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
445        if not self.create_debug:
446            try:
447                f = open("/dev/null", "w")
448                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
449                f.close()
450            except EnvironmentError, e:
451                raise service_error(service_error.internal, 
452                        "Failed to open /dev/null: %s" % e)
453
454    def export_store_info(self, cf, vlan, connInfo):
455        """
456        For the export requests in the connection info, install the peer names
457        at the experiment controller via SetValue calls.
458        """
459
460        for c in connInfo:
461            for p in [ p for p in c.get('parameter', []) \
462                    if p.get('type', '') == 'output']:
463
464                if p.get('name', '') != 'vlan_id':
465                    self.log.error("Unknown export parameter: %s" % \
466                            p.get('name'))
467                    continue
468
469                k = p.get('key', None)
470                surl = p.get('store', None)
471                if surl and k:
472                    value = "%s" % vlan
473                    req = { 'name': k, 'value': value }
474                    print "calling SetValue %s %s" % (surl, req)
475                    self.call_SetValue(surl, req, cf)
476                else:
477                    self.log.error("Bad export request: %s" % p)
478
479    def initialize_experiment_info(self, aid, ename):
480        repo = None
481        self.state_lock.acquire()
482        if aid in self.state:
483            repo = self.state[aid].get('user', None)
484            self.state[aid]['log'] = [ ]
485            # Create a logger that logs to the experiment's state object as
486            # well as to the main log file.
487            alloc_log = logging.getLogger('fedd.access.%s' % ename)
488            h = logging.StreamHandler(
489                    list_log.list_log(self.state[aid]['log']))
490            # XXX: there should be a global one of these rather than
491            # repeating the code.
492            h.setFormatter(logging.Formatter(
493                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
494            alloc_log.addHandler(h)
495            self.write_state()
496        self.state_lock.release()
497        return (repo, alloc_log)
498
499    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id):
500        """
501        Place the relevant information in the global state block, and prepare
502        the response.
503        """
504        rtopo = topo.clone()
505        for s in rtopo.substrates:
506            s.set_attribute('vlan', vlan_no)
507            s.set_attribute('gri', gri)
508
509        # Grab the log (this is some anal locking, but better safe than
510        # sorry)
511        self.state_lock.acquire()
512        self.state[aid]['gri'] = gri
513        logv = "".join(self.state[aid]['log'])
514        # It's possible that the StartSegment call gets retried (!).
515        # if the 'started' key is in the allocation, we'll return it rather
516        # than redo the setup.
517        self.state[aid]['started'] = { 
518                'allocID': alloc_id,
519                'allocationLog': logv,
520                'segmentdescription': {
521                    'topdldescription': rtopo.to_dict()
522                    },
523                }
524        retval = copy.deepcopy(self.state[aid]['started'])
525        self.write_state()
526        self.state_lock.release()
527
528        return retval
529
530    def StartSegment(self, req, fid):
531        err = None  # Any service_error generated after tmpdir is created
532        rv = None   # Return value from segment creation
533
534        try:
535            req = req['StartSegmentRequestBody']
536            topref = req['segmentdescription']['topdldescription']
537        except KeyError:
538            raise service_error(server_error.req, "Badly formed request")
539
540        auth_attr = req['allocID']['fedid']
541        aid = "%s" % auth_attr
542        attrs = req.get('fedAttr', [])
543        if not self.auth.check_attribute(fid, auth_attr):
544            raise service_error(service_error.access, "Access denied")
545        else:
546            # See if this is a replay of an earlier succeeded StartSegment -
547            # sometimes SSL kills 'em.  If so, replay the response rather than
548            # redoing the allocation.
549            self.state_lock.acquire()
550            retval = self.state[aid].get('started', None)
551            self.state_lock.release()
552            if retval:
553                self.log.warning("Duplicate StartSegment for %s: " % aid + \
554                        "replaying response")
555                return retval
556
557        certfile = "%s/%s.pem" % (self.certdir, aid)
558
559        if topref:
560            topo = topdl.Topology(**topref)
561        else:
562            raise service_error(service_error.req, 
563                    "Request missing segmentdescription'")
564
565        connInfo = req.get('connection', [])
566
567        cap, src, dest, vlans = self.extract_parameters(topo)
568
569        for a in attrs:
570            if a['attribute'] == 'experiment_name':
571                ename = a['value']
572                break
573        else: ename = aid
574       
575        repo, alloc_log = self.initialize_experiment_info(aid, ename)
576
577        if not repo:
578            raise service_error(service_error.internal, 
579                    "Can't find creation user for %s" % aid)
580
581        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
582                log=alloc_log)
583
584        self.export_store_info(certfile, vlan_no, connInfo)
585
586
587        if gri:
588            return self.finalize_experiment(topo, vlan_no, gri, aid, 
589                    req['allocID'])
590        elif err:
591            raise service_error(service_error.federant,
592                    "Swapin failed: %s" % err)
593        else:
594            raise service_error(service_error.federant, "Swapin failed")
595
596    def TerminateSegment(self, req, fid):
597        try:
598            req = req['TerminateSegmentRequestBody']
599        except KeyError:
600            raise service_error(server_error.req, "Badly formed request")
601
602        auth_attr = req['allocID']['fedid']
603        aid = "%s" % auth_attr
604
605        self.log.debug("Terminate request for %s" %aid)
606        attrs = req.get('fedAttr', [])
607        if not self.auth.check_attribute(fid, auth_attr):
608            raise service_error(service_error.access, "Access denied")
609
610        self.state_lock.acquire()
611        if self.state.has_key(aid):
612            gri = self.state[aid].get('gri', None)
613            user = self.state[aid].get('user', None)
614        else:
615            gri = None
616            user = None
617        self.state_lock.release()
618        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
619
620        if not gri:
621            raise service_error(service_error.internal, 
622                    "Can't find gri for %s" % aid)
623
624        if not user:
625            raise service_error(service_error.internal, 
626                    "Can't find creation user for %s" % aid)
627   
628        self.log.debug("Stop segment for GRI: %s" %gri)
629        self.stop_segment(user, gri)
630        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.