source: starbed_plugin/starbed.py @ 2f45140

Last change on this file since 2f45140 was 2f45140, checked in by ABDULLA ALWABEL <abdullaalwabel@…>, 12 years ago

Handeling ssh key properly and portal connection

  • Property mode set to 100644
File size: 20.8 KB
Line 
1#!/usr/bin/python
2
3import os,sys
4#temp
5import traceback 
6#
7import shutil
8import subprocess
9import signal
10import re
11import string
12import copy
13import pickle
14import logging
15import random
16import tempfile
17from federation.util import *
18from deter import fedid,generate_fedid
19from federation.authorizer import authorizer, abac_authorizer
20from federation.service_error import service_error
21from federation.remote_service import xmlrpc_handler, soap_handler, service_caller
22
23from deter import topdl
24from federation.access import access_base
25from federation.proof import proof
26from topdltok import klanguage
27from peer_setup import Peer
28# Make log messages disappear if noone configures a fedd logger.  This is
29# something of an incantation, but basically it creates a logger object
30# registered to fedd.access if no other module above us has.  It's an extra
31# belt for the suspenders.
32class nullHandler(logging.Handler):
33    def emit(self, record): pass
34
35fl = logging.getLogger("fedd.access")
36fl.addHandler(nullHandler())
37
38
39# The plug-in itself.
40class access(access_base):
41
42    @staticmethod 
43    def access_tuple(str):
44        """
45        Convert a string (user,passwd,project) into an access_project.
46        It returns a tuple of the form (user,passwd,project).
47        """
48        str = str.strip()
49        if str.startswith('(') and str.endswith(')') and str.count(',') == 2:
50                user,passwd,project = str[1:-1].split(',')
51                return (user.strip(), passwd.strip(), project.strip())
52        else:
53                raise self.parse_error(
54                    'Bad mapping (unbalanced parens or malformed string is should be of the form (user,passwd,project)')
55       
56    def __init__(self, config=None, auth=None):
57        self.rmanager = config.get('definition','rmanager')
58        self.smanager = config.get('definition','smanager')
59        self.pmanager = config.get('definition','pmanager')
60        self.fncp = config.get('definition','fncp')
61        self.tftpdman =config.get('definition','tftpdman')
62        self.wolagent = config.get('definition','wolagent')
63        self.encd = config.get('definition','encd')
64
65        access_base.__init__(self, config, auth)
66
67        self.ssh_port = config.get("access","ssh_port") or "22"
68        self.domain = config.get('globals','domain')
69        #Available ports for kuma!
70        ports = config.get('globals','freeports')
71        try:
72                if not ports or ports.count('-') != 1:
73                        raise TypeError("bad ports") 
74                else:
75                        a,b = ports.split('-')
76                        a = int(a)
77                        b = int(b)
78                        if a < b: 
79                                start = a
80                                end = b
81                        else: 
82                                start = b
83                                end = a
84                        if abs(start-end) < 2:
85                                raise TypeError("Bad ports")
86                        self.available_ports = set(range(start,end) )
87        except TypeError as e:
88                self.available_ports = set(range(3456,3458))
89                self.log.warning("Setting default freeports due to missing or malformed configuration. %s" % (e))
90        #Available internal and external interfaces for the commander!
91        internal_iface = config.get('globals','internal_interfaces')
92        external_iface = config.get('globals','external_interfaces')
93        if internal_iface:
94                try:
95                        self.iiface = internal_iface.split(',')
96                except BaseException as e:
97                        self.log.warning('Could not retrieve internal interfaces')
98        else:
99                self.log.warning("No internal interfaces specificed in the configration!")
100        if external_iface:
101                try:
102                        self.eiface = external_iface.split(',')
103                except BaseException as e:
104                        selflog.warning('Could not retrive external interfaces')
105        else:
106                self.log.warning("No external interfaces specificed in the configration!")
107
108        #Get reserved ports saved in state and remove them from the available set!
109        self.state_lock.acquire()
110        ###### work ####
111        for k in self.state.keys():
112                #remove any reserved ENCD or ESQP ports from available ports so kuma wouldn't pick an occupied port!
113                if 'ENCD' in self.state[k]: 
114                        self.available_ports.discard(self.state[k]['ENCD'])
115                elif 'ESQP' in self.state[k]:
116                        self.available_ports.discard(self.state[k]['ESQP'])
117                #Get reserved interfaces in state and remove them from the available set!
118                elif 'iiface' in self.state[k]:
119                        self.iiface.remove(self.state[k]['iiface'] )
120                elif 'eiface' in self.state[k]:
121                        self.eiface.remove(self.state[k]['eiface'] )
122        self.state_lock.release()
123       
124
125        # authorization information
126        self.auth_type = config.get('access', 'auth_type') \
127                or 'abac'
128        self.auth_dir = config.get('access', 'auth_dir')
129        accessdb = config.get("access", "accessdb")
130        if self.auth_type == 'abac':
131            #  Load the current authorization state
132            self.auth = abac_authorizer(load=self.auth_dir)
133            self.access = [ ]
134            if accessdb:
135                try:
136                    self.read_access(accessdb, self.access_tuple)
137                except EnvironmentError, e:
138                    self.log.error("Cannot read %s: %s" % \
139                            (config.get("access", "accessdb"), e))
140                    raise e
141        else:
142            raise service_error(service_error.internal, 
143                    "Unknown auth_type: %s" % self.auth_type)
144
145        #if self.auth_type == 'starbed':
146        #       self.log.debug("starbed authtication methond");
147        #else:
148        #    raise service_error(service_error.internal,
149        #           "Unknown auth_type: %s" % self.auth_type)
150
151        #TO DO: clean state !
152        # These dictionaries register the plug-in's local routines for handline
153        # these four messages with the server code above.  There's a version
154        # for SOAP and XMLRPC, depending on which interfaces the plugin
155        # supports.  There's rarely a technical reason not to support one or
156        # the other - the plugin code almost never deals with the transport -
157        # but if a plug-in writer wanted to disable XMLRPC, they could leave
158        # the self.xmlrpc_services dictionary empty.
159        self.soap_services = {\
160            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
161            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
162            'StartSegment': soap_handler("StartSegment", self.StartSegment),
163            'TerminateSegment': soap_handler("TerminateSegment", 
164                self.TerminateSegment),
165            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
166            'OperationSegment': soap_handler("OperationSegment",
167                self.OperationSegment),
168
169            }
170        self.xmlrpc_services =  {\
171            'RequestAccess': xmlrpc_handler('RequestAccess',
172                self.RequestAccess),
173            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
174                self.ReleaseAccess),
175            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
176            'TerminateSegment': xmlrpc_handler('TerminateSegment',
177                self.TerminateSegment),
178            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
179            'OperationSegment': xmlrpc_handler("OperationSegment",
180                self.OperationSegment),
181            }
182
183        self.call_SetValue = service_caller('SetValue', log=self.log)
184        self.call_GetValue = service_caller('GetValue', log=self.log)
185
186        self.log.debug("Starbed AC started!")
187    def RequestAccess(self, req, fid):
188        self.log.info("RequestAccess called by %s" % fid)
189        # The dance to get into the request body
190        if req.has_key('RequestAccessRequestBody'):
191            req = req['RequestAccessRequestBody']
192        else:
193            raise service_error(service_error.req, "No request!?")
194        if self.auth.import_credentials(
195                data_list=req.get('abac_credential', [])):
196            self.auth.save()
197        else:
198            self.log.debug('failed to import incoming credentials')
199        if self.auth_type == 'abac':
200            found, owners, proof = self.lookup_access(req, fid)
201        else:
202            raise service_error(service_error.internal, 
203                    'Unknown auth_type: %s' % self.auth_type)
204        # keep track of what's been added
205        allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log)
206        aid = unicode(allocID)
207        self.state_lock.acquire()
208        self.state[aid] = { }
209        self.state[aid]['user'] = found[0]
210        self.state[aid]['passwd'] = found[1]
211        self.state[aid]['project'] = found[2]
212        self.state[aid]['owners'] = owners
213        self.append_allocation_authorization(aid, 
214                ((fid, allocID), (allocID, allocID)))
215        self.write_state()
216        self.state_lock.release() 
217       
218        try:
219            f = open("%s/%s.pem" % (self.certdir, aid), "w")
220            print >>f, alloc_cert
221            f.close()
222        except EnvironmentError, e:
223            self.log.info("RequestAccess failed for by %s: internal error" \
224                    % fid)
225            raise service_error(service_error.internal, 
226                    "Can't open %s/%s : %s" % (self.certdir, aid, e))
227        self.log.debug('RequestAccess Returning allocation ID: %s' % allocID)
228        return { 'allocID': { 'fedid': allocID }, 'proof': proof.to_dict() }
229    def ReleaseAccess(self, req, fid):
230        self.log.info("ReleaseAccess called by %s" % fid)
231        if req.has_key('ReleaseAccessRequestBody'):
232                req = req['ReleaseAccessRequestBody']
233        else:
234                raise service_error(service_error.req, "No request!?")
235        try:
236                if req['allocID'].has_key('localname'):
237                        auth_attr = aid = req['allocID']['localname']
238                elif req['allocID'].has_key('fedid'):
239                        aid = unicode(req['allocID']['fedid'])
240                        auth_attr = req['allocID']['fedid']
241                else:
242                        raise service_error(service_error.req,
243                                "Only localnames and fedids are understood")
244        except KeyError:
245                raise service_error(service_error.req, "Badly formed request")
246       
247        self.log.debug("[access] deallocation requested for %s by %s" %  (aid, fid))
248        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
249                        with_proof=True)
250       
251        if not access_ok:
252                self.log.debug("[access] deallocation denied for %s", aid)
253                raise service_error(service_error.access, "Access Denied")
254       
255        self.state_lock.acquire()
256        if aid in self.state:
257                self.log.debug("Found allocation for %s" %aid)
258                self.clear_allocation_authorization(aid, state_attr='state')
259                del self.state[aid]
260                self.write_state()
261                self.state_lock.release()
262                # Remove the access cert
263                cf = "%s/%s.pem" % (self.certdir, aid)
264                self.log.debug("Removing %s" % cf)
265                os.remove(cf)
266                self.log.info("ReleaseAccess succeeded for %s" % fid)
267                return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 
268        else:
269                self.state_lock.release()
270                raise service_error(service_error.req, "No such allocation")
271    def exportInfo(self,certfile,connInfo,peer_ip):
272
273        for c in connInfo:
274                 for p in [ p for p in c.get('parameter', []) if p.get('type', '') == 'output']:
275                        if p.get('name', '') == 'peer':
276                                 k = p.get('key', None)
277                                 surl = p.get('store', None)
278                                 if surl :
279                                        if k and k.index('/') != -1:
280                                                value = peer_ip
281                                                #value = "%s.%s.%s" % \
282                                                #       (k[k.index('/')+1:], "tcarch02" , self.domain)
283                                        else:
284                                                self.log.error("Bad export request: %s" % p)
285                                                continue
286                                        self.log.debug("Setting %s to %s on %s" % \
287                                                (k, value, surl))
288                                        req = { 'name': k, 'value': value }
289                                        self.call_SetValue(surl, req, certfile)
290                                 else:
291                                        self.log.error("Bad export request: %s" % p)
292                        elif p.get('name', '') == 'ssh_port':
293                                k = p.get('key', None)
294                                surl = p.get('store', None)
295                                if surl and k:
296                                        req = { 'name': k, 'value': self.ssh_port }
297                                        self.log.debug("Setting %s to %s on %s" % \
298                                                (k, self.ssh_port, surl))
299                                        self.call_SetValue(surl, req, certfile )
300                                else:
301                                        self.log.error("Bad export request: %s" % p)
302                        else:
303                                self.log.error("Unknown export parameter: %s" % \
304                                        p.get('name'))
305                                continue
306    def fetchConfiguration(self,attrs,aid,certfile,tmpdir):
307        configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey')
308        pubkey_base = None
309        secretkey_base = None
310       
311        for a in attrs:
312                if a['attribute'] in configs:
313                        try:
314                                self.log.debug("Retrieving %s from %s to %s" % (a['attribute'], a['value'],tmpdir))
315                                get_url(a['value'], certfile, tmpdir, log=self.log)
316                        except:
317                                t, v, st = sys.exc_info()
318                                raise service_error(service_error.internal,
319                                        "Error retrieving %s: %s" % (a.get('value', ""), v))
320                if a['attribute'] == 'ssh_pubkey':
321                        pubkey_base = a['value'].rpartition('/')[2]
322                if a['attribute'] == 'ssh_secretkey':
323                        secretkey_base = a['value'].rpartition('/')[2]
324        if not pubkey_base:
325                raise service_error(service_error.req, 
326                        "No public key attribute")
327        if not secretkey_base:
328                raise service_error(service_error.req, 
329                        "No secret key attribute")
330        return (pubkey_base,secretkey_base)
331    def getPortalInfo(self,connInfo):
332        peer = None
333        conn_type = None
334                               
335    def StartSegment(self, req, fid):
336        #self.log.debug("data = %s" %(req));
337        self.log.debug('StartSegment called by %s' % (fid) );
338        try:
339            req = req['StartSegmentRequestBody']
340            # Get the request topology.  If not present, a KeyError is thrown.
341            topref = req['segmentdescription']['topdldescription']
342            #self.log.debug("topref = %s " % (topref));
343            # The fedid of the allocation we're attaching resources to
344            auth_attr = req['allocID']['fedid']
345        except KeyError:
346            raise service_error(server_error.req, "Badly formed request")
347
348        connInfo = req.get('connection', [])
349        attrs = req.get('fedAttr', [])
350        # String version of the allocation ID for keying
351        aid = "%s" % auth_attr
352        # Authorization check
353        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
354                with_proof=True)
355        #self.log.debug("lllllacc = %s, proffe= %s" %( access_ok,proof.to_dict()))
356        if not access_ok:
357            raise service_error(service_error.access, "Access denied", 
358                    proof=proof)
359        else:
360            # See if this is a replay of an earlier succeeded StartSegment -
361            # sometimes SSL kills 'em.  If so, replay the response rather than
362            # redoing the allocation.
363            self.state_lock.acquire()
364            #retval = self.state[aid].get('started', None)
365            #CAUSOION
366            retval = 0
367            self.state_lock.release()
368            if retval:
369                self.log.warning(
370                        "[StartSegment] Duplicate StartSegment for %s: " \
371                                % aid + \
372                        "replaying response")
373                return retval
374
375        certfile = "%s/%s.pem" % (self.certdir, aid)
376
377        # Convert the topology into topdl data structures.  Again, the
378        # skeletion doesn't do anything with it, but this is how one parses a
379        # topology request.
380        if topref: topo = topdl.Topology(**topref)
381        else:
382            raise service_error(service_error.req, 
383                    "Request missing segmentdescription'")
384        #rmanager = "192.168.1.10:1234"
385        #smanager = "192.168.1.10:1240"
386        #pmanager = "192.168.1.10:1242"
387        #fncp = "192.168.1.10"
388        #tftpdman="192.168.1.10"
389        #wolagent="192.168.1.10:5959:192.168.0.0/16"
390        #encd="192.168.1.21"
391        #user = "alwabel"
392        #project = "lace"
393        if len(self.available_ports) > 1:
394                self.state_lock.acquire()
395                #Pick two ports for ENCD and  ESQP
396                ENCD = random.choice([ i for i in self.available_ports])
397                ESQP = random.choice([ i for i in self.available_ports])
398                self.available_ports.discard(ENCD)
399                self.available_ports.discard(ESQP)
400                self.state[aid]['ENCD'] = ENCD
401                self.state[aid]['ESQP'] = ESQP
402                self.write_state()
403                self.state_lock.release()
404        else:
405                self.log.debug("[StartSegment] There is no enough ports for kuma!")
406                raise service_error(service_error.federant, "No available Ports for Kuma")
407        k = klanguage(self.rmanager,self.smanager,self.pmanager,self.fncp,self.state[aid]['user'],self.state[aid]['project'],self.tftpdman,self.wolagent,self.encd)
408        k.k_from_topology(topo)
409        level, kname = tempfile.mkstemp()
410        k.to_file(kname)
411       
412        pubkey_base = None
413        secretkey_base = None
414        tmpdir = None
415        peer = None
416
417        if k.isPortal():
418                self.log.debug("Setting up a portal!")
419                try:   
420                        #pick interface
421                        self.state_lock.acquire()
422                        if len(self.iiface) > 1:
423                                IIFACE = random.choice([i for i in self.iiface] )
424                                self.iiface.remove(IIFACE)
425                                self.state[aid]['iiface'] = IIFACE
426                                ip,substrate = k.getPortalInfo()
427                                self.log.debug("IIFACE = %s, ip = %s" ,IIFACE,ip)
428                                Peer.setIP(IIFACE,ip)
429                                #set interface IP!
430                                #bridge name always have the following convention interfacenamebr
431                                Peer.setBridge("%sbr" % (IIFACE) )
432                                Peer.addIfaceToBridge(IIFACE,"%sbr" % (IIFACE) )
433
434                        else: 
435                                raise BaseException("No available internal interfaces!")
436                        if len(self.eiface) > 1:
437                                EIFACE = random.choice([i for i in self.eiface] )
438                                self.eiface.remove(EIFACE)
439                                self.state[aid]['eiface'] = EIFACE
440                                peer_ip = Peer.getIP(EIFACE)
441                        else:
442                                raise BaseException("No available external interfaces!")
443                        self.state_lock.release()
444                        tmpdir = tempfile.mkdtemp(prefix="access-")
445                        pubkey_base, secretkey_base = self.fetchConfiguration(attrs,aid,certfile,tmpdir)
446                        self.exportInfo(certfile,connInfo,peer_ip)
447                        self.import_store_info(certfile, connInfo)
448                        peer = connInfo[0].get('peer')
449                        con_type = connInfo[0].get('type')
450                        self.log.debug("peer = %s, type = %s, ", str(peer),con_type)
451                except BaseException as e:
452                        self.state_lock.release()
453                        self.log.error("Problem reteriving peer information")
454                        traceback.print_exc(file=sys.stdout)   
455                        raise service_error(service_error.internal, "Cannot handel peer information!")
456        proc = subprocess.Popen(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)])
457        pid = proc.pid
458        #pid = 100000
459        self.log.debug(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)])
460        #os.unlink(kname)
461        # The attributes of the request.  Not used by this plug-in, but that's
462        # where they are.
463        attrs = req.get('fedAttr', [])
464
465        # Gather connection information.  Used to send messages to those
466        # waiting.
467        connInfo = req.get('connection', [])
468
469        proce1 = None
470        if k.isPortal():
471                try:
472                        proce1 = subprocess.Popen(['./peer_setup.py','-d',peer,'-k','%s/%s'% (tmpdir,pubkey_base),'-b',"%sbr" % (IIFACE),'-i', IIFACE])
473                except:
474                        self.log.warning("Could not start connection with peer!") 
475        self.state_lock.acquire()
476        # It's possible that the StartSegment call gets retried (!).
477        # if the 'started' key is in the allocation, we'll return it rather
478        # than redo the setup.  The integer allocation was saved when we made
479        # it.
480        if pubkey_base:
481                self.state[aid]['pubkey_base'] = pubkey_base
482        if secretkey_base:
483                self.state[aid]['secretkey_base'] = secretkey_base
484        if tmpdir:
485                self.state[aid]['tmpdir'] = tmpdir
486        if peer:
487                self.state[aid]['peer'] = peer
488        if proce1:
489                self.state[aid]['ssh_pid'] = proce1.pid
490        self.state[aid]['pid'] = pid
491        self.state[aid]['descfile'] = kname
492        self.state[aid]['started'] = { 
493                'allocID': req['allocID'],
494                'allocationLog': "Allocatation complete",
495                'segmentdescription': { 'topdldescription': topo.to_dict() },
496                #'proof' :  proof("me", "faber","attr",[]).to_dict(),
497                'proof': proof.to_dict(),
498                #'pid' : pid
499                'fedAttr': [
500                        { 'attribute': 'domain', 'value': self.domain } , 
501                        {'attribute': 'dragon', 'value' : '128.9.168.133'},
502                           ]
503                }
504        retval = copy.deepcopy(self.state[aid]['started'])
505        self.write_state()
506        self.state_lock.release()
507        return retval
508
509    def TerminateSegment(self, req, fid):
510        self.log.debug("Terminate Segment");
511        # Gather the same access information as for Start Segment
512        try:
513            req = req['TerminateSegmentRequestBody']
514        except KeyError:
515            raise service_error(server_error.req, "Badly formed request")
516
517        auth_attr = req['allocID']['fedid']
518        aid = "%s" % auth_attr
519
520        self.log.debug("Terminate request for %s" %aid)
521        # Check authorization
522        access_ok, proof = self.auth.check_attribute(fid, auth_attr, 
523                with_proof=True)
524        if not access_ok:
525            raise service_error(service_error.access, "Access denied", 
526                    proof=proof)
527
528        # Authorized: remove the integer from the allocation.  A more complex
529        # plug in would interface with the underlying facility to turn off the
530        # experiment here.
531        self.state_lock.acquire()
532        if aid in self.state:
533            pid = self.state[aid].get('pid', None)
534            if pid:
535                try:
536                        self.log.debug("kill process %s " % (pid))
537                        os.kill(int(pid), signal.SIGTERM)
538                        #os.kill(int(pid),9)
539                        os.wait()
540                except OSError as ex:
541                        self.log.warning("Cannot kill process %s " % (pid))
542            if 'ssh_pid' in self.state[aid]:
543                try:
544                        os.kill(int(self.state[aid]['ssh_pid'] ) )
545                        os.wait()
546                except OSError as ex:
547                        self.log.warning("Cannot kill process") 
548            descfile = self.state[aid].get('descfile',None)
549            if descfile:
550                os.unlink(descfile)
551                del self.state[aid]['descfile']
552            if 'ENCD' in self.state[aid]:
553                del self.state[aid]['ENCD']
554            if 'ESQP' in self.state[aid]:
555                del self.state[aid]['ESQP']
556            del self.state[aid]['started']
557
558            if 'eiface' in self.state[aid]:
559                del self.state[aid]['eiface']
560            if 'iiface' in self.state[aid]:
561                try:
562                        Peer.delBridge("%sbr" % self.state[aid]['iiface'] )
563                except:
564                        self.log.warning("Problem removing bridge %sbr" % self.state[aid]['eiface'] )
565                del self.state[aid]['iiface']
566            if 'pubkey_base' in self.state[aid]:
567                del self.state[aid]['pubkey_base']
568            if 'secretkey_base' in self.state[aid]:
569                del self.state[aid]['secretkey_base']
570            if 'tmpdir' in self.state[aid]:
571                try:   
572                        shutil.rmtree(self.state[aid]['tmpdir'])
573                except  OSError as e:
574                        self.log.warning("Problem removing directory %s" % (self.state[aid]['tmpdir'] ))
575                del self.state[aid]['tmpdir']
576            if 'peer' in self.state[aid]:
577                del self.state[aid]['peer']
578            self.write_state()
579        self.state_lock.release()
580   
581        return { 'allocID': req['allocID'], 'proof': proof.to_dict() }
582    def InfoSegment(self, req, fid):
583        self.log.info("InfoSegment called by %s" % fid)
584
585    def OperationSegment(self, req, fid):
586        self.log.info("OperationSegment called by %s" % (fid) ) 
587        self.log.debug("req = %s " % req )
Note: See TracBrowser for help on using the repository browser.