source: fedd/federation/deter_internal_access.py @ 0b2ca42

axis_example, compt_changes, info-ops, version-3.01, version-3.02
Last change on this file since 0b2ca42 was 1502580, checked in by Ted Faber <faber@…>, 14 years ago

Duh.

Didn't I fix this before???

  • Property mode set to 100644
File size: 11.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31from access import access_base
32
# Keep log messages from leaking out when no one has configured a fedd logger.
class nullHandler(logging.Handler):
    """Logging handler that silently drops every record handed to it."""

    def emit(self, record):
        """Discard record without doing anything."""
        return None


# Attach the do-nothing handler to the module's "fedd.access" logger.
fl = logging.getLogger("fedd.access")
fl.addHandler(nullHandler())
39
40class access(access_base):
41    @staticmethod
42    def parse_vlans(v, log=None):
43        """
44        Parse a vlan parameter into a set of vlan ids.  Comma separated
45        sequences of vlan ranges are acceptable.
46        """
47        # the vlans can be a single integer, a comma separated list or a
48        # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
49        # 100,300-305,400
50        vset = set()
51        if v:
52            for v in [ x.strip() for x in v.split(",") ]:
53                try:
54                    if v.count("-") == 1:
55                        f, t = v.split("-", 1)
56                        for i in range(int(f), int(t)+1):
57                            vset.add(i)
58                    else:
59                        vset.add(int(v))
60                except ValueError:
61                    if log:
62                        log.warn("Invalid expression in vlan list: %s" % v)
63        return vset
64
65    def __init__(self, config=None, auth=None):
66        """
67        Initializer.  Pulls parameters out of the ConfigParser's access section.
68        """
69
70        access_base.__init__(self, config, auth)
71        self.domain = config.get("access", "domain")
72        vlan_str = config.get("access", "vlans")
73        self.vlans = self.parse_vlans(vlan_str)
74
75        self.attrs = { }
76        self.access = { }
77        # State is a dict of dicts indexed by segment fedid that includes the
78        # owners of the segment as fedids (who can manipulate it, key: owners),
79        # the repo dir/user for the allocation (key: user),  Current allocation
80        # log (key: log), and GRI of the reservation once made (key: gri)
81        self.log = logging.getLogger("fedd.access")
82        set_log_level(config, "access", self.log)
83
84        if config.has_option("access", "accessdb"):
85            self.read_access(config.get("access", "accessdb"))
86
87        # Add the ownership attributes to the authorizer.  Note that the
88        # indices of the allocation dict are strings, but the attributes are
89        # fedids, so there is a conversion.
90        self.state_lock.acquire()
91        for k in self.state.keys():
92            for o in self.state[k].get('owners', []):
93                self.auth.set_attribute(o, fedid(hexstr=k))
94            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
95            # If the allocation has a vlan assigned, remove it from the
96            # available vlans
97            v = self.state[k].get('vlan', None)
98            if v:
99                self.vlans.discard(v)
100        self.state_lock.release()
101
102        self.lookup_access = self.lookup_access_base
103
104        self.call_GetValue= service_caller('GetValue')
105        self.call_SetValue= service_caller('SetValue')
106
107        self.soap_services = {\
108            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
109            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
110            'StartSegment': soap_handler("StartSegment", self.StartSegment),
111            'TerminateSegment': soap_handler("TerminateSegment", 
112                self.TerminateSegment),
113            }
114        self.xmlrpc_services =  {\
115            'RequestAccess': xmlrpc_handler('RequestAccess',
116                self.RequestAccess),
117            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
118                self.ReleaseAccess),
119            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
120            'TerminateSegment': xmlrpc_handler('TerminateSegment',
121                self.TerminateSegment),
122            }
123
124    def RequestAccess(self, req, fid):
125        """
126        Handle the access request.  Proxy if not for us.
127
128        Parse out the fields and make the allocations or rejections if for us,
129        otherwise, assuming we're willing to proxy, proxy the request out.
130        """
131
132        # The dance to get into the request body
133        if req.has_key('RequestAccessRequestBody'):
134            req = req['RequestAccessRequestBody']
135        else:
136            raise service_error(service_error.req, "No request!?")
137
138        found, match = self.lookup_access(req, fid)
139        # keep track of what's been added
140        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
141        aid = unicode(allocID)
142
143        self.state_lock.acquire()
144        self.state[aid] = { }
145        self.state[aid]['user'] = found
146        self.state[aid]['owners'] = [ fid ]
147        self.state[aid]['vlan'] = None
148        self.write_state()
149        self.state_lock.release()
150        self.auth.set_attribute(fid, allocID)
151        self.auth.set_attribute(allocID, allocID)
152
153        try:
154            f = open("%s/%s.pem" % (self.certdir, aid), "w")
155            print >>f, alloc_cert
156            f.close()
157        except EnvironmentError, e:
158            raise service_error(service_error.internal, 
159                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
160        return { 'allocID': { 'fedid': allocID } }
161
162    def ReleaseAccess(self, req, fid):
163        # The dance to get into the request body
164        if req.has_key('ReleaseAccessRequestBody'):
165            req = req['ReleaseAccessRequestBody']
166        else:
167            raise service_error(service_error.req, "No request!?")
168
169        # Local request
170        try:
171            if req['allocID'].has_key('localname'):
172                auth_attr = aid = req['allocID']['localname']
173            elif req['allocID'].has_key('fedid'):
174                aid = unicode(req['allocID']['fedid'])
175                auth_attr = req['allocID']['fedid']
176            else:
177                raise service_error(service_error.req,
178                        "Only localnames and fedids are understood")
179        except KeyError:
180            raise service_error(service_error.req, "Badly formed request")
181
182        self.log.debug("[access] deallocation requested for %s", aid)
183        if not self.auth.check_attribute(fid, auth_attr):
184            self.log.debug("[access] deallocation denied for %s", aid)
185            raise service_error(service_error.access, "Access Denied")
186
187        self.state_lock.acquire()
188        if self.state.has_key(aid):
189            self.log.debug("Found allocation for %s" %aid)
190            del self.state[aid]
191            self.write_state()
192            self.state_lock.release()
193            # And remove the access cert
194            cf = "%s/%s.pem" % (self.certdir, aid)
195            self.log.debug("Removing %s" % cf)
196            os.remove(cf)
197            return { 'allocID': req['allocID'] } 
198        else:
199            self.state_lock.release()
200            raise service_error(service_error.req, "No such allocation")
201
202    def extract_parameters(self, top):
203        """
204        DETER's internal networking currently supports a fixed capacity link
205        between two endpoints.  Those endpoints may specify a VPN (or list or
206        range) to use.  This extracts the and vpn preferences from the segments
207        (as attributes) and the capacity of the connection as given by the
208        substrate.  The two endpoints VLAN choices are intersected to get set
209        of VLANs that are acceptable (no VLAN requiremnets means any is
210        acceptable).
211        """
212        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
213
214        if len(segments) != 2 or len(top.substrates) != 1:
215            raise service_error(service_error.req,
216                    "Requests to DRAGON must have exactlty two segments " +\
217                            "and one substrate")
218
219        vlans = set()
220        for s in segments:
221            v = s.get_attribute('vlans')
222            vlans &= self.parse_vlans(v)
223
224        if len(vlans) == 0:
225            vlans = None
226
227        sub = top.substrates[0]
228        if sub.capacity:
229            cap = int(sub.capacity.rate / 1000.0)
230            if cap < 1: cap = 1
231        else:
232            cap = 100
233
234        return cap, vlans
235
236    def export_store_info(self, cf, vlan, connInfo):
237        """
238        For the export requests in the connection info, install the peer names
239        at the experiment controller via SetValue calls.
240        """
241
242        for c in connInfo:
243            for p in [ p for p in c.get('parameter', []) \
244                    if p.get('type', '') == 'output']:
245
246                if p.get('name', '') != 'vlan_id':
247                    self.log.error("Unknown export parameter: %s" % \
248                            p.get('name'))
249                    continue
250
251                k = p.get('key', None)
252                surl = p.get('store', None)
253                if surl and k:
254                    value = "%s" % vlan
255                    req = { 'name': k, 'value': value }
256                    self.call_SetValue(surl, req, cf)
257                else:
258                    self.log.error("Bad export request: %s" % p)
259
260    def StartSegment(self, req, fid):
261        err = None  # Any service_error generated after tmpdir is created
262        rv = None   # Return value from segment creation
263
264        try:
265            req = req['StartSegmentRequestBody']
266            topref = req['segmentdescription']['topdldescription']
267        except KeyError:
268            raise service_error(server_error.req, "Badly formed request")
269
270        auth_attr = req['allocID']['fedid']
271        aid = "%s" % auth_attr
272        attrs = req.get('fedAttr', [])
273        if not self.auth.check_attribute(fid, auth_attr):
274            raise service_error(service_error.access, "Access denied")
275        else:
276            # See if this is a replay of an earlier succeeded StartSegment -
277            # sometimes SSL kills 'em.  If so, replay the response rather than
278            # redoing the allocation.
279            self.state_lock.acquire()
280            retval = self.state[aid].get('started', None)
281            self.state_lock.release()
282            if retval:
283                self.log.warning("Duplicate StartSegment for %s: " % aid + \
284                        "replaying response")
285                return retval
286
287        certfile = "%s/%s.pem" % (self.certdir, aid)
288
289        if topref: topo = topdl.Topology(**topref)
290        else:
291            raise service_error(service_error.req, 
292                    "Request missing segmentdescription'")
293
294        connInfo = req.get('connection', [])
295
296        cap, vlans = self.extract_parameters(topo)
297
298        # No vlans passes in, consider any vlan acceptable
299        if not vlans: 
300            vlans = self.vlans
301
302        avail = self.vlans | vlans
303
304        if len(avail) != 0:
305            vlan_no = avail.pop()
306            self.vlans.discard(vlan_no)
307        else:
308            raise service_error(service_error.federant, "No vlan available")
309
310        self.export_store_info(certfile, vlan_no, connInfo)
311
312
313        # This annotation isn't strictly necessary, but may help in debugging
314        rtopo = topo.clone()
315        for s in rtopo.substrates:
316            s.set_attribute('vlan', vlan_no)
317
318        # Grab the log (this is some anal locking, but better safe than
319        # sorry)
320        self.state_lock.acquire()
321        self.state[aid]['vlan'] = vlan_no
322        logv = "Allocated vlan: %d" % vlan_no
323        # It's possible that the StartSegment call gets retried (!).
324        # if the 'started' key is in the allocation, we'll return it rather
325        # than redo the setup.
326        self.state[aid]['started'] = { 
327                'allocID': req['allocID'],
328                'allocationLog': logv,
329                'segmentdescription': { 'topdldescription': rtopo.to_dict() }
330                }
331        retval = copy.deepcopy(self.state[aid]['started'])
332        self.write_state()
333        self.state_lock.release()
334
335        return retval
336
337    def TerminateSegment(self, req, fid):
338        try:
339            req = req['TerminateSegmentRequestBody']
340        except KeyError:
341            raise service_error(server_error.req, "Badly formed request")
342
343        auth_attr = req['allocID']['fedid']
344        aid = "%s" % auth_attr
345
346        self.log.debug("Terminate request for %s" %aid)
347        if not self.auth.check_attribute(fid, auth_attr):
348            raise service_error(service_error.access, "Access denied")
349
350        self.state_lock.acquire()
351        if self.state.has_key(aid):
352            vlan_no = self.state[aid].get('vlan', None)
353        else:
354            vlan_no = None
355        self.state_lock.release()
356        self.log.debug("Stop segment for vlan: %s" % vlan_no)
357
358        if not vlan_no:
359            raise service_error(service_error.internal, 
360                    "Can't find assigfned vlan for for %s" % aid)
361   
362        self.vlans.add(vlan_no)
363        self.state_lock.acquire()
364        self.state[aid]['vlan'] = None
365        self.state_lock.release()
366        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.