source: fedd/federation/deter_internal_access.py @ 301e941

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 301e941 was 8dbbdc5, checked in by Ted Faber <faber@…>, 15 years ago

Workaround for pickling errors. This needs a real fix.

  • Property mode set to 100644
File size: 24.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31
32# Make log messages disappear if noone configures a fedd logger
33class nullHandler(logging.Handler):
34    def emit(self, record): pass
35
36fl = logging.getLogger("fedd.access")
37fl.addHandler(nullHandler())
38
39class access:
40    class parse_error(RuntimeError): pass
41
42
43    proxy_RequestAccess= service_caller('RequestAccess')
44    proxy_ReleaseAccess= service_caller('ReleaseAccess')
45
46    @staticmethod
47    def parse_vlans(v, log=None):
48        """
49        Parse a vlan parameter into a set of vlan ids.  Comma separated
50        sequences of vlan ranges are acceptable.
51        """
52        # the vlans can be a single integer, a comma separated list or a
53        # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
54        # 100,300-305,400
55        vset = set()
56        if v:
57            for v in [ x.strip() for x in v.split(",") ]:
58                try:
59                    if v.count("-") == 1:
60                        f, t = v.split("-", 1)
61                        for i in range(int(f), int(t)+1):
62                            vset.add(i)
63                    else:
64                        vset.add(int(v))
65                except ValueError:
66                    if log:
67                        log.warn("Invalid expression in vlan list: %s" % v)
68        return vset
69
70    def __init__(self, config=None, auth=None):
71        """
72        Initializer.  Pulls parameters out of the ConfigParser's access section.
73        """
74
75        # Make sure that the configuration is in place
76        if not config: 
77            raise RunTimeError("No config to dragon_access.access")
78
79        self.allow_proxy = False
80        self.project_priority = config.getboolean("access", "project_priority")
81        self.certdir = config.get("access","certdir")
82        self.create_debug = config.getboolean("access", "create_debug")
83        self.domain = config.get("access", "domain")
84        vlan_str = config.get("access", "vlans")
85        self.vlans = self.parse_vlans(vlan_str)
86
87        self.attrs = { }
88        self.access = { }
89        # State is a dict of dicts indexed by segment fedid that includes the
90        # owners of the segment as fedids (who can manipulate it, key: owners),
91        # the repo dir/user for the allocation (key: user),  Current allocation
92        # log (key: log), and GRI of the reservation once made (key: gri)
93        self.state = { }
94        self.log = logging.getLogger("fedd.access")
95        set_log_level(config, "access", self.log)
96        self.state_lock = Lock()
97
98        if auth: self.auth = auth
99        else:
100            self.log.error(\
101                    "[access]: No authorizer initialized, creating local one.")
102            auth = authorizer()
103
104        tb = config.get('access', 'testbed')
105        if tb: self.testbed = [ t.strip() for t in tb.split(',') ]
106        else: self.testbed = [ ]
107
108        if config.has_option("access", "accessdb"):
109            self.read_access(config.get("access", "accessdb"))
110
111        self.state_filename = config.get("access", "access_state")
112        self.read_state()
113
114        # Keep cert_file and cert_pwd coming from the same place
115        self.cert_file = config.get("access", "cert_file")
116        if self.cert_file:
117            self.sert_pwd = config.get("access", "cert_pw")
118        else:
119            self.cert_file = config.get("globals", "cert_file")
120            self.sert_pwd = config.get("globals", "cert_pw")
121
122        self.trusted_certs = config.get("access", "trusted_certs") or \
123                config.get("globals", "trusted_certs")
124
125        self.call_GetValue= service_caller('GetValue')
126        self.call_SetValue= service_caller('SetValue')
127
128        self.soap_services = {\
129            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
130            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
131            'StartSegment': soap_handler("StartSegment", self.StartSegment),
132            'TerminateSegment': soap_handler("TerminateSegment", 
133                self.TerminateSegment),
134            }
135        self.xmlrpc_services =  {\
136            'RequestAccess': xmlrpc_handler('RequestAccess',
137                self.RequestAccess),
138            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
139                self.ReleaseAccess),
140            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
141            'TerminateSegment': xmlrpc_handler('TerminateSegment',
142                self.TerminateSegment),
143            }
144
145    def read_access(self, config):
146        """
147        Read a configuration file and set internal parameters.
148
149        There are access lines of the
150        form (tb, proj, user) -> user that map the first tuple of
151        names to the user for for access purposes.  Names in the key (left side)
152        can include "<NONE> or <ANY>" to act as wildcards or to require the
153        fields to be empty.  Similarly aproj or auser can be <SAME> or
154        <DYNAMIC> indicating that either the matching key is to be used or a
155        dynamic user or project will be created.  These names can also be
156        federated IDs (fedid's) if prefixed with fedid:.  The user is the repo
157        directory that contains the DRAGON user credentials.
158        Testbed attributes outside the forms above can be given using the
159        format attribute: name value: value.  The name is a single word and the
160        value continues to the end of the line.  Empty lines and lines startin
161        with a # are ignored.
162
163        Parsing errors result in a self.parse_error exception being raised.
164        """
165        lineno=0
166        name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+"
167        fedid_expr = "fedid:[" + string.hexdigits + "]+"
168        key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")"
169
170        attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)',
171                re.IGNORECASE)
172        access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+
173                key_name+'\s*\)\s*->\s*\(('+name_expr +')\s*\)', re.IGNORECASE)
174
175        def parse_name(n):
176            if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):])
177            else: return n
178       
179        def auth_name(n):
180            if isinstance(n, basestring):
181                if n =='<any>' or n =='<none>': return None
182                else: return unicode(n)
183            else:
184                return n
185
186        f = open(config, "r");
187        for line in f:
188            lineno += 1
189            line = line.strip();
190            if len(line) == 0 or line.startswith('#'):
191                continue
192
193            # Extended (attribute: x value: y) attribute line
194            m = attr_re.match(line)
195            if m != None:
196                attr, val = m.group(1,2)
197                self.attrs[attr] = val
198                continue
199
200            # Access line (t, p, u) -> (a) line
201            m = access_re.match(line)
202            if m != None:
203                access_key = tuple([ parse_name(x) for x in m.group(1,2,3)])
204                auth_key = tuple([ auth_name(x) for x in access_key])
205                user_name = auth_name(parse_name(m.group(4)))
206
207                self.access[access_key] = user_name
208                self.auth.set_attribute(auth_key, "access")
209                continue
210
211            # Nothing matched to here: unknown line - raise exception
212            f.close()
213            raise self.parse_error("Unknown statement at line %d of %s" % \
214                    (lineno, config))
215        f.close()
216
217    def write_state(self):
218        if self.state_filename:
219            try:
220                f = open(self.state_filename, 'w')
221                pickle.dump(self.state, f)
222            except IOError, e:
223                self.log.error("Can't write file %s: %s" % \
224                        (self.state_filename, e))
225            except pickle.PicklingError, e:
226                self.log.error("Pickling problem: %s" % e)
227            except TypeError, e:
228                self.log.error("Pickling problem (TypeError): %s" % e)
229
230
231    def read_state(self):
232        """
233        Read a new copy of access state.  Old state is overwritten.
234
235        State format is a simple pickling of the state dictionary.
236        """
237        if self.state_filename:
238            try:
239                f = open(self.state_filename, "r")
240                self.state = pickle.load(f)
241                self.log.debug("[read_state]: Read state from %s" % \
242                        self.state_filename)
243            except IOError, e:
244                self.log.warning(("[read_state]: No saved state: " +\
245                        "Can't open %s: %s") % (self.state_filename, e))
246            except EOFError, e:
247                self.log.warning(("[read_state]: " +\
248                        "Empty or damaged state file: %s:") % \
249                        self.state_filename)
250            except pickle.UnpicklingError, e:
251                self.log.warning(("[read_state]: No saved state: " + \
252                        "Unpickling failed: %s") % e)
253
254            # Add the ownership attributes to the authorizer.  Note that the
255            # indices of the allocation dict are strings, but the attributes are
256            # fedids, so there is a conversion.
257            for k in self.state.keys():
258                for o in self.state[k].get('owners', []):
259                    self.auth.set_attribute(o, fedid(hexstr=k))
260                self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
261                # If the allocation has a vlan assigned, remove it from the
262                # available vlans
263                v = self.state[k].get('vlan', None)
264                if v:
265                    self.vlans.discard(v)
266
267
268
269    def permute_wildcards(self, a, p):
270        """Return a copy of a with various fields wildcarded.
271
272        The bits of p control the wildcards.  A set bit is a wildcard
273        replacement with the lowest bit being user then project then testbed.
274        """
275        if p & 1: user = ["<any>"]
276        else: user = a[2]
277        if p & 2: proj = "<any>"
278        else: proj = a[1]
279        if p & 4: tb = "<any>"
280        else: tb = a[0]
281
282        return (tb, proj, user)
283
284    def find_access(self, search):
285        """
286        Search the access DB for a match on this tuple.  Return the matching
287        user (repo dir).
288       
289        NB, if the initial tuple fails to match we start inserting wildcards in
290        an order determined by self.project_priority.  Try the list of users in
291        order (when wildcarded, there's only one user in the list).
292        """
293        if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7)
294        else: perm = (0, 2, 1, 3, 4, 6, 5, 7)
295
296        for p in perm: 
297            s = self.permute_wildcards(search, p)
298            # s[2] is None on an anonymous, unwildcarded request
299            if s[2] != None:
300                for u in s[2]:
301                    if self.access.has_key((s[0], s[1], u)):
302                        return self.access[(s[0], s[1], u)]
303            else:
304                if self.access.has_key(s):
305                    return self.access[s]
306        return None
307
308    def lookup_access(self, req, fid):
309        """
310        Determine the allowed access for this request.  Return the access and
311        which fields are dynamic.
312
313        The fedid is needed to construct the request
314        """
315        # Search keys
316        tb = None
317        project = None
318        user = None
319        # Return values
320        rp = access_project(None, ())
321        ru = None
322        user_re = re.compile("user:\s(.*)")
323        project_re = re.compile("project:\s(.*)")
324
325        user = [ user_re.findall(x)[0] for x in req.get('credential', []) \
326                if user_re.match(x)]
327        project = [ project_re.findall(x)[0] \
328                for x in req.get('credential', []) \
329                    if project_re.match(x)]
330
331        if len(project) == 1: project = project[0]
332        elif len(project) == 0: project = None
333        else: 
334            raise service_error(service_error.req, 
335                    "More than one project credential")
336
337
338        user_fedids = [ u for u in user if isinstance(u, fedid)]
339
340        # Determine how the caller is representing itself.  If its fedid shows
341        # up as a project or a singleton user, let that stand.  If neither the
342        # usernames nor the project name is a fedid, the caller is a testbed.
343        if project and isinstance(project, fedid):
344            if project == fid:
345                # The caller is the project (which is already in the tuple
346                # passed in to the authorizer)
347                owners = user_fedids
348                owners.append(project)
349            else:
350                raise service_error(service_error.req,
351                        "Project asserting different fedid")
352        else:
353            if fid not in user_fedids:
354                tb = fid
355                owners = user_fedids
356                owners.append(fid)
357            else:
358                if len(fedids) > 1:
359                    raise service_error(service_error.req,
360                            "User asserting different fedid")
361                else:
362                    # Which is a singleton
363                    owners = user_fedids
364        # Confirm authorization
365
366        for u in user:
367            self.log.debug("[lookup_access] Checking access for %s" % \
368                    ((tb, project, u),))
369            if self.auth.check_attribute((tb, project, u), 'access'):
370                self.log.debug("[lookup_access] Access granted")
371                break
372            else:
373                self.log.debug("[lookup_access] Access Denied")
374        else:
375            raise service_error(service_error.access, "Access denied")
376
377        # This maps a valid user to the Emulab projects and users to use
378        found = self.find_access((tb, project, user))
379       
380        if found == None:
381            raise service_error(service_error.access,
382                    "Access denied - cannot map access")
383        return found, owners
384
385    def RequestAccess(self, req, fid):
386        """
387        Handle the access request.  Proxy if not for us.
388
389        Parse out the fields and make the allocations or rejections if for us,
390        otherwise, assuming we're willing to proxy, proxy the request out.
391        """
392
393        # The dance to get into the request body
394        if req.has_key('RequestAccessRequestBody'):
395            req = req['RequestAccessRequestBody']
396        else:
397            raise service_error(service_error.req, "No request!?")
398
399        if req.has_key('destinationTestbed'):
400            dt = unpack_id(req['destinationTestbed'])
401
402        if dt == None or dt in self.testbed:
403            # Request for this fedd
404            found, owners = self.lookup_access(req, fid)
405            # keep track of what's been added
406            allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
407            aid = unicode(allocID)
408
409            self.state_lock.acquire()
410            self.state[aid] = { }
411            self.state[aid]['user'] = found
412            self.state[aid]['owners'] = owners
413            self.state[aid]['vlan'] = None
414            self.write_state()
415            self.state_lock.release()
416            for o in owners:
417                self.auth.set_attribute(o, allocID)
418            self.auth.set_attribute(allocID, allocID)
419
420            try:
421                f = open("%s/%s.pem" % (self.certdir, aid), "w")
422                print >>f, alloc_cert
423                f.close()
424            except IOError, e:
425                raise service_error(service_error.internal, 
426                        "Can't open %s/%s : %s" % (self.certdir, aid, e))
427            return { 'allocID': { 'fedid': allocID } }
428        else:
429            if self.allow_proxy:
430                resp = self.proxy_RequestAccess.call_service(dt, req,
431                            self.cert_file, self.cert_pwd,
432                            self.trusted_certs)
433                if resp.has_key('RequestAccessResponseBody'):
434                    return resp['RequestAccessResponseBody']
435                else:
436                    return None
437            else:
438                raise service_error(service_error.access,
439                        "Access proxying denied")
440
441    def ReleaseAccess(self, req, fid):
442        # The dance to get into the request body
443        if req.has_key('ReleaseAccessRequestBody'):
444            req = req['ReleaseAccessRequestBody']
445        else:
446            raise service_error(service_error.req, "No request!?")
447
448        if req.has_key('destinationTestbed'):
449            dt = unpack_id(req['destinationTestbed'])
450        else:
451            dt = None
452
453        if dt == None or dt in self.testbed:
454            # Local request
455            try:
456                if req['allocID'].has_key('localname'):
457                    auth_attr = aid = req['allocID']['localname']
458                elif req['allocID'].has_key('fedid'):
459                    aid = unicode(req['allocID']['fedid'])
460                    auth_attr = req['allocID']['fedid']
461                else:
462                    raise service_error(service_error.req,
463                            "Only localnames and fedids are understood")
464            except KeyError:
465                raise service_error(service_error.req, "Badly formed request")
466
467            self.log.debug("[access] deallocation requested for %s", aid)
468            if not self.auth.check_attribute(fid, auth_attr):
469                self.log.debug("[access] deallocation denied for %s", aid)
470                raise service_error(service_error.access, "Access Denied")
471
472            self.state_lock.acquire()
473            if self.state.has_key(aid):
474                self.log.debug("Found allocation for %s" %aid)
475                del self.state[aid]
476                self.write_state()
477                self.state_lock.release()
478                # And remove the access cert
479                cf = "%s/%s.pem" % (self.certdir, aid)
480                self.log.debug("Removing %s" % cf)
481                os.remove(cf)
482                return { 'allocID': req['allocID'] } 
483            else:
484                self.state_lock.release()
485                raise service_error(service_error.req, "No such allocation")
486
487        else:
488            if self.allow_proxy:
489                resp = self.proxy_ReleaseAccess.call_service(dt, req,
490                            self.cert_file, self.cert_pwd,
491                            self.trusted_certs)
492                if resp.has_key('ReleaseAccessResponseBody'):
493                    return resp['ReleaseAccessResponseBody']
494                else:
495                    return None
496            else:
497                raise service_error(service_error.access,
498                        "Access proxying denied")
499
500    def extract_parameters(self, top):
501        """
502        DRAGON currently supports a fixed capacity link between two endpoints.
503        Those endpoints may specify a VPN (or list or range) to use.  This
504        extracts the DRAGON endpoints and vpn preferences from the segments (as
505        attributes) and the capacity of the connection as given by the
506        substrate.  The two endpoints VLAN choices are intersected to get set
507        of VLANs that are acceptable (no VLAN requiremnets means any is
508        acceptable).
509        """
510        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
511
512        if len(segments) != 2 or len(top.substrates) != 1:
513            raise service_error(service_error.req,
514                    "Requests to DRAGON must have exactlty two segments " +\
515                            "and one substrate")
516
517        vlans = set()
518        for s in segments:
519            v = s.get_attribute('vlans')
520            vlans &= self.parse_vlans(v)
521
522        if len(vlans) == 0:
523            vlans = None
524
525        sub = top.substrates[0]
526        if sub.capacity:
527            cap = int(sub.capacity.rate / 1000.0)
528            if cap < 1: cap = 1
529        else:
530            cap = 100
531
532        return cap, vlans
533
534    def export_store_info(self, cf, vlan, connInfo):
535        """
536        For the export requests in the connection info, install the peer names
537        at the experiment controller via SetValue calls.
538        """
539
540        for c in connInfo:
541            for p in [ p for p in c.get('parameter', []) \
542                    if p.get('type', '') == 'output']:
543
544                if p.get('name', '') != 'vlan_id':
545                    self.log.error("Unknown export parameter: %s" % \
546                            p.get('name'))
547                    continue
548
549                k = p.get('key', None)
550                surl = p.get('store', None)
551                if surl and k:
552                    value = "%s" % vlan
553                    req = { 'name': k, 'value': value }
554                    print "calling SetValue %s %s" % (surl, req)
555                    self.call_SetValue(surl, req, cf)
556                else:
557                    self.log.error("Bad export request: %s" % p)
558
559    def import_store_info(self, cf, connInfo):
560        """
561        Pull any import parameters in connInfo in.  We translate them either
562        into known member names or fedAddrs.
563        """
564
565        for c in connInfo:
566            for p in [ p for p in c.get('parameter', []) \
567                    if p.get('type', '') == 'input']:
568                name = p.get('name', None)
569                key = p.get('key', None)
570                store = p.get('store', None)
571
572                if name and key and store :
573                    req = { 'name': key, 'wait': True }
574                    r = self.call_GetValue(store, req, cf)
575                    r = r.get('GetValueResponseBody', None)
576                    if r :
577                        if r.get('name', '') == key:
578                            v = r.get('value', None)
579                            if v is not None:
580                                if name == 'peer':
581                                    c['peer'] = v
582                                else:
583                                    if c.has_key('fedAttr'):
584                                        c['fedAttr'].append({
585                                            'attribute': name, 'value': value})
586                                    else:
587                                        c['fedAttr']= [{
588                                            'attribute': name, 'value': value}]
589                            else:
590                                raise service_error(service_error.internal, 
591                                        'None value exported for %s'  % key)
592                        else:
593                            raise service_error(service_error.internal, 
594                                    'Different name returned for %s: %s' \
595                                            % (key, r.get('name','')))
596                    else:
597                        raise service_error(service_error.internal, 
598                            'Badly formatted response: no GetValueResponseBody')
599                else:
600                    raise service_error(service_error.internal, 
601                        'Bad Services missing info for import %s' % c)
602
603
604    def StartSegment(self, req, fid):
605        err = None  # Any service_error generated after tmpdir is created
606        rv = None   # Return value from segment creation
607
608        try:
609            req = req['StartSegmentRequestBody']
610        except KeyError:
611            raise service_error(server_error.req, "Badly formed request")
612
613        auth_attr = req['allocID']['fedid']
614        aid = "%s" % auth_attr
615        attrs = req.get('fedAttr', [])
616        if not self.auth.check_attribute(fid, auth_attr):
617            raise service_error(service_error.access, "Access denied")
618        else:
619            # See if this is a replay of an earlier succeeded StartSegment -
620            # sometimes SSL kills 'em.  If so, replay the response rather than
621            # redoing the allocation.
622            self.state_lock.acquire()
623            retval = self.state[aid].get('started', None)
624            self.state_lock.release()
625            if retval:
626                self.log.warning("Duplicate StartSegment for %s: " % aid + \
627                        "replaying response")
628                return retval
629
630        certfile = "%s/%s.pem" % (self.certdir, aid)
631
632        if req.has_key('segmentdescription') and \
633                req['segmentdescription'].has_key('topdldescription'):
634            topo = \
635                topdl.Topology(**req['segmentdescription']['topdldescription'])
636        else:
637            raise service_error(service_error.req, 
638                    "Request missing segmentdescription'")
639
640        connInfo = req.get('connection', [])
641
642        cap, vlans = self.extract_parameters(topo)
643
644        # No vlans passes in, consider any vlan acceptable
645        if not vlans: 
646            vlans = self.vlans
647
648        avail = self.vlans | vlans
649
650        if len(avail) != 0:
651            vlan_no = avail.pop()
652            self.vlans.discard(vlan_no)
653        else:
654            raise service_error(service_error.federant, "No vlan available")
655
656        self.export_store_info(certfile, vlan_no, connInfo)
657
658
659        rtopo = topo.clone()
660        for s in rtopo.substrates:
661            s.set_attribute('vlan', vlan_no)
662
663        # Grab the log (this is some anal locking, but better safe than
664        # sorry)
665        self.state_lock.acquire()
666        self.state[aid]['vlan'] = vlan_no
667        logv = "Allocated vlan: %d" % vlan_no
668        # It's possible that the StartSegment call gets retried (!).
669        # if the 'started' key is in the allocation, we'll return it rather
670        # than redo the setup.
671        self.state[aid]['started'] = { 
672                'allocID': req['allocID'],
673                'allocationLog': logv,
674                }
675        retval = copy.deepcopy(self.state[aid]['started'])
676        retval['allocID'] = req['allocID']
677        # XXX: this doesn't pickle!!
678        retval['segmentdescription'] = { 'topdldescription': topo.to_dict() }
679        self.write_state()
680        self.state_lock.release()
681
682        return retval
683
684    def TerminateSegment(self, req, fid):
685        try:
686            req = req['TerminateSegmentRequestBody']
687        except KeyError:
688            raise service_error(server_error.req, "Badly formed request")
689
690        auth_attr = req['allocID']['fedid']
691        aid = "%s" % auth_attr
692
693        self.log.debug("Terminate request for %s" %aid)
694        if not self.auth.check_attribute(fid, auth_attr):
695            raise service_error(service_error.access, "Access denied")
696
697        self.state_lock.acquire()
698        if self.state.has_key(aid):
699            vlan_no = self.state[aid].get('vlan', None)
700        else:
701            vlan_no = None
702        self.state_lock.release()
703        self.log.debug("Stop segment for vlan: %s" % vlan_no)
704
705        if not vlan_no:
706            raise service_error(service_error.internal, 
707                    "Can't find assigfned vlan for for %s" % aid)
708   
709        self.vlans.add(vlan_no)
710        self.state_lock.acquire()
711        self.state[aid]['vlan'] = None
712        self.state_lock.release()
713        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.