Changeset 9d40cc9 for starbed_plugin/starbed.py
- Timestamp:
- Aug 26, 2012 1:04:57 PM (12 years ago)
- Branches:
- master
- Children:
- 2f45140
- Parents:
- d070d9f (diff), eb117fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
starbed_plugin/starbed.py
reb117fe r9d40cc9 3 3 import os,sys 4 4 import subprocess 5 import signal 5 6 import re 6 7 import string … … 11 12 import tempfile 12 13 from federation.util import * 13 from federation.fedid import fedid,generate_fedid14 from federation.authorizer import authorizer 14 from deter import fedid,generate_fedid 15 from federation.authorizer import authorizer, abac_authorizer 15 16 from federation.service_error import service_error 16 17 from federation.remote_service import xmlrpc_handler, soap_handler, service_caller 17 18 18 import federation.topdl as topdl 19 19 from deter import topdl 20 20 from federation.access import access_base 21 from federation.legacy_access import legacy_access22 21 from federation.proof import proof 23 22 from topdltok import klanguage … … 38 37 39 38 @staticmethod 40 def parse_access_string(s):39 def access_tuple(str): 41 40 """ 42 Parse a parenthesized string from the access db by removing the parens. 43 If the string isn't in parens, we just return it with whitespace 44 trimmed in either case. 41 Convert a string (user,passwd,project) into an access_project. 42 It returns a tuple of the form (user,passwd,project). 45 43 """ 46 st = s.strip() 47 if st.startswith("(") and st.endswith(")"): return st[1:-1] 48 else: return st 49 44 str = str.strip() 45 if str.startswith('(') and str.endswith(')') and str.count(',') == 2: 46 user,passwd,project = str[1:-1].split(',') 47 return (user.strip(), passwd.strip(), project.strip()) 48 else: 49 raise self.parse_error( 50 'Bad mapping (unbalanced parens or malformed string is should be of the form (user,passwd,project)') 51 50 52 def __init__(self, config=None, auth=None): 51 53 self.rmanager = config.get('definition','rmanager') … … 56 58 self.wolagent = config.get('definition','wolagent') 57 59 self.encd = config.get('definition','encd') 58 self.user = config.get('definition','user') 59 self.project= config.get('definition','project') 60 self.rpassword = config.get('definition','rpassword') 61 62 access_base.__init__(self, config, auth) 60 61 access_base.__init__(self, config, auth) 62 63 #Available ports for kuma! 64 ports = config.get('globals','freeports') 65 self.domain = config.get('globals','domain') 66 try: 67 if not ports or ports.count('-') != 1: 68 raise TypeError("bad ports") 69 else: 70 a,b = ports.split('-') 71 a = int(a) 72 b = int(b) 73 if a < b: 74 start = a 75 end = b 76 else: 77 start = b 78 end = a 79 if abs(start-end) < 2: 80 raise TypeError("Bad ports") 81 self.available_ports = set(range(start,end) ) 82 except TypeError as e: 83 self.available_ports = set(range(3456,3458)) 84 self.log.warning("Setting default freeports due to missing or malformed configuration. %s" % (e)) 85 #Get reserved ports saved in state and remove them from the available set! 86 self.state_lock.acquire() 87 ###### work #### 88 for k in self.state.keys(): 89 #remove any reserved ENCD or ESQP ports from available ports so kuma wouldn't pick an occupied port! 90 if 'ENCD' in self.state[k]: 91 self.available_ports.discard(self.state[k]['ENCD']) 92 elif 'ESQP' in self.state[k]: 93 self.available_ports.discard(self.state[k]['ESQP']) 94 self.state_lock.release() 95 63 96 64 97 # authorization information 65 98 self.auth_type = config.get('access', 'auth_type') \ 66 or ' legacy'99 or 'abac' 67 100 self.auth_dir = config.get('access', 'auth_dir') 68 101 accessdb = config.get("access", "accessdb") 69 print "authentication type = %s" %(self.auth_type); 70 if self.auth_type == 'starbed': 71 self.log.debug("starbed authtication methond"); 102 if self.auth_type == 'abac': 103 # Load the current authorization state 104 self.auth = abac_authorizer(load=self.auth_dir) 105 self.access = [ ] 106 if accessdb: 107 try: 108 self.read_access(accessdb, self.access_tuple) 109 except EnvironmentError, e: 110 self.log.error("Cannot read %s: %s" % \ 111 (config.get("access", "accessdb"), e)) 112 raise e 72 113 else: 73 114 raise service_error(service_error.internal, 74 115 "Unknown auth_type: %s" % self.auth_type) 75 116 117 #if self.auth_type == 'starbed': 118 # self.log.debug("starbed authtication methond"); 119 #else: 120 # raise service_error(service_error.internal, 121 # "Unknown auth_type: %s" % self.auth_type) 122 123 #TO DO: clean state ! 76 124 # These dictionaries register the plug-in's local routines for handline 77 125 # these four messages with the server code above. There's a version … … 98 146 } 99 147 self.log.debug("Starbed AC started!") 100 # RequestAccess and ReleaseAccess come from the base class101 148 def RequestAccess(self, req, fid): 149 self.log.info("RequestAccess called by %s" % fid) 102 150 # The dance to get into the request body 103 151 if req.has_key('RequestAccessRequestBody'): … … 105 153 else: 106 154 raise service_error(service_error.req, "No request!?") 107 #self.log.debug("req = %s\n-------\n fid=%s\n" % (req,fid)) 155 if self.auth.import_credentials( 156 data_list=req.get('abac_credential', [])): 157 self.auth.save() 158 else: 159 self.log.debug('failed to import incoming credentials') 160 if self.auth_type == 'abac': 161 found, owners, proof = self.lookup_access(req, fid) 162 else: 163 raise service_error(service_error.internal, 164 'Unknown auth_type: %s' % self.auth_type) 165 # keep track of what's been added 108 166 allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) 109 167 aid = unicode(allocID) 110 168 self.state_lock.acquire() 111 169 self.state[aid] = { } 112 self.state[aid]['user'] = 'alwabel' 113 self.state[aid]['owners'] = 'alwabel' 114 self.state[aid]['auth'] = set() 170 self.state[aid]['user'] = found[0] 171 self.state[aid]['passwd'] = found[1] 172 self.state[aid]['project'] = found[2] 173 self.state[aid]['owners'] = owners 174 self.append_allocation_authorization(aid, 175 ((fid, allocID), (allocID, allocID))) 115 176 self.write_state() 116 self.state_lock.release() 117 #self.log.debug("to_dict() = %s " % (proof("me","faber","attr",[]).to_dict())); 118 return { 'allocID': { 'fedid': allocID }, 'proof' : proof("me", "faber","attr",[]).to_dict() } 177 self.state_lock.release() 178 179 try: 180 f = open("%s/%s.pem" % (self.certdir, aid), "w") 181 print >>f, alloc_cert 182 f.close() 183 except EnvironmentError, e: 184 self.log.info("RequestAccess failed for by %s: internal error" \ 185 % fid) 186 raise service_error(service_error.internal, 187 "Can't open %s/%s : %s" % (self.certdir, aid, e)) 188 self.log.debug('RequestAccess Returning allocation ID: %s' % allocID) 189 return { 'allocID': { 'fedid': allocID }, 'proof': proof.to_dict() } 119 190 def ReleaseAccess(self, req, fid): 120 self.log.debug("request releasing access!"); 191 self.log.info("ReleaseAccess called by %s" % fid) 192 if req.has_key('ReleaseAccessRequestBody'): 193 req = req['ReleaseAccessRequestBody'] 194 else: 195 raise service_error(service_error.req, "No request!?") 196 try: 197 if req['allocID'].has_key('localname'): 198 auth_attr = aid = req['allocID']['localname'] 199 elif req['allocID'].has_key('fedid'): 200 aid = unicode(req['allocID']['fedid']) 201 auth_attr = req['allocID']['fedid'] 202 else: 203 raise service_error(service_error.req, 204 "Only localnames and fedids are understood") 205 except KeyError: 206 raise service_error(service_error.req, "Badly formed request") 207 208 self.log.debug("[access] deallocation requested for %s by %s" % (aid, fid)) 209 access_ok, proof = self.auth.check_attribute(fid, auth_attr, 210 with_proof=True) 211 212 if not access_ok: 213 self.log.debug("[access] deallocation denied for %s", aid) 214 raise service_error(service_error.access, "Access Denied") 215 216 self.state_lock.acquire() 217 if aid in self.state: 218 self.log.debug("Found allocation for %s" %aid) 219 self.clear_allocation_authorization(aid, state_attr='state') 220 del self.state[aid] 221 self.write_state() 222 self.state_lock.release() 223 # Remove the access cert 224 cf = "%s/%s.pem" % (self.certdir, aid) 225 self.log.debug("Removing %s" % cf) 226 os.remove(cf) 227 self.log.info("ReleaseAccess succeeded for %s" % fid) 228 return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 229 else: 230 self.state_lock.release() 231 raise service_error(service_error.req, "No such allocation") 232 121 233 def StartSegment(self, req, fid): 122 234 #self.log.debug("data = %s" %(req)); 123 235 self.log.debug('StartSegment called by %s' % (fid) ); 124 236 try: 125 237 req = req['StartSegmentRequestBody'] 126 #print req;238 self.log.debug(req) 127 239 # Get the request topology. If not present, a KeyError is thrown. 128 240 topref = req['segmentdescription']['topdldescription'] … … 139 251 with_proof=True) 140 252 #self.log.debug("lllllacc = %s, proffe= %s" %( access_ok,proof.to_dict())) 141 #CAUSIO XXXX 142 if access_ok: 253 if not access_ok: 143 254 raise service_error(service_error.access, "Access denied", 144 255 proof=proof) … … 177 288 #user = "alwabel" 178 289 #project = "lace" 179 180 k = klanguage(self.rmanager,self.smanager,self.pmanager,self.fncp,self.user,self.project,self.tftpdman,self.wolagent,self.encd) 290 if len(self.available_ports) > 1: 291 self.state_lock.acquire() 292 #Pick two ports for ENCD and ESQP 293 ENCD = random.choice([ i for i in self.available_ports]) 294 ESQP = random.choice([ i for i in self.available_ports]) 295 self.available_ports.discard(ENCD) 296 self.available_ports.discard(ESQP) 297 self.state[aid]['ENCD'] = ENCD 298 self.state[aid]['ESQP'] = ESQP 299 self.write_state() 300 self.state_lock.release() 301 else: 302 self.log.debug("[StartSegment] There is no enough ports for kuma!") 303 raise service_error(service_error.federant, "No available Ports for Kuma") 304 k = klanguage(self.rmanager,self.smanager,self.pmanager,self.fncp,self.state[aid]['user'],self.state[aid]['project'],self.tftpdman,self.wolagent,self.encd) 181 305 k.k_from_topology(topo) 182 306 level, kname = tempfile.mkstemp() 183 307 k.to_file(kname) 184 pid = subprocess.Popen(['/usr/local/springos/bin/kuma', '-p',self.rpassword,kname]) 185 #pid = subprocess.Popen([sys.executable, ['/usr/local/springos/bin/kuma', '-p',self.rpassword,kname] ]) 308 #proc = subprocess.Popen(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)]) 309 #pid = proc.pid 310 pid = 100000 311 self.log.debug(['/usr/local/springos/bin/kuma', '-p',self.state[aid]['passwd'],kname,'-P',str(ENCD),'-Q',str(ESQP)]) 186 312 #os.unlink(kname) 187 313 # The attributes of the request. Not used by this plug-in, but that's … … 198 324 # than redo the setup. The integer allocation was saved when we made 199 325 # it. 326 self.state[aid]['pid'] = pid 327 self.state[aid]['descfile'] = kname 200 328 self.state[aid]['started'] = { 201 329 'allocID': req['allocID'], … … 204 332 #'proof' : proof("me", "faber","attr",[]).to_dict(), 205 333 'proof': proof.to_dict(), 334 #'pid' : pid 335 'fedAttr': [ 336 { 'attribute': 'domain', 'value': self.domain } , 337 {'attribute': 'dragon', 'value' : '128.9.168.133'}, 338 ] 206 339 } 207 340 retval = copy.deepcopy(self.state[aid]['started']) 208 341 self.write_state() 209 342 self.state_lock.release() 210 211 343 return retval 212 344 213 345 def TerminateSegment(self, req, fid): 214 346 self.log.debug("Terminate Segment"); 215 """216 Remove the resources associated with th eallocation and stop the music.217 In this example, this simply means removing the integer we allocated.218 """219 347 # Gather the same access information as for Start Segment 220 348 try: … … 239 367 self.state_lock.acquire() 240 368 if aid in self.state: 241 assigned = self.state[aid].get('integer', None) 242 self.available_ints.add(assigned) 243 if 'integer' in self.state[aid]: 244 del self.state[aid]['integer'] 369 pid = self.state[aid].get('pid', None) 370 if pid: 371 try: 372 self.log.debug("kill process %s " % (pid)) 373 os.kill(int(pid), signal.SIGTERM) 374 #os.kill(int(pid),9) 375 os.wait() 376 except OSError as ex: 377 self.log.warning("Cannot kill process %s " % (pid)) 378 descfile = self.state[aid].get('descfile',None) 379 if descfile: 380 #os.unlink(descfile) 381 del self.state[aid]['descfile'] 382 if 'ENCD' in self.state[aid]: 383 del self.state[aid]['ENCD'] 384 if 'ESQP' in self.state[aid]: 385 del self.state[aid]['ESQP'] 386 del self.state[aid]['started'] 245 387 self.write_state() 246 388 self.state_lock.release() 247 389 248 390 return { 'allocID': req['allocID'], 'proof': proof.to_dict() } 391 def InfoSegment(self, req, fid): 392 self.log.info("InfoSegment called by %s" % fid) 393
Note: See TracChangeset
for help on using the changeset viewer.