source: fedd/federation/dragon_access.py @ c7141dc

compt_changesinfo-ops
Last change on this file since c7141dc was c7141dc, checked in by Ted Faber <faber@…>, 12 years ago

Single access works

  • Property mode set to 100644
File size: 16.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13from access import access_base
14from legacy_access import legacy_access
15
16from util import *
17from allocate_project import allocate_project_local, allocate_project_remote
18from fedid import fedid, generate_fedid
19from authorizer import authorizer, abac_authorizer
20from service_error import service_error
21from remote_service import xmlrpc_handler, soap_handler, service_caller
22
23import httplib
24import tempfile
25from urlparse import urlparse
26
27import topdl
28import list_log
29
30
31# Make log messages disappear if noone configures a fedd logger
32class nullHandler(logging.Handler):
33    def emit(self, record): pass
34
35fl = logging.getLogger("fedd.access")
36fl.addHandler(nullHandler())
37
38class access(access_base, legacy_access):
39    """
40    The implementation of access control based on mapping users to projects.
41
42    Users can be mapped to existing projects or have projects created
43    dynamically.  This implements both direct requests and proxies.
44    """
45
46    def __init__(self, config=None, auth=None):
47        """
48        Initializer.  Pulls parameters out of the ConfigParser's access section.
49        """
50        access_base.__init__(self, config, auth)
51
52        self.cli_dir = config.get("access", "cli_dir")
53        self.axis2_home = config.get("access", "axis2_home")
54        self.idc_url = config.get("access", "idc")
55        self.domain = config.get("access", "domain")
56        self.duration = config.getint("access", "duration", 120)
57
58        self.access = { }
59        if not (self.cli_dir and self.axis2_home and self.idc_url):
60            self.log.error("Must specify all of cli_dir, axis2_home, " +\
61                    "idc in the [access] section of the configuration")
62
63        # authorization information
64        self.auth_type = config.get('access', 'auth_type') \
65                or 'legacy'
66        self.auth_dir = config.get('access', 'auth_dir')
67        accessdb = config.get("access", "accessdb")
68        # initialize the authorization system
69        if self.auth_type == 'legacy':
70            self.access = { }
71            if accessdb:
72                self.legacy_read_access(accessdb, self.make_repo)
73            # Add the ownership attributes to the authorizer.  Note that the
74            # indices of the allocation dict are strings, but the attributes are
75            # fedids, so there is a conversion.
76            self.state_lock.acquire()
77            for k in self.state.keys():
78                for o in self.state[k].get('owners', []):
79                    self.auth.set_attribute(o, fedid(hexstr=k))
80                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
81            self.state_lock.release()
82            self.lookup_access = self.legacy_lookup_access_base
83        elif self.auth_type == 'abac':
84            self.auth = abac_authorizer(load=self.auth_dir)
85            self.access = [ ]
86            if accessdb:
87                self.read_access(accessdb, self.make_repo)
88        else:
89            raise service_error(service_error.internal, 
90                    "Unknown auth_type: %s" % self.auth_type)
91
92        self.call_GetValue= service_caller('GetValue')
93        self.call_SetValue= service_caller('SetValue')
94
95        self.soap_services = {\
96            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
97            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
98            'StartSegment': soap_handler("StartSegment", self.StartSegment),
99            'TerminateSegment': soap_handler("TerminateSegment", 
100                self.TerminateSegment),
101            }
102        self.xmlrpc_services =  {\
103            'RequestAccess': xmlrpc_handler('RequestAccess',
104                self.RequestAccess),
105            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
106                self.ReleaseAccess),
107            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
108            'TerminateSegment': xmlrpc_handler('TerminateSegment',
109                self.TerminateSegment),
110            }
111
112    @staticmethod
113    def make_repo(s):
114        """
115        Get the repo directory from an access line.  This is removing the ()
116        from the string.
117        """
118        rv = s.strip()
119        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
120        else: raise self.parse_error("Repo should be in parens");
121
122    # RequestAccess and ReleaseAccess come from the base class
123
124    def extract_parameters(self, top):
125        """
126        DRAGON currently supports a fixed capacity link between two endpoints.
127        Those endpoints may specify a VPN (or list or range) to use.  This
128        extracts the DRAGON endpoints and vpn preferences from the segments (as
129        attributes) and the capacity of the connection as given by the
130        substrate.  The two endpoints VLAN choices are intersected to get set
131        of VLANs that are acceptable (no VLAN requiremnets means any is
132        acceptable).
133        """
134        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
135
136        if len(segments) != 2 or len(top.substrates) != 1:
137            raise service_error(service_error.req,
138                    "Requests to DRAGON must have exactlty two segments " +\
139                            "and one substrate")
140
141        ends = [ ]
142        for s in segments:
143            ep = s.get_attribute('dragon_endpoint')
144            if not ep:
145                raise service_error(service_error.req, 
146                        "Missing DRAGON endpoint for %s" % s.id)
147            v = s.get_attribute('vlans')
148            vlans = None
149            vset = set()
150            # the vlans can be a single integer, a comma separated list or a
151            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
152            # 100,300-305,400
153            if v:
154                if v.count(",") > 0 :
155                    vl = [ x.strip() for x in v.split(",") ]
156                else:
157                    vl = [ v ]
158                for v in vl:
159                    try:
160                        if v.count("-")> 0:
161                            f, t = v.split("-", 1)
162                            for i in range(int(f), int(t)+1):
163                                vset.add(i)
164                        else:
165                            vset.add(int(v))
166                    except ValueError:
167                        raise service_error(service_error.req, 
168                                "VLAN tags must be integers (%s)" %s.name)
169            if len(vset) > 0:
170                if vlans: vlans &= vest
171                else: vlans = vset
172            ends.append(ep)
173
174
175        sub = top.substrates[0]
176        if sub.capacity:
177            cap = int(sub.capacity.rate / 1000.0)
178            if cap < 1: cap = 1
179        else:
180            cap = 100
181
182
183        # DRAGON's command line tool barfs if the source (ends[0]) is not in
184        # the domain controlled by the IDC.  This code ensures this situation.
185        if self.domain and not ends[0].endswith(self.domain):
186            hold = ends[0]
187            for i, e in enumerate(ends):
188                if i == 0: continue
189                if e.endswith(self.domain):
190                    ends[0] = e
191                    ends[i] = hold
192                    break
193            else:
194                raise service_error(service_error.req, 
195                        "No endpoint in my domain")
196
197
198        return cap, ends[0], ends[1], vlans
199
200    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
201
202        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
203        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
204
205        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
206            'createReservation', '-repo',  repo , '-url', self.idc_url,
207            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
208            '-vlan', "%s" % v, '-desc', 'fedd created connection',
209            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
210            '-end', "%d" % int(end)]
211        log.debug("[start_segment]: %s" % " ".join(cmd))
212        if not self.create_debug: 
213            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
214                    close_fds=True)
215            for line in p.stdout:
216                m = status_re.match(line)
217                if m:
218                    status = m.group(1)
219                    continue
220                m = gri_re.match(line)
221                if m:
222                    gri = m.group(1)
223                    continue
224            rv = p.wait()
225        else: 
226            rv = 0
227            status = 'ACCEPTED'
228            gri = 'debug_gri'
229
230        return (rv, status, gri)
231
232    def oscars_query_vpn(self, repo, gri, v, log):
233        """
234        Call the oscars query command from the command line and parse out the
235        data to see if the current request succeeded.  This is a lot of fiddly
236        code to do  a pretty simple thing.
237        """
238        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
239        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
240        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
241        path_re = re.compile("Path:")
242
243        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
244            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
245        log.debug("[start_segment]: %s" % " ".join(cmd))
246        if not self.create_debug:
247            # Really do the query
248            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
249                    close_fds=True)
250            in_path = False
251            vpn1 = None
252            vpn2 = None
253            src = None
254            dest = None
255            for line in p.stdout:
256                if not in_path:
257                    m = status_re.match(line)
258                    if m: 
259                        status = m.group(1)
260                        continue
261                    m = source_re.match(line)
262                    if m:
263                        src = m.group(1)
264                        continue
265                    m = dest_re.match(line)
266                    if m:
267                        dest = m.group(1)
268                        continue
269                    m = path_re.match(line)
270                    if m:
271                        in_path = True
272                        if src and dest:
273                            vpn1_re = re.compile(
274                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
275                                            src.replace("*", "\*"))
276                            vpn2_re = re.compile(
277                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
278                                            dest.replace("*", "\*"))
279                        else:
280                            raise service_error(service_error.internal, 
281                                    "Strange output from query")
282                else:
283                    m = vpn1_re.match(line)
284                    if m:
285                        vpn1 = m.group(1)
286                        continue
287                    m = vpn2_re.match(line)
288                    if m:
289                        vpn2 = m.group(1)
290                        continue
291            rv = p.wait()
292            # Make sure that OSCARS did what we expected.
293            if vpn1 == vpn2:
294                if v is not None:
295                    if int(vpn1) == v:
296                        vlan_no = int(v)
297                    else:
298                        raise service_error(service_error.federant, 
299                                "Unexpected vlan assignment")
300                else:
301                    vlan_no = int(v or 0)
302            else:
303                raise service_error(service_error.internal,
304                        "Different VPNs on DRAGON ends")
305            log.debug("Status: %s" % status or "none")
306        else:
307            rv = 0
308            status = 'ACTIVE'
309            vlan_no = int(v or 1)
310
311        return (rv, status, vlan_no)
312
313
314
315    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
316            log=None):
317        """
318        Do the actual work of creating the dragon connecton.
319        """
320        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
321        if not log: log = self.log
322
323        if not vpns:
324            vpns = [ None, None, None, None, None]
325
326        if not start:
327            start = time.time()
328        if not end:
329            end = start + self.duration *60
330
331
332        status = None
333        gri = None
334        rv = None
335        vlan_no = None
336        for v in vpns:
337            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
338                    start, end, log)
339            # Reservation in progress.  Poll the IDC until we know the outcome
340            while status in waiting_states:
341                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
342                if status in waiting_states:
343                    time.sleep(45)
344            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
345                break
346
347        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
348            self.log.debug("made reservation %s %s" % (gri, vlan_no))
349            return gri, vlan_no
350        else:
351            raise service_error(service_error.federant, 
352                    "Cannot make reservation")
353
354    def stop_segment(self, repo, gri, log=None):
355        """
356        Terminate the reservation.
357        """
358        if not log: log = self.log
359        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
360            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
361
362
363        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
364        if not self.create_debug:
365            try:
366                f = open("/dev/null", "w")
367                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
368                f.close()
369            except EnvironmentError, e:
370                raise service_error(service_error.internal, 
371                        "Failed to open /dev/null: %s" % e)
372
373    def export_store_info(self, cf, vlan, connInfo):
374        """
375        For the export requests in the connection info, install the peer names
376        at the experiment controller via SetValue calls.
377        """
378
379        for c in connInfo:
380            for p in [ p for p in c.get('parameter', []) \
381                    if p.get('type', '') == 'output']:
382
383                if p.get('name', '') != 'vlan_id':
384                    self.log.error("Unknown export parameter: %s" % \
385                            p.get('name'))
386                    continue
387
388                k = p.get('key', None)
389                surl = p.get('store', None)
390                if surl and k:
391                    value = "%s" % vlan
392                    req = { 'name': k, 'value': value }
393                    print "calling SetValue %s %s" % (surl, req)
394                    self.call_SetValue(surl, req, cf)
395                else:
396                    self.log.error("Bad export request: %s" % p)
397
398    def initialize_experiment_info(self, aid, ename):
399        repo = None
400        self.state_lock.acquire()
401        if aid in self.state:
402            repo = self.state[aid].get('user', None)
403            self.state[aid]['log'] = [ ]
404            # Create a logger that logs to the experiment's state object as
405            # well as to the main log file.
406            alloc_log = logging.getLogger('fedd.access.%s' % ename)
407            h = logging.StreamHandler(
408                    list_log.list_log(self.state[aid]['log']))
409            # XXX: there should be a global one of these rather than
410            # repeating the code.
411            h.setFormatter(logging.Formatter(
412                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
413            alloc_log.addHandler(h)
414            self.write_state()
415        self.state_lock.release()
416        return (repo, alloc_log)
417
418    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id, proof):
419        """
420        Place the relevant information in the global state block, and prepare
421        the response.
422        """
423        rtopo = topo.clone()
424        for s in rtopo.substrates:
425            s.set_attribute('vlan', vlan_no)
426            s.set_attribute('gri', gri)
427
428        # Grab the log (this is some anal locking, but better safe than
429        # sorry)
430        self.state_lock.acquire()
431        self.state[aid]['gri'] = gri
432        logv = "".join(self.state[aid]['log'])
433        # It's possible that the StartSegment call gets retried (!).
434        # if the 'started' key is in the allocation, we'll return it rather
435        # than redo the setup.
436        self.state[aid]['started'] = { 
437                'allocID': alloc_id,
438                'allocationLog': logv,
439                'segmentdescription': {
440                    'topdldescription': rtopo.to_dict()
441                    },
442                'proof': proof.to_dict(),
443                }
444        retval = copy.deepcopy(self.state[aid]['started'])
445        self.write_state()
446        self.state_lock.release()
447
448        return retval
449
450    def StartSegment(self, req, fid):
451        err = None  # Any service_error generated after tmpdir is created
452        rv = None   # Return value from segment creation
453
454        try:
455            req = req['StartSegmentRequestBody']
456            topref = req['segmentdescription']['topdldescription']
457        except KeyError:
458            raise service_error(server_error.req, "Badly formed request")
459
460        auth_attr = req['allocID']['fedid']
461        aid = "%s" % auth_attr
462        attrs = req.get('fedAttr', [])
463        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
464                with_proof=True)
465        if not access_ok:
466            raise service_error(service_error.access, "Access denied")
467        else:
468            # See if this is a replay of an earlier succeeded StartSegment -
469            # sometimes SSL kills 'em.  If so, replay the response rather than
470            # redoing the allocation.
471            self.state_lock.acquire()
472            retval = self.state[aid].get('started', None)
473            self.state_lock.release()
474            if retval:
475                self.log.warning("Duplicate StartSegment for %s: " % aid + \
476                        "replaying response")
477                return retval
478
479        certfile = "%s/%s.pem" % (self.certdir, aid)
480
481        if topref:
482            topo = topdl.Topology(**topref)
483        else:
484            raise service_error(service_error.req, 
485                    "Request missing segmentdescription'")
486
487        connInfo = req.get('connection', [])
488
489        cap, src, dest, vlans = self.extract_parameters(topo)
490
491        for a in attrs:
492            if a['attribute'] == 'experiment_name':
493                ename = a['value']
494                break
495        else: ename = aid
496       
497        repo, alloc_log = self.initialize_experiment_info(aid, ename)
498
499        if not repo:
500            raise service_error(service_error.internal, 
501                    "Can't find creation user for %s" % aid)
502
503        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
504                log=alloc_log)
505
506        self.export_store_info(certfile, vlan_no, connInfo)
507
508
509        if gri:
510            return self.finalize_experiment(topo, vlan_no, gri, aid, 
511                    req['allocID'], proof)
512        elif err:
513            raise service_error(service_error.federant,
514                    "Swapin failed: %s" % err)
515        else:
516            raise service_error(service_error.federant, "Swapin failed")
517
518    def TerminateSegment(self, req, fid):
519        try:
520            req = req['TerminateSegmentRequestBody']
521        except KeyError:
522            raise service_error(server_error.req, "Badly formed request")
523
524        auth_attr = req['allocID']['fedid']
525        aid = "%s" % auth_attr
526
527        self.log.debug("Terminate request for %s" %aid)
528        attrs = req.get('fedAttr', [])
529        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
530                with_proof=True)
531        if not access_ok:
532            raise service_error(service_error.access, "Access denied")
533
534        self.state_lock.acquire()
535        if self.state.has_key(aid):
536            gri = self.state[aid].get('gri', None)
537            user = self.state[aid].get('user', None)
538        else:
539            gri = None
540            user = None
541        self.state_lock.release()
542        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
543
544        if not gri:
545            raise service_error(service_error.internal, 
546                    "Can't find gri for %s" % aid)
547
548        if not user:
549            raise service_error(service_error.internal, 
550                    "Can't find creation user for %s" % aid)
551   
552        self.log.debug("Stop segment for GRI: %s" %gri)
553        self.stop_segment(user, gri)
554        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
Note: See TracBrowser for help on using the repository browser.