Changeset 5ae3857
- Timestamp:
- Sep 6, 2009 2:15:52 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 2b7d768
- Parents:
- 1da6a23
- Location:
- fedd/federation
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
r1da6a23 r5ae3857 67 67 self.certdir = config.get("access","certdir") 68 68 self.ssh_privkey_file = config.get("access","ssh_privkey_file") 69 self.create_debug = config.getboolean("access", "create_debug") 70 print self.create_debug 69 71 70 72 self.attrs = { } … … 116 118 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 117 119 'StartSegment': soap_handler("StartSegment", self.StartSegment), 120 'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment), 118 121 } 119 122 self.xmlrpc_services = {\ … … 122 125 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', 123 126 self.ReleaseAccess), 124 'StartSegment': xmlrpc_handler('StartSegment', 125 self.StartSegment), 127 'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment), 128 'TerminateSegment': xmlrpc_handler('TerminateSegment', 129 self.TerminateSegment), 126 130 } 127 131 … … 1083 1087 class stop_segment(proxy_emulab_segment): 1084 1088 def __init__(self, log=None, keyfile=None, debug=False): 1085 experiment_control_local.emulab_segment.__init__(self,1089 access.proxy_emulab_segment.__init__(self, 1086 1090 log=log, keyfile=keyfile, debug=debug) 1087 1091 1088 def __call__(self, tb, eid, tbparams):1092 def __call__(self, parent, user, pid, eid): 1089 1093 """ 1090 1094 Stop a sub experiment by calling swapexp on the federant 1091 1095 """ 1092 user = tbparams[tb]['user'] 1093 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 1094 pid = tbparams[tb]['project'] 1095 1096 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 1096 host = "%s%s" % (parent.ops, parent.domain) 1097 self.log.info("[stop_segment]: Stopping %s" % eid) 1097 1098 rv = False 1098 1099 try: 1099 1100 # Clean out tar files: we've gone over quota in the past 1100 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid)) 1101 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \ 1102 (pid, eid)) 1101 self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \ 1102 (pid, eid)) 1103 1103 rv = self.ssh_cmd(user, host, 1104 1104 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) … … 1335 1335 proj = self.allocation[aid].get('sproject', None) 1336 1336 user = self.allocation[aid].get('user', None) 1337 self.allocation[aid]['experiment'] = ename 1338 self.write_state() 1337 1339 self.state_lock.release() 1338 1340 … … 1351 1353 self.generate_ns2(topo, expfile, 1352 1354 "/proj/%s/software/%s/" % (proj, ename), master) 1353 starter = self.start_segment(keyfile=self.ssh_privkey_file, debug= False)1355 starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug) 1354 1356 starter(self, ename, proj, user, expfile, tmpdir) 1355 1357 1356 1358 return { 'allocID': req['allocID'] } 1359 1360 def TerminateSegment(self, req, fid): 1361 print "In terminate" 1362 try: 1363 req = req['TerminateSegmentRequestBody'] 1364 except KeyError: 1365 raise service_error(server_error.req, "Badly formed request") 1366 1367 auth_attr = req['allocID']['fedid'] 1368 aid = "%s" % auth_attr 1369 attrs = req.get('fedAttr', []) 1370 print "in terminate %s" % auth_attr 1371 if not self.auth.check_attribute(fid, auth_attr): 1372 print "access denied" 1373 raise service_error(service_error.access, "Access denied") 1374 1375 self.state_lock.acquire() 1376 if self.allocation.has_key(aid): 1377 proj = self.allocation[aid].get('project', None) 1378 if not proj: 1379 proj = self.allocation[aid].get('sproject', None) 1380 user = self.allocation[aid].get('user', None) 1381 ename = self.allocation[aid].get('experiment', None) 1382 self.state_lock.release() 1383 1384 if not proj: 1385 raise service_error(service_error.internal, 1386 "Can't find project for %s" % aid) 1387 1388 if not user: 1389 raise service_error(service_error.internal, 1390 "Can't find creation user for %s" % aid) 1391 if not ename: 1392 raise service_error(service_error.internal, 1393 "Can't find experiment name for %s" % aid) 1394 stopper = self.stop_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug) 1395 stopper(self, user, proj, ename) 1396 print { 'allocID': req['allocID'] } 1397 return { 'allocID': req['allocID'] } -
fedd/federation/experiment_control.py
r1da6a23 r5ae3857 197 197 call_ReleaseAccess = service_caller('ReleaseAccess') 198 198 call_StartSegment = service_caller('StartSegment') 199 call_TerminateSegment = service_caller('TerminateSegment') 199 200 call_Ns2Split = service_caller('Ns2Split') 200 201 … … 350 351 'MultiInfo': soap_handler('MultiInfo', self.get_multi_info), 351 352 'Terminate': soap_handler('Terminate', 352 self. terminate_experiment),353 self.new_terminate_experiment), 353 354 } 354 355 … … 360 361 'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info), 361 362 'Terminate': xmlrpc_handler('Terminate', 362 self. terminate_experiment),363 self.new_terminate_experiment), 363 364 } 364 365 … … 2166 2167 self.trusted_certs) 2167 2168 print r 2169 return True 2170 2171 2172 2173 class new_terminate_segment: 2174 def __init__(self, debug=False, log=None, cert_file=None, 2175 cert_pwd=None, trusted_certs=None, caller=None): 2176 self.log = log 2177 self.debug = debug 2178 self.cert_file = cert_file 2179 self.cert_pwd = cert_pwd 2180 self.trusted_certs = None 2181 self.caller = caller 2182 2183 def __call__(self, uri, aid ): 2184 print "in terminate_segment: %s" % aid 2185 req = { 2186 'allocID': aid , 2187 } 2188 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 2189 self.trusted_certs) 2168 2190 return True 2169 2191 … … 3236 3258 self.state_lock.release() 3237 3259 raise service_error(service_error.req, "No saved state") 3260 3261 def new_terminate_experiment(self, req, fid): 3262 """ 3263 Swap this experiment out on the federants and delete the shared 3264 information 3265 """ 3266 tbparams = { } 3267 req = req.get('TerminateRequestBody', None) 3268 if not req: 3269 raise service_error(service_error.req, 3270 "Bad request format (no TerminateRequestBody)") 3271 force = req.get('force', False) 3272 exp = req.get('experiment', None) 3273 if exp: 3274 if exp.has_key('fedid'): 3275 key = exp['fedid'] 3276 keytype = "fedid" 3277 elif exp.has_key('localname'): 3278 key = exp['localname'] 3279 keytype = "localname" 3280 else: 3281 raise service_error(service_error.req, "Unknown lookup type") 3282 else: 3283 raise service_error(service_error.req, "No request?") 3284 3285 self.check_experiment_access(fid, key) 3286 3287 dealloc_list = [ ] 3288 3289 3290 # Create a logger that logs to the dealloc_list as well as to the main 3291 # log file. 3292 dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key) 3293 h = logging.StreamHandler(self.list_log(dealloc_list)) 3294 # XXX: there should be a global one of these rather than repeating the 3295 # code. 3296 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 3297 '%d %b %y %H:%M:%S')) 3298 dealloc_log.addHandler(h) 3299 3300 self.state_lock.acquire() 3301 fed_exp = self.state.get(key, None) 3302 3303 if fed_exp: 3304 # This branch of the conditional holds the lock to generate a 3305 # consistent temporary tbparams variable to deallocate experiments. 3306 # It releases the lock to do the deallocations and reacquires it to 3307 # remove the experiment state when the termination is complete. 3308 3309 # First make sure that the experiment creation is complete. 3310 status = fed_exp.get('experimentStatus', None) 3311 3312 if status: 3313 if status in ('starting', 'terminating'): 3314 if not force: 3315 self.state_lock.release() 3316 raise service_error(service_error.partial, 3317 'Experiment still being created or destroyed') 3318 else: 3319 self.log.warning('Experiment in %s state ' % status + \ 3320 'being terminated by force.') 3321 else: 3322 # No status??? trouble 3323 self.state_lock.release() 3324 raise service_error(service_error.internal, 3325 "Experiment has no status!?") 3326 3327 ids = [] 3328 # experimentID is a list of dicts that are self-describing 3329 # identifiers. This finds all the fedids and localnames - the 3330 # keys of self.state - and puts them into ids. 3331 for id in fed_exp.get('experimentID', []): 3332 if id.has_key('fedid'): ids.append(id['fedid']) 3333 if id.has_key('localname'): ids.append(id['localname']) 3334 3335 # Collect the allocation/segment ids 3336 for fed in fed_exp.get('federant', []): 3337 try: 3338 print "looking at %s" % fed 3339 tb = fed['emulab']['project']['testbed']['localname'] 3340 aid = fed['allocID'] 3341 except KeyError, e: 3342 print "Key error: %s" %e 3343 continue 3344 tbparams[tb] = aid 3345 fed_exp['experimentStatus'] = 'terminating' 3346 if self.state_filename: self.write_state() 3347 self.state_lock.release() 3348 3349 # Stop everyone. NB, wait_for_all waits until a thread starts and 3350 # then completes, so we can't wait if nothing starts. So, no 3351 # tbparams, no start. 3352 if len(tbparams) > 0: 3353 thread_pool = self.thread_pool(self.nthreads) 3354 for tb in tbparams.keys(): 3355 # Create and start a thread to stop the segment 3356 thread_pool.wait_for_slot() 3357 uri = self.tbmap.get(tb, None) 3358 t = self.pooled_thread(\ 3359 target=self.new_terminate_segment(log=dealloc_log, 3360 cert_file=self.cert_file, 3361 cert_pwd=self.cert_pwd, 3362 trusted_certs=self.trusted_certs, 3363 caller=self.call_TerminateSegment), 3364 args=(uri, tbparams[tb]), name=tb, 3365 pdata=thread_pool, trace_file=self.trace_file) 3366 t.start() 3367 # Wait for completions 3368 thread_pool.wait_for_all_done() 3369 3370 # release the allocations (failed experiments have done this 3371 # already, and starting experiments may be in odd states, so we 3372 # ignore errors releasing those allocations 3373 try: 3374 for tb in tbparams.keys(): 3375 self.release_access(tb, tbparams[tb]) 3376 except service_error, e: 3377 if status != 'failed' and not force: 3378 raise e 3379 3380 # Remove the terminated experiment 3381 self.state_lock.acquire() 3382 for id in ids: 3383 if self.state.has_key(id): del self.state[id] 3384 3385 if self.state_filename: self.write_state() 3386 self.state_lock.release() 3387 3388 return { 3389 'experiment': exp , 3390 'deallocationLog': "".join(dealloc_list), 3391 } 3392 else: 3393 # Don't forget to release the lock 3394 self.state_lock.release() 3395 raise service_error(service_error.req, "No saved state")
Note: See TracChangeset
for help on using the changeset viewer.