Changeset 11a08b0 for fedd/fedd_experiment_control.py
- Timestamp:
- Oct 7, 2008 6:12:57 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 0ea11af
- Parents:
- 8ecfbad
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/fedd_experiment_control.py
r8ecfbad r11a08b0 29 29 import parse_detail 30 30 from service_error import * 31 32 import logging 33 34 class nullHandler(logging.Handler): 35 def emit(self, record): pass 36 37 fl = logging.getLogger("fedd.experiment_control") 38 fl.addHandler(nullHandler()) 31 39 32 40 class fedd_experiment_control_local: … … 91 99 self.exception = s 92 100 if self.trace_file: 93 print >>self.trace_file, "Thread exception: %s %s" % \ 94 (s.code_string(), s.desc) 95 101 logging.error("Thread exception: %s %s" % \ 102 (s.code_string(), s.desc)) 96 103 except: 97 104 self.exception = sys.exc_info()[1] 98 105 if self.trace_file: 99 print >>self.trace_file, \ 100 "Unexpected thread exception: %s" % \ 101 self.exception 102 print >>self.trace_file, "Trace: %s" % \ 103 traceback.format_exc() 106 logging.error(("Unexpected thread exception: %s" +\ 107 "Trace %s") % (self.exception,\ 108 traceback.format_exc())) 104 109 if self.pdata: 105 110 self.pdata.terminate() … … 133 138 self.exp_stem = "fed-stem" 134 139 self.debug = config.create_debug 140 self.log = logging.getLogger("fedd.experiment_control") 135 141 self.muxmax = 2 136 142 self.nthreads = 2 … … 176 182 raise service_error(service_error.internal, 177 183 "Cannot read sshpubkey") 184 185 # Set the logging level to the value passed in. The getattr slieght of 186 # hand finds the logging level constant corrersponding to the string. 187 # We're a little paranoid to avoid user mayhem. 188 if config.experiment_log: 189 try: 190 level = int(getattr(logging, config.experiment_log.upper(),-1)) 191 192 if logging.DEBUG <= level <= logging.CRITICAL: 193 self.log.setLevel(level) 194 else: 195 self.log.error("Bad experiment_log value: %s" % \ 196 config.experiment_log) 197 198 except ValueError: 199 self.log.error("Bad experiment_log value: %s" % \ 200 config.experiment_log) 178 201 179 202 # Grab saved state … … 262 285 pickle.dump(self.state, f) 263 286 except IOError, e: 264 print >>sys.stderr,"Can't write file %s: %s" % \265 (self.state_filename, e) 287 self.log.error("Can't write file %s: %s" % \ 288 (self.state_filename, e)) 266 289 except pickle.PicklingError, e: 267 print >>sys.stderr, "Pickling problem: %s" % e290 self.log.error("Pickling problem: %s" % e) 268 291 269 292 # Call while holding self.state_lock … … 273 296 self.state = pickle.load(f) 274 297 except IOError, e: 275 print >>sys.stderr, "Can't open %s: %s" % \276 (self.state_filename, e) 298 self.log.warning("No saved state: Can't open %s: %s" % \ 299 (self.state_filename, e)) 277 300 except pickle.UnpicklingError, e: 278 print >>sys.stderr, "Unpickling failed: %s" % e301 self.log.warning("No saved state: Unpickling failed: %s" % e) 279 302 280 303 def scp_file(self, file, user, host, dest=""): … … 284 307 285 308 scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)] 286 287 trace = self.trace_file 288 if not trace: 289 try: 290 trace = open("/dev/null", "w") 291 except IOError: 292 raise service_error(service_error.internal, 293 "Cannot open /dev/null??"); 294 309 rv = 0 310 311 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 295 312 if not self.debug: 296 313 rv = call(scp_cmd, stdout=trace, stderr=trace) 297 else:298 if self.trace_file:299 print >>self.trace_file, "debug [scp_file]: %s" % \300 " ".join(scp_cmd)301 rv = 0302 314 303 315 return rv == 0 … … 306 318 sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd) 307 319 308 trace = self.trace_file 309 if not trace: 310 try: 311 trace = open("/dev/null", "w") 312 except IOError: 313 raise service_error(service_error.internal, 314 "Cannot open /dev/null??"); 315 320 self.log.debug("[ssh_cmd]: %s" % sh_str) 316 321 if not self.debug: 317 322 sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace) 318 323 return sub.wait() == 0 319 324 else: 320 if self.trace_file:321 print >>self.trace_file,"debug [ssh_cmd]: %s" % sh_str322 325 return True 323 326 … … 365 368 cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid] 366 369 367 368 if self.trace_file: 369 print >>self.trace_file, "status request: %s" % " ".join(cmd) 370 371 if not self.trace_file: 372 try: 373 st_file = open("/dev/null", "w") 374 except IOError: 375 raise service_error(service_error.internal, 376 "Cannot open /dev/null!?") 377 else: 378 st_file = self.trace_file 379 380 status = Popen(cmd, stdout=PIPE, stderr=st_file) 370 self.log.debug("[start_segment]: %s"% " ".join(cmd)) 371 dev_null = None 372 try: 373 dev_null = open("/dev/null", "a") 374 except IOError, e: 375 self.log.error("[start_segment]: can't open /dev/null: %s" %e) 376 377 status = Popen(cmd, stdout=PIPE, stderr=dev_null) 381 378 for line in status.stdout: 382 379 m = state_re.match(line) … … 392 389 raise service_error(service_error.internal, 393 390 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 394 if self.trace_file: 395 print >>self.trace_file, "%s: %s" % (tb, state)396 print >>self.trace_file, "transferring experiment to %s" % tb391 392 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 393 self.log.info("[start_segment]:transferring experiment to %s" % tb) 397 394 398 395 if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host): … … 433 430 "%s/rpms" % tmpdir, tarfiles_dir): 434 431 return False 435 if self.trace_file: 436 print >>self.trace_file, "Modifying %s on %s" % (eid, tb) 432 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 437 433 if not self.ssh_cmd(user, host, 438 434 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ … … 459 455 "%s/rpms" % tmpdir, tarfiles_dir): 460 456 return False 461 if self.trace_file: 462 print >>self.trace_file, "Modifying %s on %s" % (eid, tb) 457 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 463 458 if not self.ssh_cmd(user, host, 464 459 "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile), 465 460 "modexp"): 466 461 return False 467 if self.trace_file: 468 print >>self.trace_file, "Swapping %s in on %s" % (eid, tb) 462 self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb)) 469 463 if not self.ssh_cmd(user, host, 470 464 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), … … 486 480 "%s/rpms" % tmpdir, tarfiles_dir): 487 481 return False 488 if self.trace_file: 489 print >>self.trace_file, "Creating %s on %s" % (eid, tb) 482 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 490 483 if not self.ssh_cmd(user, host, 491 484 "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \ … … 502 495 proj_dir): 503 496 return False 504 if self.trace_file: 505 print >>self.trace_file, "Swapping %s in on %s" % (eid, tb) 497 self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb)) 506 498 if not self.ssh_cmd(user, host, 507 499 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), … … 510 502 return True 511 503 else: 512 if self.trace_file: 513 print >>self.trace_file, "unknown state %s" % state 504 self.log.debug("[start_segment]:unknown state %s" % state) 514 505 return False 515 506 … … 519 510 pid = tbparams[tb]['project'] 520 511 521 if self.trace_file: 522 print >>self.trace_file, "Stopping %s on %s" % (eid, tb) 512 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 523 513 return self.ssh_cmd(user, host, 524 514 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) … … 534 524 t = type.lower(); 535 525 if t not in valid_types: raise ValueError 536 537 trace = self.trace_file 538 if not trace: 539 try: 540 trace = open("/dev/null", "w") 541 except IOError: 542 raise service_error(service_error.internal, 543 "Cannot open /dev/null??"); 526 cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] 527 528 try: 529 trace = open("/dev/null", "w") 530 except IOError: 531 raise service_error(service_error.internal, 532 "Cannot open /dev/null??"); 544 533 545 534 # May raise CalledProcessError 546 rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest],547 535 self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) 536 rv = call(cmd, stdout=trace, stderr=trace) 548 537 if rv != 0: 549 538 raise service_error(service_error.internal, … … 1007 996 else: 1008 997 # XXX 1009 print >>sys.stderr, \ 1010 "Ignoring gateways for unknown testbed %s" \ 1011 % self.current_gateways 998 self.log.error("[gateways]: Ignoring gateways for " + \ 999 "unknown testbed %s" % self.current_gateways) 1012 1000 self.current_gateways = None 1013 1001 return True … … 1052 1040 "Error creating seer config") 1053 1041 else: 1054 if self.trace_file: 1055 print >>sys.stderr, "No control gateway for %s" %\ 1056 self.current_gateways 1042 debug.error("[gateways]: No control gateway for %s" %\ 1043 self.current_gateways) 1057 1044 self.current_gateways = None 1058 1045 else: … … 1374 1361 "Swap in failed on %s" % ",".join(failed)) 1375 1362 else: 1376 if self.trace_file: 1377 print >>self.trace_file, "Experiment started" 1363 self.log.info("[start_segment]: Experiment %s started" % eid) 1378 1364 1379 1365 # Generate an ID for the experiment (slice) and a certificate that the 1380 1366 # allocator can use to prove they own it. We'll ship it back through 1381 1367 # the encrypted connection. 1382 (expid, expcert) = generate_fedid("test", dir=tmpdir, 1383 trace=self.trace_file) 1384 1385 if self.trace_file: 1386 print >>self.trace_file, "removing %s" % tmpdir 1368 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1369 1370 self.log.debug("[start_experiment]: removing %s" % tmpdir) 1387 1371 1388 1372 # Walk up tmpdir, deleting as we go
Note: See TracChangeset
for help on using the changeset viewer.