source: fedd/federation/dragon_access.py @ 027b87b

axis_examplecompt_changesinfo-ops
Last change on this file since 027b87b was 027b87b, checked in by Ted Faber <faber@…>, 13 years ago

This little class added a useless complexity. While I'm in here I removed it.

  • Property mode set to 100644
File size: 18.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13from access import access_base
14
15from util import *
16from allocate_project import allocate_project_local, allocate_project_remote
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access(access_base):
40    """
41    The implementation of access control based on mapping users to projects.
42
43    Users can be mapped to existing projects or have projects created
44    dynamically.  This implements both direct requests and proxies.
45    """
46
47    def __init__(self, config=None, auth=None):
48        """
49        Initializer.  Pulls parameters out of the ConfigParser's access section.
50        """
51        access_base.__init__(self, config, auth)
52
53        self.cli_dir = config.get("access", "cli_dir")
54        self.axis2_home = config.get("access", "axis2_home")
55        self.idc_url = config.get("access", "idc")
56        self.domain = config.get("access", "domain")
57        self.duration = config.getint("access", "duration", 120)
58
59        self.access = { }
60        if not (self.cli_dir and self.axis2_home and self.idc_url):
61            self.log.error("Must specify all of cli_dir, axis2_home, " +\
62                    "idc in the [access] section of the configuration")
63
64        if config.has_option("access", "accessdb"):
65            self.read_access(config.get("access", "accessdb"), self.make_repo)
66
67        # Add the ownership attributes to the authorizer.  Note that the
68        # indices of the allocation dict are strings, but the attributes are
69        # fedids, so there is a conversion.
70        self.state_lock.acquire()
71        for k in self.state.keys():
72            for o in self.state[k].get('owners', []):
73                self.auth.set_attribute(o, fedid(hexstr=k))
74            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
75        self.state_lock.release()
76
77        self.lookup_access = self.lookup_access_base
78
79        self.call_GetValue= service_caller('GetValue')
80        self.call_SetValue= service_caller('SetValue')
81
82        self.soap_services = {\
83            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
84            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
85            'StartSegment': soap_handler("StartSegment", self.StartSegment),
86            'TerminateSegment': soap_handler("TerminateSegment", 
87                self.TerminateSegment),
88            }
89        self.xmlrpc_services =  {\
90            'RequestAccess': xmlrpc_handler('RequestAccess',
91                self.RequestAccess),
92            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
93                self.ReleaseAccess),
94            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
95            'TerminateSegment': xmlrpc_handler('TerminateSegment',
96                self.TerminateSegment),
97            }
98
99    @staticmethod
100    def make_repo(s):
101        """
102        Get the repo directory from an access line.  This is removing the ()
103        from the string.
104        """
105        rv = s.strip()
106        if rv.startswith('(') and rv.endswith(')'): return rv[1:-1]
107        else: raise self.parse_error("Repo should be in parens");
108
109    def RequestAccess(self, req, fid):
110        """
111        Handle the access request.
112
113        Parse out the fields and make the allocations or rejections if for us,
114        otherwise, assuming we're willing to proxy, proxy the request out.
115        """
116
117        # The dance to get into the request body
118        if req.has_key('RequestAccessRequestBody'):
119            req = req['RequestAccessRequestBody']
120        else:
121            raise service_error(service_error.req, "No request!?")
122
123        if req.has_key('destinationTestbed'):
124            dt = unpack_id(req['destinationTestbed'])
125
126        # Request for this fedd
127        found, match = self.lookup_access(req, fid)
128        # keep track of what's been added
129        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
130        aid = unicode(allocID)
131
132        self.state_lock.acquire()
133        self.state[aid] = { }
134        self.state[aid]['user'] = found
135        self.state[aid]['owners'] = [ fid ]
136        self.write_state()
137        self.state_lock.release()
138        self.auth.set_attribute(fid, allocID)
139        self.auth.set_attribute(allocID, allocID)
140
141        try:
142            f = open("%s/%s.pem" % (self.certdir, aid), "w")
143            print >>f, alloc_cert
144            f.close()
145        except EnvironmentError, e:
146            raise service_error(service_error.internal, 
147                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
148        return { 'allocID': { 'fedid': allocID } }
149
150    def ReleaseAccess(self, req, fid):
151        # The dance to get into the request body
152        if req.has_key('ReleaseAccessRequestBody'):
153            req = req['ReleaseAccessRequestBody']
154        else:
155            raise service_error(service_error.req, "No request!?")
156
157        try:
158            if req['allocID'].has_key('localname'):
159                auth_attr = aid = req['allocID']['localname']
160            elif req['allocID'].has_key('fedid'):
161                aid = unicode(req['allocID']['fedid'])
162                auth_attr = req['allocID']['fedid']
163            else:
164                raise service_error(service_error.req,
165                        "Only localnames and fedids are understood")
166        except KeyError:
167            raise service_error(service_error.req, "Badly formed request")
168
169        self.log.debug("[access] deallocation requested for %s", aid)
170        if not self.auth.check_attribute(fid, auth_attr):
171            self.log.debug("[access] deallocation denied for %s", aid)
172            raise service_error(service_error.access, "Access Denied")
173
174        self.state_lock.acquire()
175        if self.state.has_key(aid):
176            self.log.debug("Found allocation for %s" %aid)
177            del self.state[aid]
178            self.write_state()
179            self.state_lock.release()
180            # And remove the access cert
181            cf = "%s/%s.pem" % (self.certdir, aid)
182            self.log.debug("Removing %s" % cf)
183            os.remove(cf)
184            return { 'allocID': req['allocID'] } 
185        else:
186            self.state_lock.release()
187            raise service_error(service_error.req, "No such allocation")
188
189    def extract_parameters(self, top):
190        """
191        DRAGON currently supports a fixed capacity link between two endpoints.
192        Those endpoints may specify a VPN (or list or range) to use.  This
193        extracts the DRAGON endpoints and vpn preferences from the segments (as
194        attributes) and the capacity of the connection as given by the
195        substrate.  The two endpoints VLAN choices are intersected to get set
196        of VLANs that are acceptable (no VLAN requiremnets means any is
197        acceptable).
198        """
199        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
200
201        if len(segments) != 2 or len(top.substrates) != 1:
202            raise service_error(service_error.req,
203                    "Requests to DRAGON must have exactlty two segments " +\
204                            "and one substrate")
205
206        ends = [ ]
207        for s in segments:
208            ep = s.get_attribute('dragon_endpoint')
209            if not ep:
210                raise service_error(service_error.req, 
211                        "Missing DRAGON endpoint for %s" % s.id)
212            v = s.get_attribute('vlans')
213            vlans = None
214            vset = set()
215            # the vlans can be a single integer, a comma separated list or a
216            # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
217            # 100,300-305,400
218            if v:
219                if v.count(",") > 0 :
220                    vl = [ x.strip() for x in v.split(",") ]
221                else:
222                    vl = [ v ]
223                for v in vl:
224                    try:
225                        if v.count("-")> 0:
226                            f, t = v.split("-", 1)
227                            for i in range(int(f), int(t)+1):
228                                vset.add(i)
229                        else:
230                            vset.add(int(v))
231                    except ValueError:
232                        raise service_error(service_error.req, 
233                                "VLAN tags must be integers (%s)" %s.name)
234            if len(vset) > 0:
235                if vlans: vlans &= vest
236                else: vlans = vset
237            ends.append(ep)
238
239
240        sub = top.substrates[0]
241        if sub.capacity:
242            cap = int(sub.capacity.rate / 1000.0)
243            if cap < 1: cap = 1
244        else:
245            cap = 100
246
247
248        # DRAGON's command line tool barfs if the source (ends[0]) is not in
249        # the domain controlled by the IDC.  This code ensures this situation.
250        if self.domain and not ends[0].endswith(self.domain):
251            hold = ends[0]
252            for i, e in enumerate(ends):
253                if i == 0: continue
254                if e.endswith(self.domain):
255                    ends[0] = e
256                    ends[i] = hold
257                    break
258            else:
259                raise service_error(service_error.req, 
260                        "No endpoint in my domain")
261
262
263        return cap, ends[0], ends[1], vlans
264
265    def oscars_create_vpn(self, repo, fr, to, cap, v, start, end, log):
266
267        gri_re = re.compile("GRI:\s*(.*)", re.IGNORECASE)
268        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
269
270        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
271            'createReservation', '-repo',  repo , '-url', self.idc_url,
272            '-l2source', fr, '-l2dest', to, '-bwidth', "%s" % cap,
273            '-vlan', "%s" % v, '-desc', 'fedd created connection',
274            '-pathsetup', 'timer-automatic', '-start', "%d" % int(start),
275            '-end', "%d" % int(end)]
276        log.debug("[start_segment]: %s" % " ".join(cmd))
277        if not self.create_debug: 
278            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
279                    close_fds=True)
280            for line in p.stdout:
281                m = status_re.match(line)
282                if m:
283                    status = m.group(1)
284                    continue
285                m = gri_re.match(line)
286                if m:
287                    gri = m.group(1)
288                    continue
289            rv = p.wait()
290        else: 
291            rv = 0
292            status = 'ACCEPTED'
293            gri = 'debug_gri'
294
295        return (rv, status, gri)
296
297    def oscars_query_vpn(self, repo, gri, v, log):
298        """
299        Call the oscars query command from the command line and parse out the
300        data to see if the current request succeeded.  This is a lot of fiddly
301        code to do  a pretty simple thing.
302        """
303        status_re = re.compile("Status:\s*(.*)", re.IGNORECASE)
304        source_re = re.compile("Source\s+Endpoint:\s*(.*)", re.IGNORECASE)
305        dest_re = re.compile("Destination\s+Endpoint:\s*(.*)", re.IGNORECASE)
306        path_re = re.compile("Path:")
307
308        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
309            'query', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
310        log.debug("[start_segment]: %s" % " ".join(cmd))
311        if not self.create_debug:
312            # Really do the query
313            p = Popen(cmd, cwd=self.cli_dir, stdout=PIPE, stderr=STDOUT,
314                    close_fds=True)
315            in_path = False
316            vpn1 = None
317            vpn2 = None
318            src = None
319            dest = None
320            for line in p.stdout:
321                if not in_path:
322                    m = status_re.match(line)
323                    if m: 
324                        status = m.group(1)
325                        continue
326                    m = source_re.match(line)
327                    if m:
328                        src = m.group(1)
329                        continue
330                    m = dest_re.match(line)
331                    if m:
332                        dest = m.group(1)
333                        continue
334                    m = path_re.match(line)
335                    if m:
336                        in_path = True
337                        if src and dest:
338                            vpn1_re = re.compile(
339                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
340                                            src.replace("*", "\*"))
341                            vpn2_re = re.compile(
342                                    "\s*%s,\s*\w+\s*,\s*(\d+)" % \
343                                            dest.replace("*", "\*"))
344                        else:
345                            raise service_error(service_error.internal, 
346                                    "Strange output from query")
347                else:
348                    m = vpn1_re.match(line)
349                    if m:
350                        vpn1 = m.group(1)
351                        continue
352                    m = vpn2_re.match(line)
353                    if m:
354                        vpn2 = m.group(1)
355                        continue
356            rv = p.wait()
357            # Make sure that OSCARS did what we expected.
358            if vpn1 == vpn2:
359                if v is not None:
360                    if int(vpn1) == v:
361                        vlan_no = int(v)
362                    else:
363                        raise service_error(service_error.federant, 
364                                "Unexpected vlan assignment")
365                else:
366                    vlan_no = int(v or 0)
367            else:
368                raise service_error(service_error.internal,
369                        "Different VPNs on DRAGON ends")
370            log.debug("Status: %s" % status or "none")
371        else:
372            rv = 0
373            status = 'ACTIVE'
374            vlan_no = int(v or 1)
375
376        return (rv, status, vlan_no)
377
378
379
380    def start_segment(self, repo, fr, to, cap, vpns=None, start=None, end=None,
381            log=None):
382        """
383        Do the actual work of creating the dragon connecton.
384        """
385        waiting_states = ('ACCEPTED', 'INSETUP', 'INCREATE', 'PENDING')
386        if not log: log = self.log
387
388        if not vpns:
389            vpns = [ None, None, None, None, None]
390
391        if not start:
392            start = time.time()
393        if not end:
394            end = start + self.duration *60
395
396
397        status = None
398        gri = None
399        rv = None
400        vlan_no = None
401        for v in vpns:
402            rv, status, gri = self.oscars_create_vpn(repo, fr, to, cap, v, 
403                    start, end, log)
404            # Reservation in progress.  Poll the IDC until we know the outcome
405            while status in waiting_states:
406                rv, status, vlan_no = self.oscars_query_vpn(repo, gri, v, log)
407                if status in waiting_states:
408                    time.sleep(45)
409            if status in ('ACTIVE', 'FINISHED', 'CANCELLED'):
410                break
411
412        if (rv == 0 and gri and vlan_no and status == 'ACTIVE'):
413            self.log.debug("made reservation %s %s" % (gri, vlan_no))
414            return gri, vlan_no
415        else:
416            raise service_error(service_error.federant, 
417                    "Cannot make reservation")
418
419    def stop_segment(self, repo, gri, log=None):
420        """
421        Terminate the reservation.
422        """
423        if not log: log = self.log
424        cmd = ['env', 'AXIS2_HOME=%s' % self.axis2_home, './run.sh', 
425            'cancel', '-repo',  repo , '-url', self.idc_url, '-gri', gri]
426
427
428        self.log.debug("[stop_segment]: %s" % " ".join(cmd))
429        if not self.create_debug:
430            try:
431                f = open("/dev/null", "w")
432                call(cmd, cwd=self.cli_dir, stdout=f, stderr=f, close_fds=True)
433                f.close()
434            except EnvironmentError, e:
435                raise service_error(service_error.internal, 
436                        "Failed to open /dev/null: %s" % e)
437
438    def export_store_info(self, cf, vlan, connInfo):
439        """
440        For the export requests in the connection info, install the peer names
441        at the experiment controller via SetValue calls.
442        """
443
444        for c in connInfo:
445            for p in [ p for p in c.get('parameter', []) \
446                    if p.get('type', '') == 'output']:
447
448                if p.get('name', '') != 'vlan_id':
449                    self.log.error("Unknown export parameter: %s" % \
450                            p.get('name'))
451                    continue
452
453                k = p.get('key', None)
454                surl = p.get('store', None)
455                if surl and k:
456                    value = "%s" % vlan
457                    req = { 'name': k, 'value': value }
458                    print "calling SetValue %s %s" % (surl, req)
459                    self.call_SetValue(surl, req, cf)
460                else:
461                    self.log.error("Bad export request: %s" % p)
462
463    def initialize_experiment_info(self, aid, ename):
464        repo = None
465        self.state_lock.acquire()
466        if aid in self.state:
467            repo = self.state[aid].get('user', None)
468            self.state[aid]['log'] = [ ]
469            # Create a logger that logs to the experiment's state object as
470            # well as to the main log file.
471            alloc_log = logging.getLogger('fedd.access.%s' % ename)
472            h = logging.StreamHandler(
473                    list_log.list_log(self.state[aid]['log']))
474            # XXX: there should be a global one of these rather than
475            # repeating the code.
476            h.setFormatter(logging.Formatter(
477                "%(asctime)s %(name)s %(message)s", '%d %b %y %H:%M:%S'))
478            alloc_log.addHandler(h)
479            self.write_state()
480        self.state_lock.release()
481        return (repo, alloc_log)
482
483    def finalize_experiment(self, topo, vlan_no, gri, aid, alloc_id):
484        """
485        Place the relevant information in the global state block, and prepare
486        the response.
487        """
488        rtopo = topo.clone()
489        for s in rtopo.substrates:
490            s.set_attribute('vlan', vlan_no)
491            s.set_attribute('gri', gri)
492
493        # Grab the log (this is some anal locking, but better safe than
494        # sorry)
495        self.state_lock.acquire()
496        self.state[aid]['gri'] = gri
497        logv = "".join(self.state[aid]['log'])
498        # It's possible that the StartSegment call gets retried (!).
499        # if the 'started' key is in the allocation, we'll return it rather
500        # than redo the setup.
501        self.state[aid]['started'] = { 
502                'allocID': alloc_id,
503                'allocationLog': logv,
504                'segmentdescription': {
505                    'topdldescription': rtopo.to_dict()
506                    },
507                }
508        retval = copy.deepcopy(self.state[aid]['started'])
509        self.write_state()
510        self.state_lock.release()
511
512        return retval
513
514    def StartSegment(self, req, fid):
515        err = None  # Any service_error generated after tmpdir is created
516        rv = None   # Return value from segment creation
517
518        try:
519            req = req['StartSegmentRequestBody']
520            topref = req['segmentdescription']['topdldescription']
521        except KeyError:
522            raise service_error(server_error.req, "Badly formed request")
523
524        auth_attr = req['allocID']['fedid']
525        aid = "%s" % auth_attr
526        attrs = req.get('fedAttr', [])
527        if not self.auth.check_attribute(fid, auth_attr):
528            raise service_error(service_error.access, "Access denied")
529        else:
530            # See if this is a replay of an earlier succeeded StartSegment -
531            # sometimes SSL kills 'em.  If so, replay the response rather than
532            # redoing the allocation.
533            self.state_lock.acquire()
534            retval = self.state[aid].get('started', None)
535            self.state_lock.release()
536            if retval:
537                self.log.warning("Duplicate StartSegment for %s: " % aid + \
538                        "replaying response")
539                return retval
540
541        certfile = "%s/%s.pem" % (self.certdir, aid)
542
543        if topref:
544            topo = topdl.Topology(**topref)
545        else:
546            raise service_error(service_error.req, 
547                    "Request missing segmentdescription'")
548
549        connInfo = req.get('connection', [])
550
551        cap, src, dest, vlans = self.extract_parameters(topo)
552
553        for a in attrs:
554            if a['attribute'] == 'experiment_name':
555                ename = a['value']
556                break
557        else: ename = aid
558       
559        repo, alloc_log = self.initialize_experiment_info(aid, ename)
560
561        if not repo:
562            raise service_error(service_error.internal, 
563                    "Can't find creation user for %s" % aid)
564
565        gri, vlan_no = self.start_segment(repo, src, dest, cap, vlans,
566                log=alloc_log)
567
568        self.export_store_info(certfile, vlan_no, connInfo)
569
570
571        if gri:
572            return self.finalize_experiment(topo, vlan_no, gri, aid, 
573                    req['allocID'])
574        elif err:
575            raise service_error(service_error.federant,
576                    "Swapin failed: %s" % err)
577        else:
578            raise service_error(service_error.federant, "Swapin failed")
579
580    def TerminateSegment(self, req, fid):
581        try:
582            req = req['TerminateSegmentRequestBody']
583        except KeyError:
584            raise service_error(server_error.req, "Badly formed request")
585
586        auth_attr = req['allocID']['fedid']
587        aid = "%s" % auth_attr
588
589        self.log.debug("Terminate request for %s" %aid)
590        attrs = req.get('fedAttr', [])
591        if not self.auth.check_attribute(fid, auth_attr):
592            raise service_error(service_error.access, "Access denied")
593
594        self.state_lock.acquire()
595        if self.state.has_key(aid):
596            gri = self.state[aid].get('gri', None)
597            user = self.state[aid].get('user', None)
598        else:
599            gri = None
600            user = None
601        self.state_lock.release()
602        self.log.debug("Stop segment for user: %s gre %s" %(user, gri))
603
604        if not gri:
605            raise service_error(service_error.internal, 
606                    "Can't find gri for %s" % aid)
607
608        if not user:
609            raise service_error(service_error.internal, 
610                    "Can't find creation user for %s" % aid)
611   
612        self.log.debug("Stop segment for GRI: %s" %gri)
613        self.stop_segment(user, gri)
614        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.