- Timestamp:
- Oct 7, 2008 6:12:57 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 0ea11af
- Parents:
- 8ecfbad
- Location:
- fedd
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/fedd.py
r8ecfbad r11a08b0 19 19 20 20 from threading import * 21 from signal import signal, pause, SIGINT, SIGTERM 22 from select import select 23 import logging 21 24 22 25 # The SSL server here is based on the implementation described at … … 169 172 170 173 self.set_defaults(host="localhost", port=23235, transport="soap", 171 debug=0)174 logfile=None, debug=0) 172 175 173 176 self.add_option("-d", "--debug", action="count", dest="debug", … … 177 180 self.add_option("-H", "--host", action="store", type="string", 178 181 dest="host", help="Hostname to listen on (default %default)") 182 self.add_option("-l", "--logfile", action="store", dest="logfile", 183 help="File to send log messages to") 179 184 self.add_option("-p", "--port", action="store", type="int", 180 185 dest="port", help="Port to listen on (default %default)") … … 188 193 const=sys.stderr, help="Print SOAP exchange to stderr") 189 194 195 servers_active = True 196 197 log_params = {\ 198 'format': "%(asctime)s %(levelname)-8s %(message)s",\ 199 'datefmt': '%a, %d %b %Y %H:%M:%S'\ 200 } 201 202 def shutdown(sig, frame): 203 global servers_active, flog 204 servers_active = False 205 flog.info("Received signal %d, shutting down" % sig); 206 190 207 def run_server(s): 191 if s: s.serve_forever() 208 global servers_active # Not strictly needed: servers_active is only read 209 210 if s: 211 while servers_active: 212 i, o, e = select((s,), (), (), 5.0) 213 if s in i: 214 s.handle_request() 192 215 193 216 services = [ ] … … 195 218 196 219 opts, args = fedd_opts().parse_args() 220 221 flog = logging.getLogger("fedd") 222 ffmt = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s", 223 '%a, %d %b %Y %H:%M:%S') 224 225 if opts.logfile: fh = logging.FileHandler(opts.logfile) 226 else: fh = logging.StreamHandler(sys.stdout) 227 228 # The handler will print anything, setting the logger level will affect what 229 # gets recorded. 230 fh.setLevel(logging.DEBUG) 231 232 if opts.debug: flog.setLevel(logging.DEBUG) 233 else: flog.setLevel(logging.INFO) 234 235 fh.setFormatter(ffmt) 236 flog.addHandler(fh) 237 197 238 198 239 if opts.configfile != None: … … 239 280 elif s[0] == "xmlrpc": 240 281 servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl)) 241 else: print >>sys.stderr, "Unknown transport: %s" % s[0] 282 else: flog.warning("Unknown transport: %s" % s[0]) 283 284 signal(SIGINT, shutdown) 285 signal(SIGTERM, shutdown) 242 286 243 287 for s in servers: 244 t = Thread(target=run_server, args=(s,)) 245 t.start() 288 if s: 289 t = Thread(target=run_server, args=(s,)) 290 t.start() 291 292 pause() -
fedd/fedd_access.py
r8ecfbad r11a08b0 22 22 import parse_detail 23 23 from service_error import * 24 import logging 25 26 class nullHandler(logging.Handler): 27 def emit(self, record): pass 28 29 fl = logging.getLogger("fedd.access") 30 fl.addHandler(nullHandler()) 24 31 25 32 class fedd_access: … … 52 59 self.fedid_default = config.fedid_default 53 60 self.restricted = copy.copy(config.restricted) 61 62 self.log = logging.getLogger("fedd.access") 54 63 55 64 # Certs are promoted from the generic to the specific, so without a … … 385 394 def RequestAccess(self, req, fid): 386 395 387 print "in get access"388 396 if req.has_key('RequestAccessRequestBody'): 389 397 req = req['RequestAccessRequestBody'] … … 393 401 if req.has_key('destinationTestbed'): 394 402 dt = unpack_id(req['destinationTestbed']) 395 396 print dt, " ", self.testbed397 403 398 404 if dt == None or dt == self.testbed: -
fedd/fedd_allocate_project.py
r8ecfbad r11a08b0 22 22 import parse_detail 23 23 from service_error import * 24 import logging 25 26 27 class nullHandler(logging.Handler): 28 def emit(self, record): pass 29 30 fl = logging.getLogger("fedd.allocate.local") 31 fl.addHandler(nullHandler()) 32 fl = logging.getLogger("fedd.allocate.remote") 33 fl.addHandler(nullHandler()) 24 34 25 35 class fedd_allocate_project_local: … … 34 44 self.mkproj = '/usr/testbed/sbin/mkproj' 35 45 self.grantnodetype = '/usr/testbed/sbin/grantnodetype' 46 self.log = logging.getLogger("fedd.allocate.local") 36 47 37 48 def random_string(self, s, n=3): … … 153 164 rc = 0 154 165 for cmd in cmds: 166 self.log.debug("[dynamic_project]: %s" % ' '.join(cmd)) 155 167 if self.dynamic_projects: 156 168 try: … … 161 173 "Dynamic project subprocess creation error "+ \ 162 174 "[%s] (%s)" % (cmd[1], e.strerror)) 163 else:164 print >>sys.stdout, str(" ").join(cmd)165 175 166 176 if rc != 0: -
fedd/fedd_config_file.py
r8ecfbad r11a08b0 29 29 "create_experiment_cert_file", "create_experiment_cert_pwd", 30 30 "create_experiment_trusted_certs", "federation_script_dir", 31 "ssh_pubkey_file", "experiment_state_file" )31 "ssh_pubkey_file", "experiment_state_file", "experiment_log") 32 32 id_list_attrs = ("restricted",) 33 33 -
fedd/fedd_experiment_control.py
r8ecfbad r11a08b0 29 29 import parse_detail 30 30 from service_error import * 31 32 import logging 33 34 class nullHandler(logging.Handler): 35 def emit(self, record): pass 36 37 fl = logging.getLogger("fedd.experiment_control") 38 fl.addHandler(nullHandler()) 31 39 32 40 class fedd_experiment_control_local: … … 91 99 self.exception = s 92 100 if self.trace_file: 93 print >>self.trace_file, "Thread exception: %s %s" % \ 94 (s.code_string(), s.desc) 95 101 logging.error("Thread exception: %s %s" % \ 102 (s.code_string(), s.desc)) 96 103 except: 97 104 self.exception = sys.exc_info()[1] 98 105 if self.trace_file: 99 print >>self.trace_file, \ 100 "Unexpected thread exception: %s" % \ 101 self.exception 102 print >>self.trace_file, "Trace: %s" % \ 103 traceback.format_exc() 106 logging.error(("Unexpected thread exception: %s" +\ 107 "Trace %s") % (self.exception,\ 108 traceback.format_exc())) 104 109 if self.pdata: 105 110 self.pdata.terminate() … … 133 138 self.exp_stem = "fed-stem" 134 139 self.debug = config.create_debug 140 self.log = logging.getLogger("fedd.experiment_control") 135 141 self.muxmax = 2 136 142 self.nthreads = 2 … … 176 182 raise service_error(service_error.internal, 177 183 "Cannot read sshpubkey") 184 185 # Set the logging level to the value passed in. The getattr slieght of 186 # hand finds the logging level constant corrersponding to the string. 187 # We're a little paranoid to avoid user mayhem. 188 if config.experiment_log: 189 try: 190 level = int(getattr(logging, config.experiment_log.upper(),-1)) 191 192 if logging.DEBUG <= level <= logging.CRITICAL: 193 self.log.setLevel(level) 194 else: 195 self.log.error("Bad experiment_log value: %s" % \ 196 config.experiment_log) 197 198 except ValueError: 199 self.log.error("Bad experiment_log value: %s" % \ 200 config.experiment_log) 178 201 179 202 # Grab saved state … … 262 285 pickle.dump(self.state, f) 263 286 except IOError, e: 264 print >>sys.stderr,"Can't write file %s: %s" % \265 (self.state_filename, e) 287 self.log.error("Can't write file %s: %s" % \ 288 (self.state_filename, e)) 266 289 except pickle.PicklingError, e: 267 print >>sys.stderr, "Pickling problem: %s" % e290 self.log.error("Pickling problem: %s" % e) 268 291 269 292 # Call while holding self.state_lock … … 273 296 self.state = pickle.load(f) 274 297 except IOError, e: 275 print >>sys.stderr, "Can't open %s: %s" % \276 (self.state_filename, e) 298 self.log.warning("No saved state: Can't open %s: %s" % \ 299 (self.state_filename, e)) 277 300 except pickle.UnpicklingError, e: 278 print >>sys.stderr, "Unpickling failed: %s" % e301 self.log.warning("No saved state: Unpickling failed: %s" % e) 279 302 280 303 def scp_file(self, file, user, host, dest=""): … … 284 307 285 308 scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)] 286 287 trace = self.trace_file 288 if not trace: 289 try: 290 trace = open("/dev/null", "w") 291 except IOError: 292 raise service_error(service_error.internal, 293 "Cannot open /dev/null??"); 294 309 rv = 0 310 311 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 295 312 if not self.debug: 296 313 rv = call(scp_cmd, stdout=trace, stderr=trace) 297 else:298 if self.trace_file:299 print >>self.trace_file, "debug [scp_file]: %s" % \300 " ".join(scp_cmd)301 rv = 0302 314 303 315 return rv == 0 … … 306 318 sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd) 307 319 308 trace = self.trace_file 309 if not trace: 310 try: 311 trace = open("/dev/null", "w") 312 except IOError: 313 raise service_error(service_error.internal, 314 "Cannot open /dev/null??"); 315 320 self.log.debug("[ssh_cmd]: %s" % sh_str) 316 321 if not self.debug: 317 322 sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace) 318 323 return sub.wait() == 0 319 324 else: 320 if self.trace_file:321 print >>self.trace_file,"debug [ssh_cmd]: %s" % sh_str322 325 return True 323 326 … … 365 368 cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid] 366 369 367 368 if self.trace_file: 369 print >>self.trace_file, "status request: %s" % " ".join(cmd) 370 371 if not self.trace_file: 372 try: 373 st_file = open("/dev/null", "w") 374 except IOError: 375 raise service_error(service_error.internal, 376 "Cannot open /dev/null!?") 377 else: 378 st_file = self.trace_file 379 380 status = Popen(cmd, stdout=PIPE, stderr=st_file) 370 self.log.debug("[start_segment]: %s"% " ".join(cmd)) 371 dev_null = None 372 try: 373 dev_null = open("/dev/null", "a") 374 except IOError, e: 375 self.log.error("[start_segment]: can't open /dev/null: %s" %e) 376 377 status = Popen(cmd, stdout=PIPE, stderr=dev_null) 381 378 for line in status.stdout: 382 379 m = state_re.match(line) … … 392 389 raise service_error(service_error.internal, 393 390 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 394 if self.trace_file: 395 print >>self.trace_file, "%s: %s" % (tb, state)396 print >>self.trace_file, "transferring experiment to %s" % tb391 392 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 393 self.log.info("[start_segment]:transferring experiment to %s" % tb) 397 394 398 395 if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host): … … 433 430 "%s/rpms" % tmpdir, tarfiles_dir): 434 431 return False 435 if self.trace_file: 436 print >>self.trace_file, "Modifying %s on %s" % (eid, tb) 432 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 437 433 if not self.ssh_cmd(user, host, 438 434 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ … … 459 455 "%s/rpms" % tmpdir, tarfiles_dir): 460 456 return False 461 if self.trace_file: 462 print >>self.trace_file, "Modifying %s on %s" % (eid, tb) 457 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 463 458 if not self.ssh_cmd(user, host, 464 459 "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile), 465 460 "modexp"): 466 461 return False 467 if self.trace_file: 468 print >>self.trace_file, "Swapping %s in on %s" % (eid, tb) 462 self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb)) 469 463 if not self.ssh_cmd(user, host, 470 464 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), … … 486 480 "%s/rpms" % tmpdir, tarfiles_dir): 487 481 return False 488 if self.trace_file: 489 print >>self.trace_file, "Creating %s on %s" % (eid, tb) 482 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 490 483 if not self.ssh_cmd(user, host, 491 484 "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \ … … 502 495 proj_dir): 503 496 return False 504 if self.trace_file: 505 print >>self.trace_file, "Swapping %s in on %s" % (eid, tb) 497 self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb)) 506 498 if not self.ssh_cmd(user, host, 507 499 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), … … 510 502 return True 511 503 else: 512 if self.trace_file: 513 print >>self.trace_file, "unknown state %s" % state 504 self.log.debug("[start_segment]:unknown state %s" % state) 514 505 return False 515 506 … … 519 510 pid = tbparams[tb]['project'] 520 511 521 if self.trace_file: 522 print >>self.trace_file, "Stopping %s on %s" % (eid, tb) 512 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 523 513 return self.ssh_cmd(user, host, 524 514 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) … … 534 524 t = type.lower(); 535 525 if t not in valid_types: raise ValueError 536 537 trace = self.trace_file 538 if not trace: 539 try: 540 trace = open("/dev/null", "w") 541 except IOError: 542 raise service_error(service_error.internal, 543 "Cannot open /dev/null??"); 526 cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] 527 528 try: 529 trace = open("/dev/null", "w") 530 except IOError: 531 raise service_error(service_error.internal, 532 "Cannot open /dev/null??"); 544 533 545 534 # May raise CalledProcessError 546 rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest],547 535 self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) 536 rv = call(cmd, stdout=trace, stderr=trace) 548 537 if rv != 0: 549 538 raise service_error(service_error.internal, … … 1007 996 else: 1008 997 # XXX 1009 print >>sys.stderr, \ 1010 "Ignoring gateways for unknown testbed %s" \ 1011 % self.current_gateways 998 self.log.error("[gateways]: Ignoring gateways for " + \ 999 "unknown testbed %s" % self.current_gateways) 1012 1000 self.current_gateways = None 1013 1001 return True … … 1052 1040 "Error creating seer config") 1053 1041 else: 1054 if self.trace_file: 1055 print >>sys.stderr, "No control gateway for %s" %\ 1056 self.current_gateways 1042 debug.error("[gateways]: No control gateway for %s" %\ 1043 self.current_gateways) 1057 1044 self.current_gateways = None 1058 1045 else: … … 1374 1361 "Swap in failed on %s" % ",".join(failed)) 1375 1362 else: 1376 if self.trace_file: 1377 print >>self.trace_file, "Experiment started" 1363 self.log.info("[start_segment]: Experiment %s started" % eid) 1378 1364 1379 1365 # Generate an ID for the experiment (slice) and a certificate that the 1380 1366 # allocator can use to prove they own it. We'll ship it back through 1381 1367 # the encrypted connection. 1382 (expid, expcert) = generate_fedid("test", dir=tmpdir, 1383 trace=self.trace_file) 1384 1385 if self.trace_file: 1386 print >>self.trace_file, "removing %s" % tmpdir 1368 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1369 1370 self.log.debug("[start_experiment]: removing %s" % tmpdir) 1387 1371 1388 1372 # Walk up tmpdir, deleting as we go -
fedd/fedd_util.py
r8ecfbad r11a08b0 286 286 287 287 288 def generate_fedid(subj, bits=2048, trace=None, dir=None):288 def generate_fedid(subj, bits=2048, log=None, dir=None, trace=None): 289 289 """ 290 290 Create a new certificate and derive a fedid from it. … … 306 306 "-x509", "-days", "30", "-out", certpath] 307 307 308 if trace:309 print >>trace, "calling %s" % " ".join(cmd)310 call_out = trace 311 else:312 313 308 if log: 309 log.debug("[generate_fedid] %s" % " ".join(cmd)) 310 311 if trace: call_out = trace 312 else: call_out = open("/dev/null", "w") 313 314 314 rv = subprocess.call(cmd, stdout=call_out, stderr=call_out) 315 315 if rv == 0:
Note: See TracChangeset
for help on using the changeset viewer.