source: fedd/federation/deter_internal_access.py @ fa4a4a8

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since fa4a4a8 was fa4a4a8, checked in by Ted Faber <faber@…>, 14 years ago

Drieve from access. Lots of redundant code excised.

  • Property mode set to 100644
File size: 11.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import re
5import string
6import copy
7import pickle
8import logging
9import time
10
11from threading import Thread, Lock
12from subprocess import Popen, call, PIPE, STDOUT
13
14from util import *
15from allocate_project import allocate_project_local, allocate_project_remote
16from access_project import access_project
17from fedid import fedid, generate_fedid
18from authorizer import authorizer
19from service_error import service_error
20from remote_service import xmlrpc_handler, soap_handler, service_caller
21
22import httplib
23import tempfile
24from urlparse import urlparse
25
26import topdl
27import list_log
28import proxy_emulab_segment
29import local_emulab_segment
30
31from access import access_base
32
33# Make log messages disappear if noone configures a fedd logger
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.access")
38fl.addHandler(nullHandler())
39
40class access(access_base):
41    @staticmethod
42    def parse_vlans(v, log=None):
43        """
44        Parse a vlan parameter into a set of vlan ids.  Comma separated
45        sequences of vlan ranges are acceptable.
46        """
47        # the vlans can be a single integer, a comma separated list or a
48        # comma separated lists of dashed ranges.  E.g 100 or 100,300 or
49        # 100,300-305,400
50        vset = set()
51        if v:
52            for v in [ x.strip() for x in v.split(",") ]:
53                try:
54                    if v.count("-") == 1:
55                        f, t = v.split("-", 1)
56                        for i in range(int(f), int(t)+1):
57                            vset.add(i)
58                    else:
59                        vset.add(int(v))
60                except ValueError:
61                    if log:
62                        log.warn("Invalid expression in vlan list: %s" % v)
63        return vset
64
65    def __init__(self, config=None, auth=None):
66        """
67        Initializer.  Pulls parameters out of the ConfigParser's access section.
68        """
69
70        access_base.__init__(self, config, auth)
71        self.domain = config.get("access", "domain")
72        vlan_str = config.get("access", "vlans")
73        self.vlans = self.parse_vlans(vlan_str)
74
75        self.attrs = { }
76        self.access = { }
77        # State is a dict of dicts indexed by segment fedid that includes the
78        # owners of the segment as fedids (who can manipulate it, key: owners),
79        # the repo dir/user for the allocation (key: user),  Current allocation
80        # log (key: log), and GRI of the reservation once made (key: gri)
81        self.state = { }
82        self.log = logging.getLogger("fedd.access")
83        set_log_level(config, "access", self.log)
84
85        if config.has_option("access", "accessdb"):
86            self.read_access(config.get("access", "accessdb"))
87
88        # Add the ownership attributes to the authorizer.  Note that the
89        # indices of the allocation dict are strings, but the attributes are
90        # fedids, so there is a conversion.
91        self.state_lock.acquire()
92        for k in self.state.keys():
93            for o in self.state[k].get('owners', []):
94                self.auth.set_attribute(o, fedid(hexstr=k))
95            self.auth.set_attribute(fedid(hexstr=k),fedid(hexstr=k))
96            # If the allocation has a vlan assigned, remove it from the
97            # available vlans
98            v = self.state[k].get('vlan', None)
99            if v:
100                self.vlans.discard(v)
101        self.state_lock.release()
102
103        self.lookup_access = self.lookup_access_base
104
105        self.call_GetValue= service_caller('GetValue')
106        self.call_SetValue= service_caller('SetValue')
107
108        self.soap_services = {\
109            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
110            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
111            'StartSegment': soap_handler("StartSegment", self.StartSegment),
112            'TerminateSegment': soap_handler("TerminateSegment", 
113                self.TerminateSegment),
114            }
115        self.xmlrpc_services =  {\
116            'RequestAccess': xmlrpc_handler('RequestAccess',
117                self.RequestAccess),
118            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
119                self.ReleaseAccess),
120            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
121            'TerminateSegment': xmlrpc_handler('TerminateSegment',
122                self.TerminateSegment),
123            }
124
125    def RequestAccess(self, req, fid):
126        """
127        Handle the access request.  Proxy if not for us.
128
129        Parse out the fields and make the allocations or rejections if for us,
130        otherwise, assuming we're willing to proxy, proxy the request out.
131        """
132
133        # The dance to get into the request body
134        if req.has_key('RequestAccessRequestBody'):
135            req = req['RequestAccessRequestBody']
136        else:
137            raise service_error(service_error.req, "No request!?")
138
139        found, match = self.lookup_access(req, fid)
140        # keep track of what's been added
141        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
142        aid = unicode(allocID)
143
144        self.state_lock.acquire()
145        self.state[aid] = { }
146        self.state[aid]['user'] = found
147        self.state[aid]['owners'] = [ fid ]
148        self.state[aid]['vlan'] = None
149        self.write_state()
150        self.state_lock.release()
151        self.auth.set_attribute(fid, allocID)
152        self.auth.set_attribute(allocID, allocID)
153
154        try:
155            f = open("%s/%s.pem" % (self.certdir, aid), "w")
156            print >>f, alloc_cert
157            f.close()
158        except EnvironmentError, e:
159            raise service_error(service_error.internal, 
160                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
161        return { 'allocID': { 'fedid': allocID } }
162
163    def ReleaseAccess(self, req, fid):
164        # The dance to get into the request body
165        if req.has_key('ReleaseAccessRequestBody'):
166            req = req['ReleaseAccessRequestBody']
167        else:
168            raise service_error(service_error.req, "No request!?")
169
170        # Local request
171        try:
172            if req['allocID'].has_key('localname'):
173                auth_attr = aid = req['allocID']['localname']
174            elif req['allocID'].has_key('fedid'):
175                aid = unicode(req['allocID']['fedid'])
176                auth_attr = req['allocID']['fedid']
177            else:
178                raise service_error(service_error.req,
179                        "Only localnames and fedids are understood")
180        except KeyError:
181            raise service_error(service_error.req, "Badly formed request")
182
183        self.log.debug("[access] deallocation requested for %s", aid)
184        if not self.auth.check_attribute(fid, auth_attr):
185            self.log.debug("[access] deallocation denied for %s", aid)
186            raise service_error(service_error.access, "Access Denied")
187
188        self.state_lock.acquire()
189        if self.state.has_key(aid):
190            self.log.debug("Found allocation for %s" %aid)
191            del self.state[aid]
192            self.write_state()
193            self.state_lock.release()
194            # And remove the access cert
195            cf = "%s/%s.pem" % (self.certdir, aid)
196            self.log.debug("Removing %s" % cf)
197            os.remove(cf)
198            return { 'allocID': req['allocID'] } 
199        else:
200            self.state_lock.release()
201            raise service_error(service_error.req, "No such allocation")
202
203    def extract_parameters(self, top):
204        """
205        DETER's internal networking currently supports a fixed capacity link
206        between two endpoints.  Those endpoints may specify a VPN (or list or
207        range) to use.  This extracts the and vpn preferences from the segments
208        (as attributes) and the capacity of the connection as given by the
209        substrate.  The two endpoints VLAN choices are intersected to get set
210        of VLANs that are acceptable (no VLAN requiremnets means any is
211        acceptable).
212        """
213        segments = filter(lambda x: isinstance(x, topdl.Segment), top.elements)
214
215        if len(segments) != 2 or len(top.substrates) != 1:
216            raise service_error(service_error.req,
217                    "Requests to DRAGON must have exactlty two segments " +\
218                            "and one substrate")
219
220        vlans = set()
221        for s in segments:
222            v = s.get_attribute('vlans')
223            vlans &= self.parse_vlans(v)
224
225        if len(vlans) == 0:
226            vlans = None
227
228        sub = top.substrates[0]
229        if sub.capacity:
230            cap = int(sub.capacity.rate / 1000.0)
231            if cap < 1: cap = 1
232        else:
233            cap = 100
234
235        return cap, vlans
236
237    def export_store_info(self, cf, vlan, connInfo):
238        """
239        For the export requests in the connection info, install the peer names
240        at the experiment controller via SetValue calls.
241        """
242
243        for c in connInfo:
244            for p in [ p for p in c.get('parameter', []) \
245                    if p.get('type', '') == 'output']:
246
247                if p.get('name', '') != 'vlan_id':
248                    self.log.error("Unknown export parameter: %s" % \
249                            p.get('name'))
250                    continue
251
252                k = p.get('key', None)
253                surl = p.get('store', None)
254                if surl and k:
255                    value = "%s" % vlan
256                    req = { 'name': k, 'value': value }
257                    self.call_SetValue(surl, req, cf)
258                else:
259                    self.log.error("Bad export request: %s" % p)
260
261    def StartSegment(self, req, fid):
262        err = None  # Any service_error generated after tmpdir is created
263        rv = None   # Return value from segment creation
264
265        try:
266            req = req['StartSegmentRequestBody']
267            topref = req['segmentdescription']['topdldescription']
268        except KeyError:
269            raise service_error(server_error.req, "Badly formed request")
270
271        auth_attr = req['allocID']['fedid']
272        aid = "%s" % auth_attr
273        attrs = req.get('fedAttr', [])
274        if not self.auth.check_attribute(fid, auth_attr):
275            raise service_error(service_error.access, "Access denied")
276        else:
277            # See if this is a replay of an earlier succeeded StartSegment -
278            # sometimes SSL kills 'em.  If so, replay the response rather than
279            # redoing the allocation.
280            self.state_lock.acquire()
281            retval = self.state[aid].get('started', None)
282            self.state_lock.release()
283            if retval:
284                self.log.warning("Duplicate StartSegment for %s: " % aid + \
285                        "replaying response")
286                return retval
287
288        certfile = "%s/%s.pem" % (self.certdir, aid)
289
290        if topref: topo = topdl.Topology(**topref)
291        else:
292            raise service_error(service_error.req, 
293                    "Request missing segmentdescription'")
294
295        connInfo = req.get('connection', [])
296
297        cap, vlans = self.extract_parameters(topo)
298
299        # No vlans passes in, consider any vlan acceptable
300        if not vlans: 
301            vlans = self.vlans
302
303        avail = self.vlans | vlans
304
305        if len(avail) != 0:
306            vlan_no = avail.pop()
307            self.vlans.discard(vlan_no)
308        else:
309            raise service_error(service_error.federant, "No vlan available")
310
311        self.export_store_info(certfile, vlan_no, connInfo)
312
313
314        # This annotation isn't strictly necessary, but may help in debugging
315        rtopo = topo.clone()
316        for s in rtopo.substrates:
317            s.set_attribute('vlan', vlan_no)
318
319        # Grab the log (this is some anal locking, but better safe than
320        # sorry)
321        self.state_lock.acquire()
322        self.state[aid]['vlan'] = vlan_no
323        logv = "Allocated vlan: %d" % vlan_no
324        # It's possible that the StartSegment call gets retried (!).
325        # if the 'started' key is in the allocation, we'll return it rather
326        # than redo the setup.
327        self.state[aid]['started'] = { 
328                'allocID': req['allocID'],
329                'allocationLog': logv,
330                'segmentdescription': { 'topdldescription': rtopo.to_dict() }
331                }
332        retval = copy.deepcopy(self.state[aid]['started'])
333        self.write_state()
334        self.state_lock.release()
335
336        return retval
337
338    def TerminateSegment(self, req, fid):
339        try:
340            req = req['TerminateSegmentRequestBody']
341        except KeyError:
342            raise service_error(server_error.req, "Badly formed request")
343
344        auth_attr = req['allocID']['fedid']
345        aid = "%s" % auth_attr
346
347        self.log.debug("Terminate request for %s" %aid)
348        if not self.auth.check_attribute(fid, auth_attr):
349            raise service_error(service_error.access, "Access denied")
350
351        self.state_lock.acquire()
352        if self.state.has_key(aid):
353            vlan_no = self.state[aid].get('vlan', None)
354        else:
355            vlan_no = None
356        self.state_lock.release()
357        self.log.debug("Stop segment for vlan: %s" % vlan_no)
358
359        if not vlan_no:
360            raise service_error(service_error.internal, 
361                    "Can't find assigfned vlan for for %s" % aid)
362   
363        self.vlans.add(vlan_no)
364        self.state_lock.acquire()
365        self.state[aid]['vlan'] = None
366        self.state_lock.release()
367        return { 'allocID': req['allocID'] }
Note: See TracBrowser for help on using the repository browser.