source: fedd/federation/dragon_access.py @ 709306c

axis_examplecompt_changesinfo-ops
Last change on this file since 709306c was e83f2f2, checked in by Ted Faber <faber@…>, 14 years ago

Move proofs around. Lots of changes, including fault handling.

  • Property mode set to 100644
File size: 16.5 KB
RevLine 
[23dec62]1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
[ab847bc]11from threading import Thread, Lock
[23dec62]12from subprocess import Popen, call, PIPE, STDOUT
[6abed7b]13from access import access_base
[d03c991]14from legacy_access import legacy_access
[23dec62]15
16from util import *
17from allocate_project import allocate_project_local, allocate_project_remote
18from fedid import fedid, generate_fedid
[d03c991]19from authorizer import authorizer, abac_authorizer
[23dec62]20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29import proxy_emulab_segment
30import local_emulab_segment
31
32
33# Make log messages disappear if noone configures a fedd logger
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.access")
38fl.addHandler(nullHandler())
39
[d03c991]40class access(access_base, legacy_access):
[23dec62]41    """
42    The implementation of access control based on mapping users to projects.
43
44    Users can be mapped to existing projects or have projects created
45    dynamically.  This implements both direct requests and proxies.
46    """
47
48    def __init__(self, config=None, auth=None):
49        """
50        Initializer.  Pulls parameters out of the ConfigParser's access section.
51        """
[6abed7b]52        access_base.__init__(self, config, auth)
[23dec62]53
54        self.cli_dir = config.get("access", "cli_dir")
55        self.axis2_home = config.get("access", "axis2_home")
56        self.idc_url = config.get("access", "idc")
[36fec1b]57        self.domain = config.get("access", "domain")
[6f82229]58        self.duration = config.getint("access", "duration", 120)
[23dec62]59
60        self.access = { }
61        if not (self.cli_dir and self.axis2_home and self.idc_url):
62            self.log.error("Must specify all of cli_dir, axis2_home, " +\
63                    "idc in the [access] section of the configuration")
64
[d03c991]65        # authorization information
66        self.auth_type = config.get('access', 'auth_type') \
67                or 'legacy'
68        self.auth_dir = config.get('access', 'auth_dir')
69        accessdb = config.get("access", "accessdb")
70        # initialize the authorization system
71        if self.auth_type == 'legacy':
72            self.access = { }
73            if accessdb:
74                self.legacy_read_access(accessdb, self.make_repo)
75            # Add the ownership attributes to the authorizer.  Note that the
76            # indices of the allocation dict are strings, but the attributes are
77            # fedids, so there is a conversion.
78            self.state_lock.acquire()
79            for k in self.state.keys():
80                for o in self.state[k].get('owners', []):
81                    self.auth.set_attribute(o, fedid(hexstr=k))
82                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
83            self.state_lock.release()
84            self.lookup_access = self.legacy_lookup_access_base
85        elif self.auth_type == 'abac':
86            self.auth = abac_authorizer(load=self.auth_dir)
87            self.access = [ ]
88            if accessdb:
89                self.read_access(accessdb, self.make_repo)
90        else:
91            raise service_error(service_error.internal, 
92                    "Unknown auth_type: %s" % self.auth_type)
[23dec62]93
[7a011e9]94        self.call_GetValue= service_caller('GetValue')
95        self.call_SetValue= service_caller('SetValue')
96
[23dec62]97        self.soap_services = {\
98            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
99            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
100            'StartSegment': soap_handler("StartSegment", self.StartSegment),
101            'TerminateSegment': soap_handler("TerminateSegment", 
102                self.TerminateSegment),
103            }
104        self.xmlrpc_services =  {\
105            'RequestAccess': xmlrpc_handler('RequestAccess',
106                self.RequestAccess),
107            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
108                self.ReleaseAccess),
109            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
110            'TerminateSegment': xmlrpc_handler('TerminateSegment',
111                self.TerminateSegment),
112            }
113
[6abed7b]114    @staticmethod
115    def make_repo(s):
116        """
117        Get the repo directory from an access line.  This is removing the ()
118        from the string.
119        """
120        rv = s.strip()
121        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
122        else: raise self.parse_error("Repo should be in parens");
[23dec62]123
[9973d57]124    # RequestAccess and ReleaseAccess come from the base class
[23dec62]125
126    def extract_parameters(self, top):
127        """
128        DRAGON currently supports a fixed capacity link between two endpoints.
129        Those endpoints may specify a VPN (or list or range) to use.  This
130        extracts the DRAGON endpoints and vpn preferences from the segments (as
131        attributes) and the capacity of the connection as given by the
132        substrate.  The two endpoints VLAN choices are intersected to get set
133        of VLANs that are acceptable (no VLAN requiremnets means any is
134        acceptable).
135        """
136        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
137
138        if len(segments) != 2 or len(top.substrates) != 1:
139            raise service_error(service_error.req,
140                    "Requests to DRAGON must have exactlty two segments " +\
141                            "and one substrate")
142
143        ends = [ ]
144        for s in segments:
145            ep = s.get_attribute('dragon_endpoint')
146            if not ep:
147                raise service_error(service_error.req, 
[69692a9]148                        "Missing DRAGON endpoint for %s" % s.id)
[23dec62]149            v = s.get_attribute('vlans')
150            vlans = None
151            vset = set()
152            # the vlans can be a single integer, a comma separated list or a
153            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
154            # 100,300-305,400
155            if v:
156                if v.count(",") > 0 :
157                    vl = [ x.strip() for x in v.split(",") ]
158                else:
159                    vl = [ v ]
160                for v in vl:
161                    try:
162                        if v.count("-")> 0:
163                            f, t = v.split("-", 1)
164                            for i in range(int(f), int(t)+1):
165                                vset.add(i)
166                        else:
167                            vset.add(int(v))
168                    except ValueError:
169                        raise service_error(service_error.req, 
170                                "VLAN tags must be integers (%s)" %s.name)
171            if len(vset) > 0:
172                if vlans: vlans &= vest
173                else: vlans = vset
174            ends.append(ep)
175
176
177        sub = top.substrates[0]
178        if sub.capacity:
179            cap = int(sub.capacity.rate / 1000.0)
180            if cap < 1: cap = 1
181        else:
182            cap = 100
183
[36fec1b]184
185        # DRAGON's command line tool barfs if the source (ends[0]) is not in
186        # the domain controlled by the IDC.  This code ensures this situation.
187        if self.domain and not ends[0].endswith(self.domain):
188            hold = ends[0]
189            for i, e in enumerate(ends):
190                if i == 0: continue
191                if e.endswith(self.domain):
192                    ends[0] = e
193                    ends[i] = hold
194                    break
195            else:
196                raise service_error(service_error.req, 
197                        "No endpoint in my domain")
198
199
[23dec62]200        return cap, ends[0], ends[1], vlans
201
[6abed7b]202    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
[23dec62]203
[6abed7b]204        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
205        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
206
207        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
208            'createReservation', '-repo',  repo , '-url', self.idc_url,
209            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
210            '-vlan', "%s" % v, '-desc', 'fedd created connection',
211            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
212            '-end', "%d" % int(end)]
213        log.debug("[start_segment]: %s" % " ".join(cmd))
214        if not self.create_debug: 
215            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
216                    close_fds=True)
217            for line in p.stdout:
218                m = status_re.match(line)
219                if m:
220                    status = m.group(1)
221                    continue
222                m = gri_re.match(line)
223                if m:
224                    gri = m.group(1)
225                    continue
226            rv = p.wait()
227        else: 
228            rv = 0
229            status = 'ACCEPTED'
230            gri = 'debug_gri'
231
232        return (rv, status, gri)
233
234    def oscars_query_vpn(self, repo, gri, v, log):
[23dec62]235        """
[6abed7b]236        Call the oscars query command from the command line and parse out the
237        data to see if the current request succeeded.  This is a lot of fiddly
238        code to do  a pretty simple thing.
[23dec62]239        """
240        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
241        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
242        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
243        path_re = re.compile("Path:")
244
[6abed7b]245        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
246            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
247        log.debug("[start_segment]: %s" % " ".join(cmd))
248        if not self.create_debug:
249            # Really do the query
250            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
251                    close_fds=True)
252            in_path = False
253            vpn1 = None
254            vpn2 = None
255            src = None
256            dest = None
257            for line in p.stdout:
258                if not in_path:
259                    m = status_re.match(line)
260                    if m: 
261                        status = m.group(1)
262                        continue
263                    m = source_re.match(line)
264                    if m:
265                        src = m.group(1)
266                        continue
267                    m = dest_re.match(line)
268                    if m:
269                        dest = m.group(1)
270                        continue
271                    m = path_re.match(line)
272                    if m:
273                        in_path = True
274                        if src and dest:
275                            vpn1_re = re.compile(
276                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
277                                            src.replace("*", "\*"))
278                            vpn2_re = re.compile(
279                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
280                                            dest.replace("*", "\*"))
281                        else:
282                            raise service_error(service_error.internal, 
283                                    "Strange output from query")
284                else:
285                    m = vpn1_re.match(line)
286                    if m:
287                        vpn1 = m.group(1)
288                        continue
289                    m = vpn2_re.match(line)
290                    if m:
291                        vpn2 = m.group(1)
292                        continue
293            rv = p.wait()
294            # Make sure that OSCARS did what we expected.
295            if vpn1 == vpn2:
296                if v is not None:
297                    if int(vpn1) == v:
298                        vlan_no = int(v)
299                    else:
300                        raise service_error(service_error.federant, 
301                                "Unexpected vlan assignment")
302                else:
303                    vlan_no = int(v or 0)
304            else:
305                raise service_error(service_error.internal,
306                        "Different VPNs on DRAGON ends")
307            log.debug("Status: %s" % status or "none")
308        else:
309            rv = 0
310            status = 'ACTIVE'
311            vlan_no = int(v or 1)
312
313        return (rv, status, vlan_no)
314
315
316
317    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
318            log=None):
319        """
320        Do the actual work of creating the dragon connecton.
321        """
322        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
323        if not log: log = self.log
324
[23dec62]325        if not vpns:
326            vpns = [ None, None, None, None, None]
327
328        if not start:
329            start = time.time()
330        if not end:
[6f82229]331            end = start + self.duration *60
[23dec62]332
333
334        status = None
335        gri = None
336        rv = None
337        vlan_no = None
338        for v in vpns:
[6abed7b]339            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
340                    start, end, log)
[23dec62]341            # Reservation in progress.  Poll the IDC until we know the outcome
[6abed7b]342            while status in waiting_states:
343                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
344                if status in waiting_states:
[ab33158]345                    time.sleep(45)
[23dec62]346            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
347                break
348
349        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
[6abed7b]350            self.log.debug("made reservation %s %s" % (gri, vlan_no))
[23dec62]351            return gri, vlan_no
352        else:
[69692a9]353            raise service_error(service_error.federant, 
354                    "Cannot make reservation")
[23dec62]355
356    def stop_segment(self, repo, gri, log=None):
[6abed7b]357        """
358        Terminate the reservation.
359        """
[23dec62]360        if not log: log = self.log
361        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
362            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
363
364
365        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
366        if not self.create_debug:
367            try:
368                f = open("/dev/null", "w")
369                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
370                f.close()
[d3c8759]371            except EnvironmentError, e:
[23dec62]372                raise service_error(service_error.internal, 
373                        "Failed to open /dev/null: %s" % e)
374
[7a011e9]375    def export_store_info(self, cf, vlan, connInfo):
376        """
377        For the export requests in the connection info, install the peer names
378        at the experiment controller via SetValue calls.
379        """
380
381        for c in connInfo:
382            for p in [ p for p in c.get('parameter', []) \
383                    if p.get('type', '') == 'output']:
384
385                if p.get('name', '') != 'vlan_id':
386                    self.log.error("Unknown export parameter: %s" % \
387                            p.get('name'))
388                    continue
389
390                k = p.get('key', None)
391                surl = p.get('store', None)
392                if surl and k:
393                    value = "%s" % vlan
394                    req = { 'name': k, 'value': value }
395                    print "calling SetValue %s %s" % (surl, req)
396                    self.call_SetValue(surl, req, cf)
397                else:
398                    self.log.error("Bad export request: %s" % p)
399
[6abed7b]400    def initialize_experiment_info(self, aid, ename):
401        repo = None
402        self.state_lock.acquire()
403        if aid in self.state:
404            repo = self.state[aid].get('user', None)
405            self.state[aid]['log'] = [ ]
406            # Create a logger that logs to the experiment's state object as
407            # well as to the main log file.
408            alloc_log = logging.getLogger('fedd.access.%s' % ename)
409            h = logging.StreamHandler(
410                    list_log.list_log(self.state[aid]['log']))
411            # XXX: there should be a global one of these rather than
412            # repeating the code.
413            h.setFormatter(logging.Formatter(
414                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
415            alloc_log.addHandler(h)
416            self.write_state()
417        self.state_lock.release()
418        return (repo, alloc_log)
419
[e83f2f2]420    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id, proof):
[7a011e9]421        """
[6abed7b]422        Place the relevant information in the global state block, and prepare
423        the response.
[7a011e9]424        """
[6abed7b]425        rtopo = topo.clone()
426        for s in rtopo.substrates:
427            s.set_attribute('vlan', vlan_no)
428            s.set_attribute('gri', gri)
[7a011e9]429
[6abed7b]430        # Grab the log (this is some anal locking, but better safe than
431        # sorry)
432        self.state_lock.acquire()
433        self.state[aid]['gri'] = gri
434        logv = "".join(self.state[aid]['log'])
435        # It's possible that the StartSegment call gets retried (!).
436        # if the 'started' key is in the allocation, we'll return it rather
437        # than redo the setup.
438        self.state[aid]['started'] = { 
439                'allocID': alloc_id,
440                'allocationLog': logv,
441                'segmentdescription': {
442                    'topdldescription': rtopo.to_dict()
443                    },
[e83f2f2]444                'proof': proof.to_dict(),
[6abed7b]445                }
446        retval = copy.deepcopy(self.state[aid]['started'])
447        self.write_state()
448        self.state_lock.release()
[7a011e9]449
[6abed7b]450        return retval
[7a011e9]451
[23dec62]452    def StartSegment(self, req, fid):
453        err = None  # Any service_error generated after tmpdir is created
454        rv = None   # Return value from segment creation
455
456        try:
457            req = req['StartSegmentRequestBody']
[6abed7b]458            topref = req['segmentdescription']['topdldescription']
[23dec62]459        except KeyError:
460            raise service_error(server_error.req, "Badly formed request")
461
462        auth_attr = req['allocID']['fedid']
463        aid = "%s" % auth_attr
464        attrs = req.get('fedAttr', [])
[e83f2f2]465        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
466                with_proof=True)
467        if not access_ok:
[23dec62]468            raise service_error(service_error.access, "Access denied")
[7a011e9]469        else:
[1627c3d]470            # See if this is a replay of an earlier succeeded StartSegment -
471            # sometimes SSL kills 'em.  If so, replay the response rather than
472            # redoing the allocation.
473            self.state_lock.acquire()
[ab847bc]474            retval = self.state[aid].get('started', None)
[1627c3d]475            self.state_lock.release()
476            if retval:
477                self.log.warning("Duplicate StartSegment for %s: " % aid + \
478                        "replaying response")
479                return retval
480
481        certfile = "%s/%s.pem" % (self.certdir, aid)
[23dec62]482
[6abed7b]483        if topref:
484            topo = topdl.Topology(**topref)
[23dec62]485        else:
486            raise service_error(service_error.req, 
487                    "Request missing segmentdescription'")
488
[7a011e9]489        connInfo = req.get('connection', [])
490
[23dec62]491        cap, src, dest, vlans = self.extract_parameters(topo)
492
493        for a in attrs:
494            if a['attribute'] == 'experiment_name':
495                ename = a['value']
[6abed7b]496                break
497        else: ename = aid
498       
499        repo, alloc_log = self.initialize_experiment_info(aid, ename)
[23dec62]500
501        if not repo:
502            raise service_error(service_error.internal, 
[6abed7b]503                    "Can't find creation user for %s" % aid)
[23dec62]504
[68bb551]505        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
506                log=alloc_log)
[23dec62]507
[7a011e9]508        self.export_store_info(certfile, vlan_no, connInfo)
509
[ab847bc]510
[23dec62]511        if gri:
[6abed7b]512            return self.finalize_experiment(topo, vlan_no, gri, aid, 
[e83f2f2]513                    req['allocID'], proof)
[23dec62]514        elif err:
515            raise service_error(service_error.federant,
516                    "Swapin failed: %s" % err)
517        else:
518            raise service_error(service_error.federant, "Swapin failed")
519
520    def TerminateSegment(self, req, fid):
521        try:
522            req = req['TerminateSegmentRequestBody']
523        except KeyError:
524            raise service_error(server_error.req, "Badly formed request")
525
526        auth_attr = req['allocID']['fedid']
527        aid = "%s" % auth_attr
[ab33158]528
529        self.log.debug("Terminate request for %s" %aid)
[23dec62]530        attrs = req.get('fedAttr', [])
[e83f2f2]531        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
532                with_proof=True)
533        if not access_ok:
[23dec62]534            raise service_error(service_error.access, "Access denied")
535
536        self.state_lock.acquire()
537        if self.state.has_key(aid):
538            gri = self.state[aid].get('gri', None)
539            user = self.state[aid].get('user', None)
540        else:
541            gri = None
542            user = None
543        self.state_lock.release()
[ab33158]544        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
[23dec62]545
546        if not gri:
547            raise service_error(service_error.internal, 
548                    "Can't find gri for %s" % aid)
549
550        if not user:
551            raise service_error(service_error.internal, 
552                    "Can't find creation user for %s" % aid)
[ab33158]553   
554        self.log.debug("Stop segment for GRI: %s" %gri)
[23dec62]555        self.stop_segment(user, gri)
[e83f2f2]556        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.