source: starbed_plugin/starbed.py @ d070d9f

Last change on this file since d070d9f was d070d9f, checked in by ABDULLA ALWABEL <abdullaalwabel@…>, 12 years ago

Finalize cleaning!

  • Property mode set to 100644
File size: 13.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4import subprocess
5import signal
6import re
7import string
8import copy
9import pickle
10import logging
11import random
12import tempfile
13from federation.util import *
14from deter import fedid,generate_fedid
15from federation.authorizer import authorizer, abac_authorizer
16from federation.service_error import service_error
17from federation.remote_service import xmlrpc_handler, soap_handler, service_caller
18
19from deter import topdl
20from federation.access import access_base
21from federation.proof import proof
22from topdltok import klanguage
23
24# Make log messages disappear if noone configures a fedd logger.  This is
25# something of an incantation, but basically it creates a logger object
26# registered to fedd.access if no other module above us has.  It's an extra
27# belt for the suspenders.
28class nullHandler(logging.Handler):
29    def emit(self, record): pass
30
31fl = logging.getLogger("fedd.access")
32fl.addHandler(nullHandler())
33
34
35# The plug-in itself.
36class access(access_base):
37
38    @staticmethod 
39    def access_tuple(str):
40        """
41        Convert a string (user,passwd,project) into an access_project.
42        It returns a tuple of the form (user,passwd,project).
43        """
44        str = str.strip()
45        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
46                user,passwd,project = str[1:-1].split(',')
47                return (user.strip(), passwd.strip(), project.strip())
48        else:
49                raise self.parse_error(
50                    'Bad mapping (unbalanced parens or malformed string is should be of the form (user,passwd,project)')
51       
52    def __init__(self, config=None, auth=None):
53        self.rmanager = config.get('definition','rmanager')
54        self.smanager = config.get('definition','smanager')
55        self.pmanager = config.get('definition','pmanager')
56        self.fncp = config.get('definition','fncp')
57        self.tftpdman =config.get('definition','tftpdman')
58        self.wolagent = config.get('definition','wolagent')
59        self.encd = config.get('definition','encd')
60
61        access_base.__init__(self, config, auth)
62
63        #Available ports for kuma!
64        ports = config.get('globals','freeports')
65        self.domain = config.get('globals','domain')
66        try:
67                if not ports or ports.count('-') != 1:
68                        raise TypeError("bad ports") 
69                else:
70                        a,b = ports.split('-')
71                        a = int(a)
72                        b = int(b)
73                        if a < b: 
74                                start = a
75                                end = b
76                        else: 
77                                start = b
78                                end = a
79                        if abs(start-end) < 2:
80                                raise TypeError("Bad ports")
81                        self.available_ports = set(range(start,end) )
82        except TypeError as e:
83                self.available_ports = set(range(3456,3458))
84                self.log.warning("Setting default freeports due to missing or malformed configuration. %s" % (e))
85        #Get reserved ports saved in state and remove them from the available set!
86        self.state_lock.acquire()
87        ###### work ####
88        for k in self.state.keys():
89                #remove any reserved ENCD or ESQP ports from available ports so kuma wouldn't pick an occupied port!
90                if 'ENCD' in self.state[k]: 
91                        self.available_ports.discard(self.state[k]['ENCD'])
92                elif 'ESQP' in self.state[k]:
93                        self.available_ports.discard(self.state[k]['ESQP'])
94        self.state_lock.release()
95       
96
97        # authorization information
98        self.auth_type = config.get('access', 'auth_type') \
99                or 'abac'
100        self.auth_dir = config.get('access', 'auth_dir')
101        accessdb = config.get("access", "accessdb")
102        if self.auth_type == 'abac':
103            #  Load the current authorization state
104            self.auth = abac_authorizer(load=self.auth_dir)
105            self.access = [ ]
106            if accessdb:
107                try:
108                    self.read_access(accessdb, self.access_tuple)
109                except EnvironmentError, e:
110                    self.log.error("Cannot read %s: %s" % \
111                            (config.get("access", "accessdb"), e))
112                    raise e
113        else:
114            raise service_error(service_error.internal, 
115                    "Unknown auth_type: %s" % self.auth_type)
116
117        #if self.auth_type == 'starbed':
118        #       self.log.debug("starbed authtication methond");
119        #else:
120        #    raise service_error(service_error.internal,
121        #           "Unknown auth_type: %s" % self.auth_type)
122
123        #TO DO: clean state !
124        # These dictionaries register the plug-in's local routines for handline
125        # these four messages with the server code above.  There's a version
126        # for SOAP and XMLRPC, depending on which interfaces the plugin
127        # supports.  There's rarely a technical reason not to support one or
128        # the other - the plugin code almost never deals with the transport -
129        # but if a plug-in writer wanted to disable XMLRPC, they could leave
130        # the self.xmlrpc_services dictionary empty.
131        self.soap_services = {\
132            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
133            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
134            'StartSegment': soap_handler("StartSegment", self.StartSegment),
135            'TerminateSegment': soap_handler("TerminateSegment", 
136                self.TerminateSegment),
137            }
138        self.xmlrpc_services =  {\
139            'RequestAccess': xmlrpc_handler('RequestAccess',
140                self.RequestAccess),
141            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
142                self.ReleaseAccess),
143            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
144            'TerminateSegment': xmlrpc_handler('TerminateSegment',
145                self.TerminateSegment),
146            }
147        self.log.debug("Starbed AC started!")
148    def RequestAccess(self, req, fid):
149        self.log.info("RequestAccess called by %s" % fid)
150        # The dance to get into the request body
151        if req.has_key('RequestAccessRequestBody'):
152            req = req['RequestAccessRequestBody']
153        else:
154            raise service_error(service_error.req, "No request!?")
155        if self.auth.import_credentials(
156                data_list=req.get('abac_credential', [])):
157            self.auth.save()
158        else:
159            self.log.debug('failed to import incoming credentials')
160        if self.auth_type == 'abac':
161            found, owners, proof = self.lookup_access(req, fid)
162        else:
163            raise service_error(service_error.internal, 
164                    'Unknown auth_type: %s' % self.auth_type)
165        # keep track of what's been added
166        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
167        aid = unicode(allocID)
168        self.state_lock.acquire()
169        self.state[aid] = { }
170        self.state[aid]['user'] = found[0]
171        self.state[aid]['passwd'] = found[1]
172        self.state[aid]['project'] = found[2]
173        self.state[aid]['owners'] = owners
174        self.append_allocation_authorization(aid, 
175                ((fid, allocID), (allocID, allocID)))
176        self.write_state()
177        self.state_lock.release() 
178       
179        try:
180            f = open("%s/%s.pem" % (self.certdir, aid), "w")
181            print >>f, alloc_cert
182            f.close()
183        except EnvironmentError, e:
184            self.log.info("RequestAccess failed for by %s: internal error" \
185                    % fid)
186            raise service_error(service_error.internal, 
187                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
188        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
189        return { 'allocID': { 'fedid': allocID }, 'proof': proof.to_dict() }
190    def ReleaseAccess(self, req, fid):
191        self.log.info("ReleaseAccess called by %s" % fid)
192        if req.has_key('ReleaseAccessRequestBody'):
193                req = req['ReleaseAccessRequestBody']
194        else:
195                raise service_error(service_error.req, "No request!?")
196        try:
197                if req['allocID'].has_key('localname'):
198                        auth_attr = aid = req['allocID']['localname']
199                elif req['allocID'].has_key('fedid'):
200                        aid = unicode(req['allocID']['fedid'])
201                        auth_attr = req['allocID']['fedid']
202                else:
203                        raise service_error(service_error.req,
204                                "Only localnames and fedids are understood")
205        except KeyError:
206                raise service_error(service_error.req, "Badly formed request")
207       
208        self.log.debug("[access] deallocation requested for %s by %s" %  (aid, fid))
209        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
210                        with_proof=True)
211       
212        if not access_ok:
213                self.log.debug("[access] deallocation denied for %s", aid)
214                raise service_error(service_error.access, "Access Denied")
215       
216        self.state_lock.acquire()
217        if aid in self.state:
218                self.log.debug("Found allocation for %s" %aid)
219                self.clear_allocation_authorization(aid, state_attr='state')
220                del self.state[aid]
221                self.write_state()
222                self.state_lock.release()
223                # Remove the access cert
224                cf = "%s/%s.pem" % (self.certdir, aid)
225                self.log.debug("Removing %s" % cf)
226                os.remove(cf)
227                self.log.info("ReleaseAccess succeeded for %s" % fid)
228                return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
229        else:
230                self.state_lock.release()
231                raise service_error(service_error.req, "No such allocation")
232
233    def StartSegment(self, req, fid):
234        #self.log.debug("data = %s" %(req));
235        self.log.debug('StartSegment called by %s' % (fid) );
236        try:
237            req = req['StartSegmentRequestBody']
238            self.log.debug(req)
239            # Get the request topology.  If not present, a KeyError is thrown.
240            topref = req['segmentdescription']['topdldescription']
241            #self.log.debug("topref = %s " % (topref));
242            # The fedid of the allocation we're attaching resources to
243            auth_attr = req['allocID']['fedid']
244        except KeyError:
245            raise service_error(server_error.req, "Badly formed request")
246
247        # String version of the allocation ID for keying
248        aid = "%s" % auth_attr
249        # Authorization check
250        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
251                with_proof=True)
252        #self.log.debug("lllllacc = %s, proffe= %s" %( access_ok,proof.to_dict()))
253        if not access_ok:
254            raise service_error(service_error.access, "Access denied", 
255                    proof=proof)
256        else:
257            # See if this is a replay of an earlier succeeded StartSegment -
258            # sometimes SSL kills 'em.  If so, replay the response rather than
259            # redoing the allocation.
260            self.state_lock.acquire()
261            #retval = self.state[aid].get('started', None)
262            #CAUSOION
263            retval = 0
264            self.state_lock.release()
265            if retval:
266                self.log.warning(
267                        "[StartSegment] Duplicate StartSegment for %s: " \
268                                % aid + \
269                        "replaying response")
270                return retval
271
272        certfile = "%s/%s.pem" % (self.certdir, aid)
273
274        # Convert the topology into topdl data structures.  Again, the
275        # skeletion doesn't do anything with it, but this is how one parses a
276        # topology request.
277        if topref: topo = topdl.Topology(**topref)
278        else:
279            raise service_error(service_error.req, 
280                    "Request missing segmentdescription'")
281        #rmanager = "192.168.1.10:1234"
282        #smanager = "192.168.1.10:1240"
283        #pmanager = "192.168.1.10:1242"
284        #fncp = "192.168.1.10"
285        #tftpdman="192.168.1.10"
286        #wolagent="192.168.1.10:5959:192.168.0.0/16"
287        #encd="192.168.1.21"
288        #user = "alwabel"
289        #project = "lace"
290        if len(self.available_ports) > 1:
291                self.state_lock.acquire()
292                #Pick two ports for ENCD and  ESQP
293                ENCD = random.choice([ i for i in self.available_ports])
294                ESQP = random.choice([ i for i in self.available_ports])
295                self.available_ports.discard(ENCD)
296                self.available_ports.discard(ESQP)
297                self.state[aid]['ENCD'] = ENCD
298                self.state[aid]['ESQP'] = ESQP
299                self.write_state()
300                self.state_lock.release()
301        else:
302                self.log.debug("[StartSegment] There is no enough ports for kuma!")
303                raise service_error(service_error.federant, "No available Ports for Kuma")
304        k = klanguage(self.rmanager,self.smanager,self.pmanager,self.fncp,self.state[aid]['user'],self.state[aid]['project'],self.tftpdman,self.wolagent,self.encd)
305        k.k_from_topology(topo)
306        level, kname = tempfile.mkstemp()
307        k.to_file(kname)
308        #proc = subprocess.Popen(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)])
309        #pid = proc.pid
310        pid = 100000
311        self.log.debug(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)])
312        #os.unlink(kname)
313        # The attributes of the request.  Not used by this plug-in, but that's
314        # where they are.
315        attrs = req.get('fedAttr', [])
316
317        # Gather connection information.  Used to send messages to those
318        # waiting.
319        connInfo = req.get('connection', [])
320
321        self.state_lock.acquire()
322        # It's possible that the StartSegment call gets retried (!).
323        # if the 'started' key is in the allocation, we'll return it rather
324        # than redo the setup.  The integer allocation was saved when we made
325        # it.
326        self.state[aid]['pid'] = pid
327        self.state[aid]['descfile'] = kname
328        self.state[aid]['started'] = { 
329                'allocID': req['allocID'],
330                'allocationLog': "Allocatation complete",
331                'segmentdescription': { 'topdldescription': topo.to_dict() },
332                #'proof' :  proof("me", "faber","attr",[]).to_dict(),
333                'proof': proof.to_dict(),
334                #'pid' : pid
335                'fedAttr': [
336                        { 'attribute': 'domain', 'value': self.domain } , 
337                        {'attribute': 'dragon', 'value' : '128.9.168.133'},
338                           ]
339                }
340        retval = copy.deepcopy(self.state[aid]['started'])
341        self.write_state()
342        self.state_lock.release()
343        return retval
344
345    def TerminateSegment(self, req, fid):
346        self.log.debug("Terminate Segment");
347        # Gather the same access information as for Start Segment
348        try:
349            req = req['TerminateSegmentRequestBody']
350        except KeyError:
351            raise service_error(server_error.req, "Badly formed request")
352
353        auth_attr = req['allocID']['fedid']
354        aid = "%s" % auth_attr
355
356        self.log.debug("Terminate request for %s" %aid)
357        # Check authorization
358        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
359                with_proof=True)
360        if not access_ok:
361            raise service_error(service_error.access, "Access denied", 
362                    proof=proof)
363
364        # Authorized: remove the integer from the allocation.  A more complex
365        # plug in would interface with the underlying facility to turn off the
366        # experiment here.
367        self.state_lock.acquire()
368        if aid in self.state:
369            pid = self.state[aid].get('pid', None)
370            if pid:
371                try:
372                        self.log.debug("kill process %s " % (pid))
373                        os.kill(int(pid), signal.SIGTERM)
374                        #os.kill(int(pid),9)
375                        os.wait()
376                except OSError as ex:
377                        self.log.warning("Cannot kill process %s " % (pid))
378            descfile = self.state[aid].get('descfile',None)
379            if descfile:
380                #os.unlink(descfile)
381                del self.state[aid]['descfile']
382            if 'ENCD' in self.state[aid]:
383                del self.state[aid]['ENCD']
384            if 'ESQP' in self.state[aid]:
385                del self.state[aid]['ESQP']
386            del self.state[aid]['started']
387            self.write_state()
388        self.state_lock.release()
389   
390        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
391    def InfoSegment(self, req, fid):
392        self.log.info("InfoSegment called by %s" % fid)
393
Note: See TracBrowser for help on using the repository browser.