Changeset 0ea11af
- Timestamp:
- Oct 9, 2008 2:08:28 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 0b466d1
- Parents:
- 11a08b0
- Location:
- fedd
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/fedd.py
r11a08b0 r0ea11af 21 21 from signal import signal, pause, SIGINT, SIGTERM 22 22 from select import select 23 from time import sleep 23 24 import logging 24 25 … … 30 31 31 32 class fedd_server(ThreadingSSLServer): 33 """ 34 Interface the fedd services to the XMLRPC and SOAP interfaces 35 """ 32 36 def __init__(self, ME, handler, ssl_ctx, impl): 37 """ 38 Create an SSL server that handles the transport in handler using the 39 credentials in ssl_ctx, and interfacing to the implementation of fedd 40 services in fedd. ME is the host port pair on which to bind. 41 """ 33 42 ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx) 34 43 self.impl = impl … … 37 46 38 47 class fedd_soap_handler(BaseHTTPRequestHandler): 48 """ 49 Standard connection between SOAP and the fedd services in impl. 50 51 Much of this is boilerplate from 52 http://www.xml.com/pub/a/ws/2004/01/20/salz.html 53 """ 39 54 server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version 40 55 … … 99 114 100 115 def soap_dispatch(self, method, req, fid): 116 """ 117 The connection to the implementation, using the method maps 118 119 The implementation provides a mapping from SOAP method name to the 120 method in the implementation that provides the service. 121 """ 101 122 if self.server.soap_methods.has_key(method): 102 123 try: … … 118 139 119 140 class fedd_xmlrpc_handler(BaseHTTPRequestHandler): 141 """ 142 Standard connection between XMLRPC and the fedd services in impl. 143 144 Much of this is boilerplate from 145 http://www.xml.com/pub/a/ws/2004/01/20/salz.html 146 """ 120 147 server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version 121 148 … … 157 184 158 185 def xmlrpc_dispatch(self, method, req, fid): 186 """ 187 The connection to the implementation, using the method maps 188 189 The implementation provides a mapping from XMLRPC method name to the 190 method in the implementation that provides the service. 191 """ 159 192 if self.server.xmlrpc_methods.has_key(method): 160 193 try: … … 193 226 const=sys.stderr, help="Print SOAP exchange to stderr") 194 227 195 servers_active = True 196 197 log_params = {\ 198 'format': "%(asctime)s %(levelname)-8s %(message)s",\ 199 'datefmt': '%a, %d %b %Y %H:%M:%S'\ 200 } 228 servers_active = True # Sub-servers run while this is True 229 services = [ ] # Service descriptions 230 servers = [ ] # fedd_server instances instantiated from services 231 servers_lock = Lock() # Lock to manipulate servers from sub-server threads 201 232 202 233 def shutdown(sig, frame): 234 """ 235 On a signal, stop running sub-servers. 236 237 This is connected to signals below 238 """ 203 239 global servers_active, flog 204 240 servers_active = False … … 206 242 207 243 def run_server(s): 244 """ 245 Operate a subserver, shutting down when servers_active is false. 246 247 Each server (that is host/port/transport triple) has a thread running this 248 function, so each can handle requests independently. They all call in to 249 the same implementation, which must manage its own synchronization. 250 """ 208 251 global servers_active # Not strictly needed: servers_active is only read 209 210 if s: 211 while servers_active: 212 i, o, e = select((s,), (), (), 5.0) 213 if s in i: 214 s.handle_request() 215 216 services = [ ] 217 servers = [ ] 252 global servers # List of active servers 253 global servers_lock # Lock to manipulate servers 254 255 while servers_active: 256 i, o, e = select((s,), (), (), 5.0) 257 if s in i: s.handle_request() 258 259 # Done. Remove us from the list 260 servers_lock.acquire() 261 servers.remove(s) 262 servers_lock.release() 218 263 219 264 opts, args = fedd_opts().parse_args() 220 265 266 # Logging setup 221 267 flog = logging.getLogger("fedd") 222 ffmt = logging.Formatter("%(asctime)s %( levelname)-8s %(message)s",223 '% a, %d %b %Y%H:%M:%S')268 ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s", 269 '%d %b %y %H:%M:%S') 224 270 225 271 if opts.logfile: fh = logging.FileHandler(opts.logfile) … … 236 282 flog.addHandler(fh) 237 283 238 284 # Initialize the implementation 239 285 if opts.configfile != None: 240 286 try: … … 247 293 sys.exit("--configfile is required") 248 294 249 SOAP_port = (opts.host, opts.port)250 251 295 if impl.cert_file == None: 252 296 sys.exit("Must supply certificate file (probably in config)") 253 297 298 # Create the SSL credentials 254 299 ctx = None 255 300 while ctx == None: … … 261 306 raise 262 307 308 # Walk through the service descriptions and pack them into the services list. 309 # That list has the form (transport (host, port)). 263 310 if opts.services: 264 311 for s in opts.services: … … 275 322 services.append((opts.transport, (opts.host, opts.port))) 276 323 324 # Create the servers and put them into a list 277 325 for s in services: 278 326 if s[0] == "soap": … … 282 330 else: flog.warning("Unknown transport: %s" % s[0]) 283 331 332 # Make sure that there are no malformed servers in the list 333 services = [ s for s in services if s ] 334 335 # Catch signals 284 336 signal(SIGINT, shutdown) 285 337 signal(SIGTERM, shutdown) 286 338 339 # Start the servers 287 340 for s in servers: 288 if s: 289 t = Thread(target=run_server, args=(s,)) 290 t.start() 291 292 pause() 341 Thread(target=run_server, args=(s,)).start() 342 343 # Main thread waits for signals 344 while servers_active: 345 pause() 346 347 #Once shutdown starts wait for all the servers to terminate. 348 while True: 349 servers_lock.acquire() 350 if len(servers) == 0: 351 servers_lock.release() 352 flog.info("All servers exited. Terminating") 353 sys.exit(0) 354 servers_lock.release() 355 sleep(1) 356 -
fedd/fedd_access.py
r11a08b0 r0ea11af 24 24 import logging 25 25 26 27 # Make log messages disappear if noone configures a fedd logger 26 28 class nullHandler(logging.Handler): 27 29 def emit(self, record): pass … … 95 97 self.dynamic_projects_cert_pwd) 96 98 97 98 if config.dynamic_projects_url == None: 99 self.allocate_project = \ 100 fedd_allocate_project_local(config.dynamic_projects, 101 config.dynamic_projects_url, proj_certs) 102 else: 103 self.allocate_project = \ 104 fedd_allocate_project_remote(config.dynamic_projects, 105 config.dynamic_projects_url, proj_certs) 106 107 self.soap_handlers = {\ 99 self.soap_services = {\ 108 100 'RequestAccess': make_soap_handler(\ 109 101 RequestAccessRequestMessage.typecode,\ … … 111 103 "RequestAccessResponseBody")\ 112 104 } 113 self.xmlrpc_ handlers = {\105 self.xmlrpc_services = {\ 114 106 'RequestAccess': make_xmlrpc_handler(\ 115 107 self.RequestAccess, "RequestAccessResponseBody")\ 116 108 } 109 110 111 if config.dynamic_projects_url == None: 112 self.allocate_project = \ 113 fedd_allocate_project_local(config.dynamic_projects, 114 config.dynamic_projects_url, proj_certs) 115 else: 116 self.allocate_project = \ 117 fedd_allocate_project_remote(config.dynamic_projects, 118 config.dynamic_projects_url, proj_certs) 119 120 # If the project allocator exports services, put them in this object's 121 # maps so that classes that instantiate this can call the services. 122 self.soap_services.update(self.allocate_project.soap_services) 123 self.xmlrpc_services.update(self.allocate_project.xmlrpc_services) 117 124 118 125 def dump_state(self): … … 393 400 394 401 def RequestAccess(self, req, fid): 395 402 """ 403 Handle the access request. Proxy if not for us. 404 405 Parse out the fields and make the allocations or rejections if for us, 406 otherwise, assuming we're willing to proxy, proxy the request out. 407 """ 408 409 # The dance to get into the request body 396 410 if req.has_key('RequestAccessRequestBody'): 397 411 req = req['RequestAccessRequestBody'] … … 488 502 489 503 def get_soap_services(self): 490 return self.soap_ handlers504 return self.soap_services 491 505 492 506 def get_xmlrpc_services(self): 493 return self.xmlrpc_ handlers494 507 return self.xmlrpc_services 508 -
fedd/fedd_allocate_project.py
r11a08b0 r0ea11af 25 25 26 26 27 # Configure loggers to dump to /dev/null which avoids errors if calling classes 28 # don't configure them. 27 29 class nullHandler(logging.Handler): 28 30 def emit(self, record): pass … … 34 36 35 37 class fedd_allocate_project_local: 38 """ 39 Allocate projects on this machine in response to an access request. 40 """ 36 41 def __init__(self, dp=False, url=None, certs=None): 37 42 """ … … 45 50 self.grantnodetype = '/usr/testbed/sbin/grantnodetype' 46 51 self.log = logging.getLogger("fedd.allocate.local") 52 53 # Internal services are SOAP only 54 self.soap_services = {\ 55 "AllocateProject": make_soap_handler(\ 56 AllocateProjectRequestMessage.typecode,\ 57 self.dynamic_project, AllocateProjectResponseMessage,\ 58 "AllocateProjectResponseBody")\ 59 } 60 self.xmlrpc_services = { } 47 61 48 62 def random_string(self, s, n=3): … … 195 209 196 210 class fedd_allocate_project_remote: 211 """ 212 Allocate projects on a remote machine using the internal SOAP interface 213 """ 197 214 def __init__(self, dp=False, url=None, certs=None): 198 215 """ … … 208 225 self.cert_file, self.trusted_certs, self.cert_pwd = \ 209 226 (None, None, None) 210 227 self.soap_services = { } 228 self.xmlrpc_services = { } 229 211 230 def dynamic_project(self, req, fedid=None): 212 231 """ -
fedd/fedd_deter_impl.py
r11a08b0 r0ea11af 21 21 """ 22 22 if config_path: 23 self.soap_ methods = { }24 self.xmlrpc_ methods = { }23 self.soap_services = { } 24 self.xmlrpc_services = { } 25 25 config = config_file(config_path) 26 26 … … 32 32 self.experiment = fedd_experiment_control_local(config) 33 33 34 self.soap_ methods.update(self.access.get_soap_services())35 self.soap_ methods.update(self.experiment.get_soap_services())34 self.soap_services.update(self.access.soap_services) 35 self.soap_services.update(self.experiment.soap_services) 36 36 37 self.xmlrpc_ methods.update(self.access.get_xmlrpc_services())38 self.xmlrpc_ methods.update(self.experiment.get_xmlrpc_services())37 self.xmlrpc_services.update(self.access.xmlrpc_services) 38 self.xmlrpc_services.update(self.experiment.xmlrpc_services) 39 39 40 40 def get_soap_services(self): 41 return self.soap_ methods41 return self.soap_services 42 42 43 43 def get_xmlrpc_services(self): 44 return self.xmlrpc_ methods44 return self.xmlrpc_services 45 45 46 46 def new_feddservice(configfile): -
fedd/fedd_experiment_control.py
r11a08b0 r0ea11af 39 39 40 40 class fedd_experiment_control_local: 41 """ 42 Control of experiments that this system can directly access. 43 44 Includes experiment creation, termination and information dissemination. 45 Thred safe. 46 """ 41 47 scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl", 42 48 "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl", … … 44 50 45 51 class thread_pool: 52 """ 53 A class to keep track of a set of threads all invoked for the same 54 task. Manages the mutual exclusion of the states. 55 """ 46 56 def __init__(self): 57 """ 58 Start a pool. 59 """ 47 60 self.changed = Condition() 48 61 self.started = 0 … … 50 63 51 64 def acquire(self): 65 """ 66 Get the pool's lock. 67 """ 52 68 self.changed.acquire() 53 69 54 70 def release(self): 71 """ 72 Release the pool's lock. 73 """ 55 74 self.changed.release() 56 75 57 76 def wait(self, timeout = None): 77 """ 78 Wait for a pool thread to start or stop. 79 """ 58 80 self.changed.wait(timeout) 59 81 60 82 def start(self): 83 """ 84 Called by a pool thread to report starting. 85 """ 61 86 self.changed.acquire() 62 87 self.started += 1 … … 65 90 66 91 def terminate(self): 92 """ 93 Called by a pool thread to report finishing. 94 """ 67 95 self.changed.acquire() 68 96 self.terminated += 1 … … 71 99 72 100 def clear(self): 101 """ 102 Clear all pool data. 103 """ 73 104 self.changed.acquire() 74 105 self.started = 0 … … 78 109 79 110 class pooled_thread(Thread): 111 """ 112 One of a set of threads dedicated to a specific task. Uses the 113 thread_pool class above for coordination. 114 """ 80 115 def __init__(self, group=None, target=None, name=None, args=(), 81 116 kwargs={}, pdata=None, trace_file=None): 82 117 Thread.__init__(self, group, target, name, args, kwargs) 83 self.rv = None 84 self.exception = None 85 self.target=target 86 self.args = args 87 self.kwargs = kwargs 88 self.pdata = pdata 89 self.trace_file = trace_file 118 self.rv = None # Return value of the ops in this thread 119 self.exception = None # Exception that terminated this thread 120 self.target=target # Target function to run on start() 121 self.args = args # Args to pass to target 122 self.kwargs = kwargs # Additional kw args 123 self.pdata = pdata # thread_pool for this class 124 # Logger for this thread 125 self.log = logging.getLogger("fedd.experiment_control") 90 126 91 127 def run(self): 128 """ 129 Emulate Thread.run, except add pool data manipulation and error 130 logging. 131 """ 92 132 if self.pdata: 93 133 self.pdata.start() … … 98 138 except service_error, s: 99 139 self.exception = s 100 if self.trace_file: 101 logging.error("Thread exception: %s %s" % \ 102 (s.code_string(), s.desc)) 140 self.log.error("Thread exception: %s %s" % \ 141 (s.code_string(), s.desc)) 103 142 except: 104 143 self.exception = sys.exc_info()[1] 105 if self.trace_file: 106 logging.error(("Unexpected thread exception: %s" +\ 107 "Trace %s") % (self.exception,\ 108 traceback.format_exc())) 144 self.log.error(("Unexpected thread exception: %s" +\ 145 "Trace %s") % (self.exception,\ 146 traceback.format_exc())) 109 147 if self.pdata: 110 148 self.pdata.terminate() 111 149 112 150 def __init__(self, config=None): 151 """ 152 Intialize the various attributes, most from the config object 153 """ 113 154 self.scripts = fedd_experiment_control_local.scripts 114 155 self.thread_with_rv = fedd_experiment_control_local.pooled_thread … … 121 162 # Walk through the various relevant certificat specifying config 122 163 # attributes until the local certificate attributes can be resolved. 123 # The walk is from omst specific to most general specification.164 # The walk is from most specific to most general specification. 124 165 for p in ("create_experiment_", "proxy_", ""): 125 166 filen = "%scert_file" % p … … 200 241 config.experiment_log) 201 242 202 # Grab saved state 243 # Grab saved state. OK to do this w/o locking because it's read only 244 # and only one thread should be in existence that can see self.state at 245 # this point. 203 246 if self.state_filename: 204 247 self.read_state() … … 210 253 "%s/%s not in local script dir" % (self.scripts_dir, s)) 211 254 212 self.soap_handlers = {\ 255 # Dispatch tables 256 self.soap_services = {\ 213 257 'Create': make_soap_handler(\ 214 258 CreateRequestMessage.typecode, … … 238 282 } 239 283 240 self.xmlrpc_ handlers = {\284 self.xmlrpc_services = {\ 241 285 'Create': make_xmlrpc_handler(\ 242 286 getattr(self, "create_experiment"), … … 257 301 258 302 def get_soap_services(self): 259 return self.soap_ handlers303 return self.soap_services 260 304 261 305 def get_xmlrpc_services(self): 262 return self.xmlrpc_ handlers306 return self.xmlrpc_services 263 307 264 308 def copy_file(self, src, dest, size=1024): … … 278 322 # Call while holding self.state_lock 279 323 def write_state(self): 324 """ 325 Write a new copy of experiment state after copying the existing state 326 to a backup. 327 328 State format is a simple pickling of the state dictionary. 329 """ 280 330 if os.access(self.state_filename, os.W_OK): 281 331 self.copy_file(self.state_filename, \ … … 292 342 # Call while holding self.state_lock 293 343 def read_state(self): 344 """ 345 Read a new copy of experiment state. Old state is overwritten. 346 347 State format is a simple pickling of the state dictionary. 348 """ 294 349 try: 295 350 f = open(self.state_filename, "r") … … 303 358 def scp_file(self, file, user, host, dest=""): 304 359 """ 305 scp a file to the remote host. 360 scp a file to the remote host. If debug is set the action is only 361 logged. 306 362 """ 307 363 … … 316 372 317 373 def ssh_cmd(self, user, host, cmd, wname=None): 374 """ 375 Run a remote command on host as user. If debug is set, the action is 376 only logged. 377 """ 318 378 sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd) 319 379 … … 326 386 327 387 def ship_scripts(self, host, user, dest_dir): 388 """ 389 Copy the federation scripts (fedkit) to the a federant. 390 """ 328 391 if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir): 329 392 for s in self.scripts: … … 336 399 337 400 def ship_configs(self, host, user, src_dir, dest_dir): 401 """ 402 Copy federant-specific configuration files to the federant. 403 """ 338 404 if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir): 339 405 return False … … 353 419 354 420 def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0): 421 """ 422 Start a sub-experiment on a federant. 423 424 Get the current state, modify or create as appropriate, ship data and 425 configs and start the experiment. There are small ordering differences 426 based on the initial state of the sub-experiment. 427 """ 428 # ops node in the federant 355 429 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 356 user = tbparams[tb]['user'] 357 pid = tbparams[tb]['project'] 430 user = tbparams[tb]['user'] # federant user 431 pid = tbparams[tb]['project'] # federant project 358 432 # XXX 359 433 base_confs = ( "hosts",) 360 tclfile = "%s.%s.tcl" % (eid, tb) 361 expinfo_exec = "/usr/testbed/bin/expinfo" 434 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 435 # command to test experiment state 436 expinfo_exec = "/usr/testbed/bin/expinfo" 437 # Configuration directories on the remote machine 362 438 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 363 439 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 364 440 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 441 # Regular expressions to parse the expinfo response 365 442 state_re = re.compile("State:\s+(\w+)") 366 443 no_exp_re = re.compile("^No\s+such\s+experiment") 367 state = None 444 state = None # Experiment state parsed from expinfo 445 # The expinfo ssh command 368 446 cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid] 369 447 448 # Get status 370 449 self.log.debug("[start_segment]: %s"% " ".join(cmd)) 371 450 dev_null = None … … 383 462 if m: state = "none" 384 463 rv = status.wait() 464 385 465 # If the experiment is not present the subcommand returns a non-zero 386 466 # return value. If we successfully parsed a "none" outcome, ignore the … … 506 586 507 587 def stop_segment(self, tb, eid, tbparams): 588 """ 589 Stop a sub experiment by calling swapexp on the federant 590 """ 508 591 user = tbparams[tb]['user'] 509 592 host = tbparams[tb]['host'] … … 541 624 542 625 def gentopo(self, str): 626 """ 627 Generate the topology dtat structure from the splitter's XML 628 representation of it. 629 630 The topology XML looks like: 631 <experiment> 632 <nodes> 633 <node><vname></vname><ips>ip1:ip2</ips></node> 634 </nodes> 635 <lans> 636 <lan> 637 <vname></vname><vnode></vnode><ip></ip> 638 <bandwidth></bandwidth><member>node:port</member> 639 </lan> 640 </lans> 641 """ 543 642 class topo_parse: 643 """ 644 Parse the topology XML and create the dats structure. 645 """ 544 646 def __init__(self): 647 # Typing of the subelements for data conversion 545 648 self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member') 546 649 self.int_subelements = ( 'bandwidth',) 547 650 self.float_subelements = ( 'delay',) 651 # The final data structure 548 652 self.nodes = [ ] 549 653 self.lans = [ ] 550 self.element = { }551 654 self.topo = { \ 552 655 'node': self.nodes,\ 553 656 'lan' : self.lans,\ 554 657 } 555 self.chars = "" 658 self.element = { } # Current element being created 659 self.chars = "" # Last text seen 556 660 557 661 def end_element(self, name): 662 # After each sub element the contents is added to the current 663 # element or to the appropriate list. 558 664 if name == 'node': 559 665 self.nodes.append(self.element) … … 782 888 783 889 class current_testbed: 890 """ 891 Object for collecting the current testbed description. The testbed 892 description is saved to a file with the local testbed variables 893 subsittuted line by line. 894 """ 784 895 def __init__(self, eid, tmpdir): 785 896 self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)") … … 870 981 871 982 class allbeds: 983 """ 984 Process the Allbeds section. Get access to each federant and save the 985 parameters in tbparams 986 """ 872 987 def __init__(self, get_access): 873 988 self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds") … … 1090 1205 1091 1206 class shunt_to_file: 1207 """ 1208 Simple class to write data between two regexps to a file. 1209 """ 1092 1210 def __init__(self, begin, end, filename): 1211 """ 1212 Begin shunting on a match of begin, stop on end, send data to 1213 filename. 1214 """ 1093 1215 self.begin = re.compile(begin) 1094 1216 self.end = re.compile(end) … … 1098 1220 1099 1221 def __call__(self, line): 1222 """ 1223 Call this on each line in the input that may be shunted. 1224 """ 1100 1225 if not self.in_shunt: 1101 1226 if self.begin.match(line): … … 1121 1246 1122 1247 class shunt_to_list: 1248 """ 1249 Same interface as shunt_to_file. Data collected in self.list, one list 1250 element per line. 1251 """ 1123 1252 def __init__(self, begin, end): 1124 1253 self.begin = re.compile(begin) … … 1142 1271 1143 1272 class shunt_to_string: 1273 """ 1274 Same interface as shunt_to_file. Data collected in self.str, all in 1275 one string. 1276 """ 1144 1277 def __init__(self, begin, end): 1145 1278 self.begin = re.compile(begin) … … 1163 1296 1164 1297 def create_experiment(self, req, fid): 1298 """ 1299 The external interface to experiment creation called from the 1300 dispatcher. 1301 1302 Creates a working directory, splits the incoming description using the 1303 splitter script and parses out the avrious subsections using the 1304 lcasses above. Once each sub-experiment is created, use pooled threads 1305 to instantiate them and start it all up. 1306 """ 1165 1307 try: 1166 1308 tmpdir = tempfile.mkdtemp(prefix="split-") … … 1209 1351 while (self.state.has_key(eid)): 1210 1352 eid += random.choice(string.ascii_letters) 1353 # To avoid another thread picking this localname 1211 1354 self.state[eid] = "placeholder" 1212 1355 self.state_lock.release() … … 1220 1363 for i in range(0,5): 1221 1364 eid += random.choice(string.ascii_letters) 1365 # To avoid another thread picking this localname 1222 1366 self.state[eid] = "placeholder" 1223 1367 self.state_lock.release() … … 1242 1386 tclparser = Popen(tclcmd, stdout=PIPE) 1243 1387 1244 allocated = { } 1245 started = { } 1246 1388 allocated = { } # Testbeds we can access 1389 started = { } # Testbeds where a sub-experiment started 1390 # successfully 1391 1392 # Objects to parse the splitter output (defined above) 1247 1393 parse_current_testbed = self.current_testbed(eid, tmpdir) 1248 1394 parse_allbeds = self.allbeds(self.get_access) … … 1258 1404 "^#\s+End\s+rpms") 1259 1405 1406 # Worling on the split data 1260 1407 for line in tclparser.stdout: 1261 1408 line = line.rstrip() … … 1278 1425 "Bad tcl parse? %s" % line) 1279 1426 1427 # Virtual topology and visualization 1280 1428 vtopo = self.gentopo(parse_vtopo.str) 1281 1429 if not vtopo: … … 1390 1538 } 1391 1539 1540 # Insert the experiment into our state and update the disk copy 1392 1541 self.state_lock.acquire() 1393 1542 self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \ … … 1412 1561 1413 1562 def get_vtopo(self, req, fid): 1563 """ 1564 Return the stored virtual topology for this experiment 1565 """ 1414 1566 rv = None 1415 1567 … … 1442 1594 1443 1595 def get_vis(self, req, fid): 1596 """ 1597 Return the stored visualization for this experiment 1598 """ 1444 1599 rv = None 1445 1600 … … 1472 1627 1473 1628 def get_info(self, req, fid): 1629 """ 1630 Return all the stored info about this experiment 1631 """ 1474 1632 rv = None 1475 1633 … … 1504 1662 1505 1663 def terminate_experiment(self, req, fid): 1664 """ 1665 Swap this experiment out on the federants and delete the shared 1666 information 1667 """ 1506 1668 tbparams = { } 1507 1669 req = req.get('TerminateRequestBody', None) … … 1584 1746 self.state_lock.release() 1585 1747 raise service_error(service_error.req, "No saved state") 1586 1587 1588 1589 1590 if __name__ == '__main__':1591 from optparse import OptionParser1592 1593 parser = OptionParser()1594 parser.add_option('-d', '--debug', dest='debug', default=False,1595 action='store_true', help='print actions rather than take them')1596 parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse')1597 parser.add_option('-m', '--master', dest='master',1598 help='testbed label for matster testbd')1599 parser.add_option('-t', '--trace', dest='trace', default=None,1600 help='file to print intermediate messages to')1601 parser.add_option('-T', '--trace-stderr', dest='trace',1602 action='store_const',const=sys.stderr,1603 help='file to print intermediate messages to')1604 opts, args = parser.parse_args()1605 1606 trace_file = None1607 if opts.trace:1608 try:1609 trace_file = open(opts.trace, 'w')1610 except IOError:1611 print >>sys.stderr, "Can't open trace file"1612 1613 if opts.debug:1614 if not trace_file:1615 trace_file = sys.stderr1616 1617 if opts.tcl != None:1618 try:1619 f = open(opts.tcl, 'r')1620 content = ''.join(f)1621 f.close()1622 except IOError, e:1623 sys.exit("Can't read %s: %s" % (opts.tcl, e))1624 else:1625 sys.exit("Must specify a file name")1626 1627 if not opts.master:1628 sys.exit("Must supply master tb label (--master)");1629 1630 obj = fedd_create_experiment_local(1631 debug=opts.debug,1632 scripts_dir="/users/faber/testbed/federation",1633 cert_file="./fedd_client.pem", cert_pwd="faber",1634 ssh_pubkey_file='/users/faber/.ssh/id_rsa.pub',1635 trusted_certs="./cacert.pem",1636 tbmap = {1637 'deter':'https://users.isi.deterlab.net:23235',1638 'emulab':'https://users.isi.deterlab.net:23236',1639 'ucb':'https://users.isi.deterlab.net:23237',1640 },1641 trace_file=trace_file1642 )1643 rv = obj.create_experiment( {\1644 'experimentdescription' : content,1645 'master' : opts.master,1646 'user': [ {'userID' : { 'localname' : 'faber' } } ],1647 },1648 None)1649 1650 print rv
Note: See TracChangeset
for help on using the changeset viewer.