source: fedd/federation/dragon_access.py @ e5a595e

compt_changes
Last change on this file since e5a595e was 6bedbdba, checked in by Ted Faber <faber@…>, 13 years ago

Split topdl and fedid out to different packages. Add differential
installs

  • Property mode set to 100644
File size: 15.7 KB
RevLine 
[23dec62]1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
[ab847bc]11from threading import Thread, Lock
[23dec62]12from subprocess import Popen, call, PIPE, STDOUT
[6abed7b]13from access import access_base
[23dec62]14
15from util import *
[6bedbdba]16from deter import fedid, generate_fedid
[d03c991]17from authorizer import authorizer, abac_authorizer
[23dec62]18from service_error import service_error
19from remote_service import xmlrpc_handler, soap_handler, service_caller
20
21import httplib
22import tempfile
23from urlparse import urlparse
24
[6bedbdba]25from deter import topdl
[23dec62]26import list_log
27
28
29# Make log messages disappear if noone configures a fedd logger
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.access")
34fl.addHandler(nullHandler())
35
[ee950c2]36class access(access_base):
[23dec62]37    """
38    The implementation of access control based on mapping users to projects.
39
40    Users can be mapped to existing projects or have projects created
41    dynamically.  This implements both direct requests and proxies.
42    """
43
44    def __init__(self, config=None, auth=None):
45        """
46        Initializer.  Pulls parameters out of the ConfigParser's access section.
47        """
[6abed7b]48        access_base.__init__(self, config, auth)
[23dec62]49
50        self.cli_dir = config.get("access", "cli_dir")
51        self.axis2_home = config.get("access", "axis2_home")
52        self.idc_url = config.get("access", "idc")
[36fec1b]53        self.domain = config.get("access", "domain")
[6f82229]54        self.duration = config.getint("access", "duration", 120)
[23dec62]55
56        self.access = { }
57        if not (self.cli_dir and self.axis2_home and self.idc_url):
58            self.log.error("Must specify all of cli_dir, axis2_home, " +\
59                    "idc in the [access] section of the configuration")
60
[d03c991]61        # authorization information
62        self.auth_type = config.get('access', 'auth_type') \
[ee950c2]63                or 'abac'
[d03c991]64        self.auth_dir = config.get('access', 'auth_dir')
65        accessdb = config.get("access", "accessdb")
66        # initialize the authorization system
[ee950c2]67        if self.auth_type == 'abac':
[d03c991]68            self.auth = abac_authorizer(load=self.auth_dir)
69            self.access = [ ]
70            if accessdb:
71                self.read_access(accessdb, self.make_repo)
72        else:
73            raise service_error(service_error.internal, 
74                    "Unknown auth_type: %s" % self.auth_type)
[23dec62]75
[7a011e9]76        self.call_GetValue= service_caller('GetValue')
77        self.call_SetValue= service_caller('SetValue')
78
[23dec62]79        self.soap_services = {\
80            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
81            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
82            'StartSegment': soap_handler("StartSegment", self.StartSegment),
83            'TerminateSegment': soap_handler("TerminateSegment", 
84                self.TerminateSegment),
85            }
86        self.xmlrpc_services =  {\
87            'RequestAccess': xmlrpc_handler('RequestAccess',
88                self.RequestAccess),
89            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
90                self.ReleaseAccess),
91            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
92            'TerminateSegment': xmlrpc_handler('TerminateSegment',
93                self.TerminateSegment),
94            }
95
[6abed7b]96    @staticmethod
97    def make_repo(s):
98        """
99        Get the repo directory from an access line.  This is removing the ()
100        from the string.
101        """
102        rv = s.strip()
103        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
104        else: raise self.parse_error("Repo should be in parens");
[23dec62]105
[9973d57]106    # RequestAccess and ReleaseAccess come from the base class
[23dec62]107
108    def extract_parameters(self, top):
109        """
110        DRAGON currently supports a fixed capacity link between two endpoints.
111        Those endpoints may specify a VPN (or list or range) to use.  This
112        extracts the DRAGON endpoints and vpn preferences from the segments (as
113        attributes) and the capacity of the connection as given by the
114        substrate.  The two endpoints VLAN choices are intersected to get set
115        of VLANs that are acceptable (no VLAN requiremnets means any is
116        acceptable).
117        """
118        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
119
120        if len(segments) != 2 or len(top.substrates) != 1:
121            raise service_error(service_error.req,
122                    "Requests to DRAGON must have exactlty two segments " +\
123                            "and one substrate")
124
125        ends = [ ]
126        for s in segments:
127            ep = s.get_attribute('dragon_endpoint')
128            if not ep:
129                raise service_error(service_error.req, 
[69692a9]130                        "Missing DRAGON endpoint for %s" % s.id)
[23dec62]131            v = s.get_attribute('vlans')
132            vlans = None
133            vset = set()
134            # the vlans can be a single integer, a comma separated list or a
135            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
136            # 100,300-305,400
137            if v:
138                if v.count(",") > 0 :
139                    vl = [ x.strip() for x in v.split(",") ]
140                else:
141                    vl = [ v ]
142                for v in vl:
143                    try:
144                        if v.count("-")> 0:
145                            f, t = v.split("-", 1)
146                            for i in range(int(f), int(t)+1):
147                                vset.add(i)
148                        else:
149                            vset.add(int(v))
150                    except ValueError:
151                        raise service_error(service_error.req, 
152                                "VLAN tags must be integers (%s)" %s.name)
153            if len(vset) > 0:
154                if vlans: vlans &= vest
155                else: vlans = vset
156            ends.append(ep)
157
158
159        sub = top.substrates[0]
160        if sub.capacity:
161            cap = int(sub.capacity.rate / 1000.0)
162            if cap < 1: cap = 1
163        else:
164            cap = 100
165
[36fec1b]166
167        # DRAGON's command line tool barfs if the source (ends[0]) is not in
168        # the domain controlled by the IDC.  This code ensures this situation.
169        if self.domain and not ends[0].endswith(self.domain):
170            hold = ends[0]
171            for i, e in enumerate(ends):
172                if i == 0: continue
173                if e.endswith(self.domain):
174                    ends[0] = e
175                    ends[i] = hold
176                    break
177            else:
178                raise service_error(service_error.req, 
179                        "No endpoint in my domain")
180
181
[23dec62]182        return cap, ends[0], ends[1], vlans
183
[6abed7b]184    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
[23dec62]185
[6abed7b]186        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
187        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
188
189        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
190            'createReservation', '-repo',  repo , '-url', self.idc_url,
191            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
192            '-vlan', "%s" % v, '-desc', 'fedd created connection',
193            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
194            '-end', "%d" % int(end)]
195        log.debug("[start_segment]: %s" % " ".join(cmd))
196        if not self.create_debug: 
197            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
198                    close_fds=True)
199            for line in p.stdout:
200                m = status_re.match(line)
201                if m:
202                    status = m.group(1)
203                    continue
204                m = gri_re.match(line)
205                if m:
206                    gri = m.group(1)
207                    continue
208            rv = p.wait()
209        else: 
210            rv = 0
211            status = 'ACCEPTED'
212            gri = 'debug_gri'
213
214        return (rv, status, gri)
215
216    def oscars_query_vpn(self, repo, gri, v, log):
[23dec62]217        """
[6abed7b]218        Call the oscars query command from the command line and parse out the
219        data to see if the current request succeeded.  This is a lot of fiddly
220        code to do  a pretty simple thing.
[23dec62]221        """
222        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
223        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
224        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
225        path_re = re.compile("Path:")
226
[6abed7b]227        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
228            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
229        log.debug("[start_segment]: %s" % " ".join(cmd))
230        if not self.create_debug:
231            # Really do the query
232            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
233                    close_fds=True)
234            in_path = False
235            vpn1 = None
236            vpn2 = None
237            src = None
238            dest = None
239            for line in p.stdout:
240                if not in_path:
241                    m = status_re.match(line)
242                    if m: 
243                        status = m.group(1)
244                        continue
245                    m = source_re.match(line)
246                    if m:
247                        src = m.group(1)
248                        continue
249                    m = dest_re.match(line)
250                    if m:
251                        dest = m.group(1)
252                        continue
253                    m = path_re.match(line)
254                    if m:
255                        in_path = True
256                        if src and dest:
257                            vpn1_re = re.compile(
258                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
259                                            src.replace("*", "\*"))
260                            vpn2_re = re.compile(
261                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
262                                            dest.replace("*", "\*"))
263                        else:
264                            raise service_error(service_error.internal, 
265                                    "Strange output from query")
266                else:
267                    m = vpn1_re.match(line)
268                    if m:
269                        vpn1 = m.group(1)
270                        continue
271                    m = vpn2_re.match(line)
272                    if m:
273                        vpn2 = m.group(1)
274                        continue
275            rv = p.wait()
276            # Make sure that OSCARS did what we expected.
277            if vpn1 == vpn2:
278                if v is not None:
279                    if int(vpn1) == v:
280                        vlan_no = int(v)
281                    else:
282                        raise service_error(service_error.federant, 
283                                "Unexpected vlan assignment")
284                else:
285                    vlan_no = int(v or 0)
286            else:
287                raise service_error(service_error.internal,
288                        "Different VPNs on DRAGON ends")
289            log.debug("Status: %s" % status or "none")
290        else:
291            rv = 0
292            status = 'ACTIVE'
293            vlan_no = int(v or 1)
294
295        return (rv, status, vlan_no)
296
297
298
299    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
300            log=None):
301        """
302        Do the actual work of creating the dragon connecton.
303        """
304        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
305        if not log: log = self.log
306
[23dec62]307        if not vpns:
308            vpns = [ None, None, None, None, None]
309
310        if not start:
311            start = time.time()
312        if not end:
[6f82229]313            end = start + self.duration *60
[23dec62]314
315
316        status = None
317        gri = None
318        rv = None
319        vlan_no = None
320        for v in vpns:
[6abed7b]321            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
322                    start, end, log)
[23dec62]323            # Reservation in progress.  Poll the IDC until we know the outcome
[6abed7b]324            while status in waiting_states:
325                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
326                if status in waiting_states:
[ab33158]327                    time.sleep(45)
[23dec62]328            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
329                break
330
331        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
[6abed7b]332            self.log.debug("made reservation %s %s" % (gri, vlan_no))
[23dec62]333            return gri, vlan_no
334        else:
[69692a9]335            raise service_error(service_error.federant, 
336                    "Cannot make reservation")
[23dec62]337
338    def stop_segment(self, repo, gri, log=None):
[6abed7b]339        """
340        Terminate the reservation.
341        """
[23dec62]342        if not log: log = self.log
343        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
344            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
345
346
347        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
348        if not self.create_debug:
349            try:
350                f = open("/dev/null", "w")
351                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
352                f.close()
[d3c8759]353            except EnvironmentError, e:
[23dec62]354                raise service_error(service_error.internal, 
355                        "Failed to open /dev/null: %s" % e)
356
[7a011e9]357    def export_store_info(self, cf, vlan, connInfo):
358        """
359        For the export requests in the connection info, install the peer names
360        at the experiment controller via SetValue calls.
361        """
362
363        for c in connInfo:
364            for p in [ p for p in c.get('parameter', []) \
365                    if p.get('type', '') == 'output']:
366
367                if p.get('name', '') != 'vlan_id':
368                    self.log.error("Unknown export parameter: %s" % \
369                            p.get('name'))
370                    continue
371
372                k = p.get('key', None)
373                surl = p.get('store', None)
374                if surl and k:
375                    value = "%s" % vlan
376                    req = { 'name': k, 'value': value }
377                    print "calling SetValue %s %s" % (surl, req)
378                    self.call_SetValue(surl, req, cf)
379                else:
380                    self.log.error("Bad export request: %s" % p)
381
[6abed7b]382    def initialize_experiment_info(self, aid, ename):
383        repo = None
384        self.state_lock.acquire()
385        if aid in self.state:
386            repo = self.state[aid].get('user', None)
387            self.state[aid]['log'] = [ ]
388            # Create a logger that logs to the experiment's state object as
389            # well as to the main log file.
390            alloc_log = logging.getLogger('fedd.access.%s' % ename)
391            h = logging.StreamHandler(
392                    list_log.list_log(self.state[aid]['log']))
393            # XXX: there should be a global one of these rather than
394            # repeating the code.
395            h.setFormatter(logging.Formatter(
396                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
397            alloc_log.addHandler(h)
398            self.write_state()
399        self.state_lock.release()
400        return (repo, alloc_log)
401
[e83f2f2]402    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id, proof):
[7a011e9]403        """
[6abed7b]404        Place the relevant information in the global state block, and prepare
405        the response.
[7a011e9]406        """
[6abed7b]407        rtopo = topo.clone()
408        for s in rtopo.substrates:
409            s.set_attribute('vlan', vlan_no)
410            s.set_attribute('gri', gri)
[7a011e9]411
[6abed7b]412        # Grab the log (this is some anal locking, but better safe than
413        # sorry)
414        self.state_lock.acquire()
415        self.state[aid]['gri'] = gri
416        logv = "".join(self.state[aid]['log'])
417        # It's possible that the StartSegment call gets retried (!).
418        # if the 'started' key is in the allocation, we'll return it rather
419        # than redo the setup.
420        self.state[aid]['started'] = { 
421                'allocID': alloc_id,
422                'allocationLog': logv,
423                'segmentdescription': {
424                    'topdldescription': rtopo.to_dict()
425                    },
[e83f2f2]426                'proof': proof.to_dict(),
[6abed7b]427                }
428        retval = copy.deepcopy(self.state[aid]['started'])
429        self.write_state()
430        self.state_lock.release()
[7a011e9]431
[6abed7b]432        return retval
[7a011e9]433
[23dec62]434    def StartSegment(self, req, fid):
435        err = None  # Any service_error generated after tmpdir is created
436        rv = None   # Return value from segment creation
437
438        try:
439            req = req['StartSegmentRequestBody']
[6abed7b]440            topref = req['segmentdescription']['topdldescription']
[23dec62]441        except KeyError:
442            raise service_error(server_error.req, "Badly formed request")
443
444        auth_attr = req['allocID']['fedid']
445        aid = "%s" % auth_attr
446        attrs = req.get('fedAttr', [])
[e83f2f2]447        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
448                with_proof=True)
449        if not access_ok:
[23dec62]450            raise service_error(service_error.access, "Access denied")
[7a011e9]451        else:
[1627c3d]452            # See if this is a replay of an earlier succeeded StartSegment -
453            # sometimes SSL kills 'em.  If so, replay the response rather than
454            # redoing the allocation.
455            self.state_lock.acquire()
[ab847bc]456            retval = self.state[aid].get('started', None)
[1627c3d]457            self.state_lock.release()
458            if retval:
459                self.log.warning("Duplicate StartSegment for %s: " % aid + \
460                        "replaying response")
461                return retval
462
463        certfile = "%s/%s.pem" % (self.certdir, aid)
[23dec62]464
[6abed7b]465        if topref:
466            topo = topdl.Topology(**topref)
[23dec62]467        else:
468            raise service_error(service_error.req, 
469                    "Request missing segmentdescription'")
470
[7a011e9]471        connInfo = req.get('connection', [])
472
[23dec62]473        cap, src, dest, vlans = self.extract_parameters(topo)
474
475        for a in attrs:
476            if a['attribute'] == 'experiment_name':
477                ename = a['value']
[6abed7b]478                break
479        else: ename = aid
480       
481        repo, alloc_log = self.initialize_experiment_info(aid, ename)
[23dec62]482
483        if not repo:
484            raise service_error(service_error.internal, 
[6abed7b]485                    "Can't find creation user for %s" % aid)
[23dec62]486
[68bb551]487        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
488                log=alloc_log)
[23dec62]489
[7a011e9]490        self.export_store_info(certfile, vlan_no, connInfo)
491
[ab847bc]492
[23dec62]493        if gri:
[6abed7b]494            return self.finalize_experiment(topo, vlan_no, gri, aid, 
[e83f2f2]495                    req['allocID'], proof)
[23dec62]496        elif err:
497            raise service_error(service_error.federant,
498                    "Swapin failed: %s" % err)
499        else:
500            raise service_error(service_error.federant, "Swapin failed")
501
502    def TerminateSegment(self, req, fid):
503        try:
504            req = req['TerminateSegmentRequestBody']
505        except KeyError:
506            raise service_error(server_error.req, "Badly formed request")
507
508        auth_attr = req['allocID']['fedid']
509        aid = "%s" % auth_attr
[ab33158]510
511        self.log.debug("Terminate request for %s" %aid)
[23dec62]512        attrs = req.get('fedAttr', [])
[e83f2f2]513        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
514                with_proof=True)
515        if not access_ok:
[23dec62]516            raise service_error(service_error.access, "Access denied")
517
518        self.state_lock.acquire()
519        if self.state.has_key(aid):
520            gri = self.state[aid].get('gri', None)
521            user = self.state[aid].get('user', None)
522        else:
523            gri = None
524            user = None
525        self.state_lock.release()
[ab33158]526        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
[23dec62]527
528        if not gri:
529            raise service_error(service_error.internal, 
530                    "Can't find gri for %s" % aid)
531
532        if not user:
533            raise service_error(service_error.internal, 
534                    "Can't find creation user for %s" % aid)
[ab33158]535   
536        self.log.debug("Stop segment for GRI: %s" %gri)
[23dec62]537        self.stop_segment(user, gri)
[e83f2f2]538        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.