#!/usr/bin/python import os,sys #temp import traceback # import shutil import subprocess import signal import re import string import copy import pickle import logging import random import tempfile from federation.util import * from deter import fedid,generate_fedid from federation.authorizer import authorizer, abac_authorizer from federation.service_error import service_error from federation.remote_service import xmlrpc_handler, soap_handler, service_caller from deter import topdl from federation.access import access_base from federation.proof import proof from topdltok import klanguage from peer_setup import Peer # Make log messages disappear if noone configures a fedd logger. This is # something of an incantation, but basically it creates a logger object # registered to fedd.access if no other module above us has. It's an extra # belt for the suspenders. class nullHandler(logging.Handler): def emit(self, record): pass fl = logging.getLogger("fedd.access") fl.addHandler(nullHandler()) # The plug-in itself. class access(access_base): @staticmethod def access_tuple(str): """ Convert a string (user,passwd,project) into an access_project. It returns a tuple of the form (user,passwd,project). """ str = str.strip() if str.startswith('(') and str.endswith(')') and str.count(',') == 2: user,passwd,project = str[1:-1].split(',') return (user.strip(), passwd.strip(), project.strip()) else: raise self.parse_error( 'Bad mapping (unbalanced parens or malformed string is should be of the form (user,passwd,project)') def __init__(self, config=None, auth=None): self.rmanager = config.get('definition','rmanager') self.smanager = config.get('definition','smanager') self.pmanager = config.get('definition','pmanager') self.fncp = config.get('definition','fncp') self.tftpdman =config.get('definition','tftpdman') self.wolagent = config.get('definition','wolagent') self.encd = config.get('definition','encd') access_base.__init__(self, config, auth) self.ssh_port = config.get("access","ssh_port") or "22" self.domain = config.get('globals','domain') #Available ports for kuma! ports = config.get('globals','freeports') try: if not ports or ports.count('-') != 1: raise TypeError("bad ports") else: a,b = ports.split('-') a = int(a) b = int(b) if a < b: start = a end = b else: start = b end = a if abs(start-end) < 2: raise TypeError("Bad ports") self.available_ports = set(range(start,end) ) except TypeError as e: self.available_ports = set(range(3456,3458)) self.log.warning("Setting default freeports due to missing or malformed configuration. %s" % (e)) #Available internal and external interfaces for the commander! internal_iface = config.get('globals','internal_interfaces') external_iface = config.get('globals','external_interfaces') if internal_iface: try: self.iiface = internal_iface.split(',') except BaseException as e: self.log.warning('Could not retrieve internal interfaces') else: self.log.warning("No internal interfaces specificed in the configration!") if external_iface: try: self.eiface = external_iface.split(',') except BaseException as e: selflog.warning('Could not retrive external interfaces') else: self.log.warning("No external interfaces specificed in the configration!") #Get reserved ports saved in state and remove them from the available set! self.state_lock.acquire() ###### work #### for k in self.state.keys(): #remove any reserved ENCD or ESQP ports from available ports so kuma wouldn't pick an occupied port! if 'ENCD' in self.state[k]: self.available_ports.discard(self.state[k]['ENCD']) elif 'ESQP' in self.state[k]: self.available_ports.discard(self.state[k]['ESQP']) #Get reserved interfaces in state and remove them from the available set! elif 'iiface' in self.state[k]: self.iiface.remove(self.state[k]['iiface'] ) elif 'eiface' in self.state[k]: self.eiface.remove(self.state[k]['eiface'] ) self.state_lock.release() # authorization information self.auth_type = config.get('access', 'auth_type') \ or 'abac' self.auth_dir = config.get('access', 'auth_dir') accessdb = config.get("access", "accessdb") if self.auth_type == 'abac': # Load the current authorization state self.auth = abac_authorizer(load=self.auth_dir) self.access = [ ] if accessdb: try: self.read_access(accessdb, self.access_tuple) except EnvironmentError, e: self.log.error("Cannot read %s: %s" % \ (config.get("access", "accessdb"), e)) raise e else: raise service_error(service_error.internal, "Unknown auth_type: %s" % self.auth_type) #if self.auth_type == 'starbed': # self.log.debug("starbed authtication methond"); #else: # raise service_error(service_error.internal, # "Unknown auth_type: %s" % self.auth_type) #TO DO: clean state ! # These dictionaries register the plug-in's local routines for handline # these four messages with the server code above. There's a version # for SOAP and XMLRPC, depending on which interfaces the plugin # supports. There's rarely a technical reason not to support one or # the other - the plugin code almost never deals with the transport - # but if a plug-in writer wanted to disable XMLRPC, they could leave # the self.xmlrpc_services dictionary empty. self.soap_services = {\ 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 'StartSegment': soap_handler("StartSegment", self.StartSegment), 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), 'InfoSegment': soap_handler("InfoSegment", self.InfoSegment), 'OperationSegment': soap_handler("OperationSegment", self.OperationSegment), } self.xmlrpc_services = {\ 'RequestAccess': xmlrpc_handler('RequestAccess', self.RequestAccess), 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', self.ReleaseAccess), 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 'TerminateSegment': xmlrpc_handler('TerminateSegment', self.TerminateSegment), 'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment), 'OperationSegment': xmlrpc_handler("OperationSegment", self.OperationSegment), } self.call_SetValue = service_caller('SetValue', log=self.log) self.call_GetValue = service_caller('GetValue', log=self.log) self.log.debug("Starbed AC started!") def RequestAccess(self, req, fid): self.log.info("RequestAccess called by %s" % fid) # The dance to get into the request body if req.has_key('RequestAccessRequestBody'): req = req['RequestAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") if self.auth.import_credentials( data_list=req.get('abac_credential', [])): self.auth.save() else: self.log.debug('failed to import incoming credentials') if self.auth_type == 'abac': found, owners, proof = self.lookup_access(req, fid) else: raise service_error(service_error.internal, 'Unknown auth_type: %s' % self.auth_type) # keep track of what's been added allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) aid = unicode(allocID) self.state_lock.acquire() self.state[aid] = { } self.state[aid]['user'] = found[0] self.state[aid]['passwd'] = found[1] self.state[aid]['project'] = found[2] self.state[aid]['owners'] = owners self.append_allocation_authorization(aid, ((fid, allocID), (allocID, allocID))) self.write_state() self.state_lock.release() try: f = open("%s/%s.pem" % (self.certdir, aid), "w") print >>f, alloc_cert f.close() except EnvironmentError, e: self.log.info("RequestAccess failed for by %s: internal error" \ % fid) raise service_error(service_error.internal, "Can't open %s/%s : %s" % (self.certdir, aid, e)) self.log.debug('RequestAccess Returning allocation ID: %s' % allocID) return { 'allocID': { 'fedid': allocID }, 'proof': proof.to_dict() } def ReleaseAccess(self, req, fid): self.log.info("ReleaseAccess called by %s" % fid) if req.has_key('ReleaseAccessRequestBody'): req = req['ReleaseAccessRequestBody'] else: raise service_error(service_error.req, "No request!?") try: if req['allocID'].has_key('localname'): auth_attr = aid = req['allocID']['localname'] elif req['allocID'].has_key('fedid'): aid = unicode(req['allocID']['fedid']) auth_attr = req['allocID']['fedid'] else: raise service_error(service_error.req, "Only localnames and fedids are understood") except KeyError: raise service_error(service_error.req, "Badly formed request") self.log.debug("[access] deallocation requested for %s by %s" % (aid, fid)) access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: self.log.debug("[access] deallocation denied for %s", aid) raise service_error(service_error.access, "Access Denied") self.state_lock.acquire() if aid in self.state: self.log.debug("Found allocation for %s" %aid) self.clear_allocation_authorization(aid, state_attr='state') del self.state[aid] self.write_state() self.state_lock.release() # Remove the access cert cf = "%s/%s.pem" % (self.certdir, aid) self.log.debug("Removing %s" % cf) os.remove(cf) self.log.info("ReleaseAccess succeeded for %s" % fid) return { 'allocID': req['allocID'], 'proof': proof.to_dict() } else: self.state_lock.release() raise service_error(service_error.req, "No such allocation") def exportInfo(self,certfile,connInfo,peer_ip): for c in connInfo: for p in [ p for p in c.get('parameter', []) if p.get('type', '') == 'output']: if p.get('name', '') == 'peer': k = p.get('key', None) surl = p.get('store', None) if surl : if k and k.index('/') != -1: value = peer_ip #value = "%s.%s.%s" % \ # (k[k.index('/')+1:], "tcarch02" , self.domain) else: self.log.error("Bad export request: %s" % p) continue self.log.debug("Setting %s to %s on %s" % \ (k, value, surl)) req = { 'name': k, 'value': value } self.call_SetValue(surl, req, certfile) else: self.log.error("Bad export request: %s" % p) elif p.get('name', '') == 'ssh_port': k = p.get('key', None) surl = p.get('store', None) if surl and k: req = { 'name': k, 'value': self.ssh_port } self.log.debug("Setting %s to %s on %s" % \ (k, self.ssh_port, surl)) self.call_SetValue(surl, req, certfile ) else: self.log.error("Bad export request: %s" % p) else: self.log.error("Unknown export parameter: %s" % \ p.get('name')) continue def fetchConfiguration(self,attrs,aid,certfile,tmpdir): configs = ('hosts', 'ssh_pubkey', 'ssh_secretkey') pubkey_base = None secretkey_base = None for a in attrs: if a['attribute'] in configs: try: self.log.debug("Retrieving %s from %s to %s" % (a['attribute'], a['value'],tmpdir)) get_url(a['value'], certfile, tmpdir, log=self.log) except: t, v, st = sys.exc_info() raise service_error(service_error.internal, "Error retrieving %s: %s" % (a.get('value', ""), v)) if a['attribute'] == 'ssh_pubkey': pubkey_base = a['value'].rpartition('/')[2] if a['attribute'] == 'ssh_secretkey': secretkey_base = a['value'].rpartition('/')[2] if not pubkey_base: raise service_error(service_error.req, "No public key attribute") if not secretkey_base: raise service_error(service_error.req, "No secret key attribute") return (pubkey_base,secretkey_base) def getPortalInfo(self,connInfo): peer = None conn_type = None def StartSegment(self, req, fid): #self.log.debug("data = %s" %(req)); self.log.debug('StartSegment called by %s' % (fid) ); try: req = req['StartSegmentRequestBody'] # Get the request topology. If not present, a KeyError is thrown. topref = req['segmentdescription']['topdldescription'] #self.log.debug("topref = %s " % (topref)); # The fedid of the allocation we're attaching resources to auth_attr = req['allocID']['fedid'] except KeyError: raise service_error(server_error.req, "Badly formed request") connInfo = req.get('connection', []) attrs = req.get('fedAttr', []) # String version of the allocation ID for keying aid = "%s" % auth_attr # Authorization check access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) #self.log.debug("lllllacc = %s, proffe= %s" %( access_ok,proof.to_dict())) if not access_ok: raise service_error(service_error.access, "Access denied", proof=proof) else: # See if this is a replay of an earlier succeeded StartSegment - # sometimes SSL kills 'em. If so, replay the response rather than # redoing the allocation. self.state_lock.acquire() #retval = self.state[aid].get('started', None) #CAUSOION retval = 0 self.state_lock.release() if retval: self.log.warning( "[StartSegment] Duplicate StartSegment for %s: " \ % aid + \ "replaying response") return retval certfile = "%s/%s.pem" % (self.certdir, aid) # Convert the topology into topdl data structures. Again, the # skeletion doesn't do anything with it, but this is how one parses a # topology request. if topref: topo = topdl.Topology(**topref) else: raise service_error(service_error.req, "Request missing segmentdescription'") #rmanager = "192.168.1.10:1234" #smanager = "192.168.1.10:1240" #pmanager = "192.168.1.10:1242" #fncp = "192.168.1.10" #tftpdman="192.168.1.10" #wolagent="192.168.1.10:5959:192.168.0.0/16" #encd="192.168.1.21" #user = "alwabel" #project = "lace" if len(self.available_ports) > 1: self.state_lock.acquire() #Pick two ports for ENCD and ESQP ENCD = random.choice([ i for i in self.available_ports]) ESQP = random.choice([ i for i in self.available_ports]) self.available_ports.discard(ENCD) self.available_ports.discard(ESQP) self.state[aid]['ENCD'] = ENCD self.state[aid]['ESQP'] = ESQP self.write_state() self.state_lock.release() else: self.log.debug("[StartSegment] There is no enough ports for kuma!") raise service_error(service_error.federant, "No available Ports for Kuma") k = klanguage(self.rmanager,self.smanager,self.pmanager,self.fncp,self.state[aid]['user'],self.state[aid]['project'],self.tftpdman,self.wolagent,self.encd) k.k_from_topology(topo) level, kname = tempfile.mkstemp() k.to_file(kname) pubkey_base = None secretkey_base = None tmpdir = None peer = None if k.isPortal(): self.log.debug("Setting up a portal!") try: #pick interface self.state_lock.acquire() if len(self.iiface) > 1: IIFACE = random.choice([i for i in self.iiface] ) self.iiface.remove(IIFACE) self.state[aid]['iiface'] = IIFACE ip,substrate = k.getPortalInfo() self.log.debug("IIFACE = %s, ip = %s" ,IIFACE,ip) Peer.setIP(IIFACE,ip) #set interface IP! #bridge name always have the following convention interfacenamebr Peer.setBridge("%sbr" % (IIFACE) ) Peer.addIfaceToBridge(IIFACE,"%sbr" % (IIFACE) ) else: raise BaseException("No available internal interfaces!") if len(self.eiface) > 1: EIFACE = random.choice([i for i in self.eiface] ) self.eiface.remove(EIFACE) self.state[aid]['eiface'] = EIFACE peer_ip = Peer.getIP(EIFACE) else: raise BaseException("No available external interfaces!") self.state_lock.release() tmpdir = tempfile.mkdtemp(prefix="access-") pubkey_base, secretkey_base = self.fetchConfiguration(attrs,aid,certfile,tmpdir) self.exportInfo(certfile,connInfo,peer_ip) self.import_store_info(certfile, connInfo) peer = connInfo[0].get('peer') con_type = connInfo[0].get('type') self.log.debug("peer = %s, type = %s, ", str(peer),con_type) except BaseException as e: self.state_lock.release() self.log.error("Problem reteriving peer information") traceback.print_exc(file=sys.stdout) raise service_error(service_error.internal, "Cannot handel peer information!") proc = subprocess.Popen(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)]) pid = proc.pid #pid = 100000 self.log.debug(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)]) #os.unlink(kname) # The attributes of the request. Not used by this plug-in, but that's # where they are. attrs = req.get('fedAttr', []) # Gather connection information. Used to send messages to those # waiting. connInfo = req.get('connection', []) proce1 = None if k.isPortal(): try: proce1 = subprocess.Popen(['./peer_setup.py','-d',peer,'-k','%s/%s'% (tmpdir,pubkey_base),'-b',"%sbr" % (IIFACE),'-i', IIFACE]) except: self.log.warning("Could not start connection with peer!") self.state_lock.acquire() # It's possible that the StartSegment call gets retried (!). # if the 'started' key is in the allocation, we'll return it rather # than redo the setup. The integer allocation was saved when we made # it. if pubkey_base: self.state[aid]['pubkey_base'] = pubkey_base if secretkey_base: self.state[aid]['secretkey_base'] = secretkey_base if tmpdir: self.state[aid]['tmpdir'] = tmpdir if peer: self.state[aid]['peer'] = peer if proce1: self.state[aid]['ssh_pid'] = proce1.pid self.state[aid]['pid'] = pid self.state[aid]['descfile'] = kname self.state[aid]['started'] = { 'allocID': req['allocID'], 'allocationLog': "Allocatation complete", 'segmentdescription': { 'topdldescription': topo.to_dict() }, #'proof' : proof("me", "faber","attr",[]).to_dict(), 'proof': proof.to_dict(), #'pid' : pid 'fedAttr': [ { 'attribute': 'domain', 'value': self.domain } , {'attribute': 'dragon', 'value' : '128.9.168.133'}, ] } retval = copy.deepcopy(self.state[aid]['started']) self.write_state() self.state_lock.release() return retval def TerminateSegment(self, req, fid): self.log.debug("Terminate Segment"); # Gather the same access information as for Start Segment try: req = req['TerminateSegmentRequestBody'] except KeyError: raise service_error(server_error.req, "Badly formed request") auth_attr = req['allocID']['fedid'] aid = "%s" % auth_attr self.log.debug("Terminate request for %s" %aid) # Check authorization access_ok, proof = self.auth.check_attribute(fid, auth_attr, with_proof=True) if not access_ok: raise service_error(service_error.access, "Access denied", proof=proof) # Authorized: remove the integer from the allocation. A more complex # plug in would interface with the underlying facility to turn off the # experiment here. self.state_lock.acquire() if aid in self.state: pid = self.state[aid].get('pid', None) if pid: try: self.log.debug("kill process %s " % (pid)) os.kill(int(pid), signal.SIGTERM) #os.kill(int(pid),9) os.wait() except OSError as ex: self.log.warning("Cannot kill process %s " % (pid)) if 'ssh_pid' in self.state[aid]: try: os.kill(int(self.state[aid]['ssh_pid'] ) ) os.wait() except OSError as ex: self.log.warning("Cannot kill process") descfile = self.state[aid].get('descfile',None) if descfile: os.unlink(descfile) del self.state[aid]['descfile'] if 'ENCD' in self.state[aid]: del self.state[aid]['ENCD'] if 'ESQP' in self.state[aid]: del self.state[aid]['ESQP'] del self.state[aid]['started'] if 'eiface' in self.state[aid]: del self.state[aid]['eiface'] if 'iiface' in self.state[aid]: try: Peer.delBridge("%sbr" % self.state[aid]['iiface'] ) except: self.log.warning("Problem removing bridge %sbr" % self.state[aid]['eiface'] ) del self.state[aid]['iiface'] if 'pubkey_base' in self.state[aid]: del self.state[aid]['pubkey_base'] if 'secretkey_base' in self.state[aid]: del self.state[aid]['secretkey_base'] if 'tmpdir' in self.state[aid]: try: shutil.rmtree(self.state[aid]['tmpdir']) except OSError as e: self.log.warning("Problem removing directory %s" % (self.state[aid]['tmpdir'] )) del self.state[aid]['tmpdir'] if 'peer' in self.state[aid]: del self.state[aid]['peer'] self.write_state() self.state_lock.release() return { 'allocID': req['allocID'], 'proof': proof.to_dict() } def InfoSegment(self, req, fid): self.log.info("InfoSegment called by %s" % fid) def OperationSegment(self, req, fid): self.log.info("OperationSegment called by %s" % (fid) ) self.log.debug("req = %s " % req )