Changeset cc8d8e9 for fedd/federation/experiment_control.py
- Timestamp:
- Aug 28, 2009 6:07:42 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 6c57fe9
- Parents:
- 4c8a0b7
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r4c8a0b7 rcc8d8e9 196 196 call_RequestAccess = service_caller('RequestAccess') 197 197 call_ReleaseAccess = service_caller('ReleaseAccess') 198 call_StartSegment = service_caller('StartSegment') 198 199 call_Ns2Split = service_caller('Ns2Split') 199 200 … … 230 231 self.trusted_certs = config.get("experiment_control", "trusted_certs") \ 231 232 or config.get("globals", "trusted_certs") 233 234 # XXX: 235 self.repodir = '/usr/local/etc/fedd/repo' 232 236 233 237 self.exp_stem = "fed-stem" … … 403 407 State format is a simple pickling of the state dictionary. 404 408 """ 409 410 def get_experiment_id(state): 411 """ 412 Pull the fedid experimentID out of the saved state. This is kind 413 of a gross walk through the dict. 414 """ 415 416 if state.has_key('experimentID'): 417 for e in state['experimentID']: 418 if e.has_key('fedid'): 419 return e['fedid'] 420 else: 421 return None 422 else: 423 return None 424 425 def get_alloc_ids(state): 426 """ 427 Pull the fedids of the identifiers of each allocation from the 428 state. Again, a dict dive that's best isolated. 429 """ 430 431 return [ f['allocID']['fedid'] 432 for f in state.get('federant',[]) \ 433 if f.has_key('allocID') and \ 434 f['allocID'].has_key('fedid')] 435 436 405 437 try: 406 438 f = open(self.state_filename, "r") … … 415 447 "Unpickling failed: %s") % e) 416 448 417 for k in self.state.keys():449 for s in self.state.values(): 418 450 try: 419 # This list should only have one element in it, but phrasing it 420 # as a for loop doesn't cost much, really. We have to find the 421 # fedid elements anyway. 422 for eid in [ f['fedid'] \ 423 for f in self.state[k]['experimentID']\424 if f.has_key('fedid') ]: 425 self.auth.set_attribute( self.state[k]['owner'], eid)451 452 eid = get_experiment_id(s) 453 if eid : 454 # Give the owner rights to the experiment 455 self.auth.set_attribute(s['owner'], eid) 456 # And holders of the eid as well 457 self.auth.set_attribute(eid, eid) 426 458 # allow overrides to control experiments as well 427 459 for o in self.overrides: 428 460 self.auth.set_attribute(o, eid) 461 # Set permissions to allow reading of the software repo, if 462 # any, as well. 463 for a in get_alloc_ids(s): 464 self.auth.set_attribute(a, 'repo/%s' % eid) 465 else: 466 raise KeyError("No experiment id") 429 467 except KeyError, e: 430 468 self.log.warning("[read_state]: State ownership or identity " +\ … … 970 1008 nodes = [ n['vname'] for n in topo['node'] ] 971 1009 topo_lans = topo['lan'] 972 except KeyError :973 raise service_error(service_error.internal, "Bad topology")1010 except KeyError, e: 1011 raise service_error(service_error.internal, "Bad topology: %s" %e) 974 1012 975 1013 lans = { } … … 2103 2141 2104 2142 return rv 2143 2144 class new_start_segment: 2145 def __init__(self, debug=False, log=None, cert_file=None, 2146 cert_pwd=None, trusted_certs=None, caller=None): 2147 self.log = log 2148 self.debug = debug 2149 self.cert_file = cert_file 2150 self.cert_pwd = cert_pwd 2151 self.trusted_certs = None 2152 self.caller = caller 2153 2154 def __call__(self, uri, aid, topo): 2155 req = { 2156 'allocID': { 'fedid' : aid }, 2157 'segmentdescription': { 2158 'topdldescription': topo.to_dict(), 2159 }, 2160 } 2161 2162 print "calling %s" % uri 2163 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 2164 self.trusted_certs) 2165 print r 2166 return True 2167 2168 2105 2169 2170 2171 def new_allocate_resources(self, allocated, master, eid, expid, expcert, 2172 tbparams, topo, tmpdir, alloc_log=None): 2173 started = { } # Testbeds where a sub-experiment started 2174 # successfully 2175 2176 # XXX 2177 fail_soft = False 2178 2179 log = alloc_log or self.log 2180 2181 thread_pool = self.thread_pool(self.nthreads) 2182 threads = [ ] 2183 2184 for tb in [ k for k in allocated.keys() if k != master]: 2185 # Create and start a thread to start the segment, and save it to 2186 # get the return value later 2187 thread_pool.wait_for_slot() 2188 uri = self.tbmap.get(tb, None) 2189 if not uri: 2190 raise service_error(service_error.internal, 2191 "Unknown testbed %s !?" % tb) 2192 2193 if tbparams[tb].has_key('allocID') and \ 2194 tbparams[tb]['allocID'].has_key('fedid'): 2195 aid = tbparams[tb]['allocID']['fedid'] 2196 else: 2197 raise service_error(service_error.internal, 2198 "No alloc id for testbed %s !?" % tb) 2199 2200 t = self.pooled_thread(\ 2201 target=self.new_start_segment(log=log, debug=self.debug, 2202 cert_file=self.cert_file, cert_pwd=self.cert_pwd, 2203 trusted_certs=self.trusted_certs, 2204 caller=self.call_StartSegment), 2205 args=(uri, aid, topo[tb]), name=tb, 2206 pdata=thread_pool, trace_file=self.trace_file) 2207 threads.append(t) 2208 t.start() 2209 2210 # Wait until all finish 2211 thread_pool.wait_for_all_done() 2212 2213 # If none failed, start the master 2214 failed = [ t.getName() for t in threads if not t.rv ] 2215 2216 if len(failed) == 0: 2217 uri = self.tbmap.get(master, None) 2218 if not uri: 2219 raise service_error(service_error.internal, 2220 "Unknown testbed %s !?" % master) 2221 2222 if tbparams[master].has_key('allocID') and \ 2223 tbparams[master]['allocID'].has_key('fedid'): 2224 aid = tbparams[master]['allocID']['fedid'] 2225 else: 2226 raise service_error(service_error.internal, 2227 "No alloc id for testbed %s !?" % master) 2228 starter = self.new_start_segment(log=log, debug=self.debug, 2229 cert_file=self.cert_file, cert_pwd=self.cert_pwd, 2230 trusted_certs=self.trusted_certs, 2231 caller=self.call_StartSegment) 2232 if not starter(uri, aid, topo[master]): 2233 failed.append(master) 2234 2235 succeeded = [tb for tb in allocated.keys() if tb not in failed] 2236 # If one failed clean up, unless fail_soft is set 2237 if failed and False: 2238 if not fail_soft: 2239 thread_pool.clear() 2240 for tb in succeeded: 2241 # Create and start a thread to stop the segment 2242 thread_pool.wait_for_slot() 2243 t = self.pooled_thread(\ 2244 target=self.stop_segment(log=log, 2245 keyfile=self.ssh_privkey_file, 2246 debug=self.debug), 2247 args=(tb, eid, tbparams), name=tb, 2248 pdata=thread_pool, trace_file=self.trace_file) 2249 t.start() 2250 # Wait until all finish 2251 thread_pool.wait_for_all_done() 2252 2253 # release the allocations 2254 for tb in tbparams.keys(): 2255 self.release_access(tb, tbparams[tb]['allocID']) 2256 # Remove the placeholder 2257 self.state_lock.acquire() 2258 self.state[eid]['experimentStatus'] = 'failed' 2259 if self.state_filename: self.write_state() 2260 self.state_lock.release() 2261 2262 log.error("Swap in failed on %s" % ",".join(failed)) 2263 return 2264 else: 2265 log.info("[start_segment]: Experiment %s active" % eid) 2266 2267 log.debug("[start_experiment]: removing %s" % tmpdir) 2268 2269 # Walk up tmpdir, deleting as we go 2270 for path, dirs, files in os.walk(tmpdir, topdown=False): 2271 for f in files: 2272 os.remove(os.path.join(path, f)) 2273 for d in dirs: 2274 os.rmdir(os.path.join(path, d)) 2275 os.rmdir(tmpdir) 2276 2277 # Insert the experiment into our state and update the disk copy 2278 self.state_lock.acquire() 2279 self.state[expid]['experimentStatus'] = 'active' 2280 self.state[eid] = self.state[expid] 2281 if self.state_filename: self.write_state() 2282 self.state_lock.release() 2283 return 2284 2106 2285 2107 2286 def new_create_experiment(self, req, fid): … … 2274 2453 2275 2454 allocated = { } # Testbeds we can access 2276 # XXX here's where we're working 2277 def out_topo(filename, t): 2278 try: 2279 f = open("/tmp/%s" % filename, "w") 2280 print >> f, "%s" % \ 2281 topdl.topology_to_xml(t, top="experiment") 2282 f.close() 2283 except IOError, e: 2284 raise service_error(service_error.internal, "Can't open file") 2285 2286 try: 2287 2288 top = topdl.topology_from_xml(file=split_data, top="experiment") 2289 subs = sorted(top.substrates, 2290 cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)), 2291 reverse=True) 2292 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) 2293 for s in subs: 2294 a = ips.allocate(len(s.interfaces)+2) 2295 if a : 2296 base, num = a 2297 if num < len(s.interfaces) +2 : 2298 raise service_error(service_error.internal, 2299 "Allocator returned wrong number of IPs??") 2300 else: 2301 raise service_error(service_error.req, 2302 "Cannot allocate IP addresses") 2303 2304 base += 1 2305 for i in s.interfaces: 2306 i.attribute.append( 2307 topdl.Attribute('ip4_address', 2308 "%s" % ip_addr(base))) 2309 base += 1 2310 2311 testbeds = set([ a.value for e in top.elements \ 2312 for a in e.attribute \ 2313 if a.attribute == 'testbed'] ) 2314 topo ={ } 2315 for tb in testbeds: 2316 self.get_access(tb, None, user, tbparams, master, 2317 export_project, access_user) 2318 topo[tb] = top.clone() 2319 to_delete = [ ] 2320 for e in topo[tb].elements: 2321 etb = e.get_attribute('testbed') 2322 if etb and etb != tb: 2323 for i in e.interface: 2324 for s in i.subs: 2325 try: 2326 s.interfaces.remove(i) 2327 except ValueError: 2328 raise service_error(service_error.internal, 2329 "Can't remove interface??") 2330 to_delete.append(e) 2331 for e in to_delete: 2332 topo[tb].elements.remove(e) 2333 topo[tb].make_indices() 2334 2335 2336 2337 for s in top.substrates: 2338 tests = { } 2339 for i in s.interfaces: 2340 e = i.element 2341 tb = e.get_attribute('testbed') 2342 if tb and not tests.has_key(tb): 2343 for i in e.interface: 2344 if s in i.subs: 2345 tests[tb]= \ 2346 i.get_attribute('ip4_address') 2347 if len(tests) < 2: 2348 continue 2349 2350 # More than one testbed is on this substrate. Insert 2351 # some gateways into the subtopologies. 2352 2353 for st in tests.keys(): 2354 for dt in [ t for t in tests.keys() if t != st]: 2355 myname = "%stunnel" % dt 2356 desthost = "%stunnel" % st 2357 sproject = tbparams[st].get('project', 'project') 2358 dproject = tbparams[dt].get('project', 'project') 2359 sdomain = ".%s.%s%s" % (eid, sproject, 2360 tbparams[st].get('domain', ".example.com")) 2361 ddomain = ".%s.%s%s" % (eid, dproject, 2362 tbparams[dt].get('domain', ".example.com")) 2363 boss = tbparams[master].get('boss', "boss") 2364 fs = tbparams[master].get('fs', "fs") 2365 event_server = "%s%s" % \ 2366 (tbparams[st].get('eventserver', "event_server"), 2367 tbparams[dt].get('domain', "example.com")) 2368 remote_event_server = "%s%s" % \ 2369 (tbparams[dt].get('eventserver', "event_server"), 2370 tbparams[dt].get('domain', "example.com")) 2371 seer_control = "%s%s" % \ 2372 (tbparams[st].get('control', "control"), sdomain) 2373 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 2374 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 2375 conf_file = "%s%s.gw.conf" % (myname, sdomain) 2376 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 2377 # translate to lower case so the `hostname` hack for specifying 2378 # configuration files works. 2379 conf_file = conf_file.lower(); 2380 remote_conf_file = remote_conf_file.lower(); 2381 active = ("%s" % (st == master)) 2382 portal = topdl.Computer(**{ 2383 'name': "%stunnel" % dt, 2384 'attribute' : [{ 2385 'attribute': n, 2386 'value': v, 2387 } for n, v in (\ 2388 ('gateway', 'true'), 2389 ('boss', boss), 2390 ('fs', fs), 2391 ('event_server', event_server), 2392 ('remote_event_server', remote_event_server), 2393 ('seer_control', seer_control), 2394 ('local_key_dir', local_key_dir), 2395 ('remote_conf_dir', remote_conf_dir), 2396 ('conf_file', conf_file), 2397 ('remote_conf_file', remote_conf_file), 2398 ('remote_script_dir', "/usr/local/federation/bin"), 2399 ('local_script_dir', "/usr/local/federation/bin"), 2400 )], 2401 'interface': [{ 2402 'substrate': s.name, 2403 'attribute': [ { 2404 'attribute': 'ip4_addreess', 2405 'value': tests[dt], 2406 }, ], 2407 }, ], 2408 }) 2409 topo[st].elements.append(portal) 2410 # Connect the gateway nodes into the topologies and clear out 2411 # substrates that are not in the topologies 2412 for tb in testbeds: 2413 topo[tb].incorporate_elements() 2414 topo[tb].substrates = \ 2415 [s for s in topo[tb].substrates \ 2416 if len(s.interfaces) >0] 2417 2418 softdir ="%s/software" % tmpdir 2419 softmap = { } 2420 os.mkdir(softdir) 2421 pkgs = set([fedkit, gatewaykit]) 2422 pkgs.update([x.location for e in top.elements \ 2423 for x in e.software]) 2424 for pkg in pkgs: 2425 loc = pkg 2426 2427 scheme, host, path = urlparse(loc)[0:3] 2428 dest = os.path.basename(path) 2429 if not scheme: 2430 if not loc.startswith('/'): 2431 loc = "/%s" % loc 2432 loc = "file://%s" %loc 2433 try: 2434 u = urlopen(loc) 2435 except Exception, e: 2436 raise service_error(service_error.req, 2437 "Cannot open %s: %s" % (loc, e)) 2438 try: 2439 f = open("%s/%s" % (softdir, dest) , "w") 2440 data = u.read(4096) 2441 while data: 2442 f.write(data) 2443 data = u.read(4096) 2444 f.close() 2445 u.close() 2446 except Exception, e: 2447 raise service_error(service_error.internal, 2448 "Could not copy %s: %s" % (loc, e)) 2449 path = re.sub("/tmp", "", softdir) 2450 # XXX 2451 softmap[pkg] = \ 2452 "https://users.isi.deterlab.net:23232/%s/%s" %\ 2453 ( path, dest) 2454 2455 # Convert the software locations in the segments into the local 2456 # copies on this host 2457 for soft in [ s for tb in topo.values() \ 2458 for e in tb.elements \ 2459 for s in e.software ]: 2460 if softmap.has_key(soft.location): 2461 soft.location = softmap[soft.location] 2462 for tb in testbeds: 2463 out_topo("%s.xml" %tb, topo[tb]) 2464 2465 vtopo = topdl.topology_to_vtopo(top) 2466 vis = self.genviz(vtopo) 2467 2468 except Exception, e: 2469 traceback.print_exc() 2470 raise service_error(service_error.internal, "%s" % e) 2471 2472 2473 2474 # Build the testbed topologies: 2475 2476 2477 if True: 2478 raise service_error(service_error.internal, "Developing") 2479 2480 # XXX old code 2481 # Objects to parse the splitter output (defined above) 2482 parse_current_testbed = self.current_testbed(eid, tmpdir, 2483 self.fedkit, self.gatewaykit) 2484 parse_allbeds = self.allbeds(self.get_access) 2485 parse_gateways = self.gateways(eid, master, tmpdir, 2486 gw_pubkey_base, gw_secretkey_base, self.copy_file, 2487 self.fedkit) 2488 parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", 2489 "^#\s+End\s+Vtopo") 2490 parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", 2491 "^#\s+End\s+hostnames", tmpdir + "/hosts") 2492 parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", 2493 "^#\s+End\s+tarfiles") 2494 parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", 2495 "^#\s+End\s+rpms") 2496 2497 # Working on the split data 2498 for line in split_data: 2499 line = line.rstrip() 2500 if parse_current_testbed(line, master, allocated, tbparams): 2501 continue 2502 elif parse_allbeds(line, user, tbparams, master, export_project, 2503 access_user): 2504 continue 2505 elif parse_gateways(line, allocated, tbparams): 2506 continue 2507 elif parse_vtopo(line): 2508 continue 2509 elif parse_hostnames(line): 2510 continue 2511 elif parse_tarfiles(line): 2512 continue 2513 elif parse_rpms(line): 2514 continue 2515 else: 2516 raise service_error(service_error.internal, 2517 "Bad tcl parse? %s" % line) 2518 # Virtual topology and visualization 2519 vtopo = self.gentopo(parse_vtopo.str) 2520 if not vtopo: 2521 raise service_error(service_error.internal, 2522 "Failed to generate virtual topology") 2523 2524 vis = self.genviz(vtopo) 2525 if not vis: 2526 raise service_error(service_error.internal, 2527 "Failed to generate visualization") 2528 2529 2455 # Allocate IP addresses: The allocator is a buddy system memory 2456 # allocator. Allocate from the largest substrate to the 2457 # smallest to make the packing more likely to work - i.e. 2458 # avoiding internal fragmentation. 2459 top = topdl.topology_from_xml(file=split_data, top="experiment") 2460 subs = sorted(top.substrates, 2461 cmp=lambda x,y: cmp(len(x.interfaces), 2462 len(y.interfaces)), 2463 reverse=True) 2464 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) 2465 for s in subs: 2466 a = ips.allocate(len(s.interfaces)+2) 2467 if a : 2468 base, num = a 2469 if num < len(s.interfaces) +2 : 2470 raise service_error(service_error.internal, 2471 "Allocator returned wrong number of IPs??") 2472 else: 2473 raise service_error(service_error.req, 2474 "Cannot allocate IP addresses") 2475 2476 base += 1 2477 for i in s.interfaces: 2478 i.attribute.append( 2479 topdl.Attribute('ip4_address', 2480 "%s" % ip_addr(base))) 2481 base += 1 2482 2483 # Find the testbeds to look up 2484 testbeds = set([ a.value for e in top.elements \ 2485 for a in e.attribute \ 2486 if a.attribute == 'testbed'] ) 2487 2488 # Make per testbed topologies. Copy the main topo and remove 2489 # interfaces and nodes that don't live in the testbed. 2490 topo ={ } 2491 for tb in testbeds: 2492 self.get_access(tb, None, user, tbparams, master, 2493 export_project, access_user) 2494 allocated[tb] = 1 2495 topo[tb] = top.clone() 2496 to_delete = [ ] 2497 for e in topo[tb].elements: 2498 etb = e.get_attribute('testbed') 2499 if etb and etb != tb: 2500 for i in e.interface: 2501 for s in i.subs: 2502 try: 2503 s.interfaces.remove(i) 2504 except ValueError: 2505 raise service_error(service_error.internal, 2506 "Can't remove interface??") 2507 to_delete.append(e) 2508 for e in to_delete: 2509 topo[tb].elements.remove(e) 2510 topo[tb].make_indices() 2511 2512 2513 2514 # Now, for each substrate in the main topology, find those that 2515 # have nodes on more than one testbed. Insert portal nodes 2516 # into the copies of those substrates on the sub topologies. 2517 for s in top.substrates: 2518 tests = { } 2519 for i in s.interfaces: 2520 e = i.element 2521 tb = e.get_attribute('testbed') 2522 if tb and not tests.has_key(tb): 2523 for i in e.interface: 2524 if s in i.subs: 2525 tests[tb]= \ 2526 i.get_attribute('ip4_address') 2527 if len(tests) < 2: 2528 continue 2529 2530 # More than one testbed is on this substrate. Insert 2531 # some portals into the subtopologies. 2532 2533 for st in tests.keys(): 2534 for dt in [ t for t in tests.keys() if t != st]: 2535 myname = "%stunnel" % dt 2536 desthost = "%stunnel" % st 2537 sproject = tbparams[st].get('project', 'project') 2538 dproject = tbparams[dt].get('project', 'project') 2539 sdomain = ".%s.%s%s" % (eid, sproject, 2540 tbparams[st].get('domain', ".example.com")) 2541 ddomain = ".%s.%s%s" % (eid, dproject, 2542 tbparams[dt].get('domain', ".example.com")) 2543 boss = tbparams[master].get('boss', "boss") 2544 fs = tbparams[master].get('fs', "fs") 2545 event_server = "%s%s" % \ 2546 (tbparams[st].get('eventserver', "event_server"), 2547 tbparams[dt].get('domain', "example.com")) 2548 remote_event_server = "%s%s" % \ 2549 (tbparams[dt].get('eventserver', "event_server"), 2550 tbparams[dt].get('domain', "example.com")) 2551 seer_control = "%s%s" % \ 2552 (tbparams[st].get('control', "control"), sdomain) 2553 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 2554 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 2555 conf_file = "%s%s.gw.conf" % (myname, sdomain) 2556 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 2557 # translate to lower case so the `hostname` hack for specifying 2558 # configuration files works. 2559 conf_file = conf_file.lower(); 2560 remote_conf_file = remote_conf_file.lower(); 2561 active = ("%s" % (st == master)) 2562 portal = topdl.Computer(**{ 2563 'name': "%stunnel" % dt, 2564 'attribute' : [{ 2565 'attribute': n, 2566 'value': v, 2567 } for n, v in (\ 2568 ('gateway', 'true'), 2569 ('boss', boss), 2570 ('fs', fs), 2571 ('event_server', event_server), 2572 ('remote_event_server', remote_event_server), 2573 ('seer_control', seer_control), 2574 ('local_key_dir', local_key_dir), 2575 ('remote_conf_dir', remote_conf_dir), 2576 ('conf_file', conf_file), 2577 ('remote_conf_file', remote_conf_file), 2578 ('remote_script_dir', "/usr/local/federation/bin"), 2579 ('local_script_dir', "/usr/local/federation/bin"), 2580 )], 2581 'interface': [{ 2582 'substrate': s.name, 2583 'attribute': [ { 2584 'attribute': 'ip4_addreess', 2585 'value': tests[dt], 2586 }, ], 2587 }, ], 2588 }) 2589 topo[st].elements.append(portal) 2590 # Connect the gateway nodes into the topologies and clear out 2591 # substrates that are not in the topologies 2592 for tb in testbeds: 2593 topo[tb].incorporate_elements() 2594 topo[tb].substrates = \ 2595 [s for s in topo[tb].substrates \ 2596 if len(s.interfaces) >0] 2597 2598 # Copy the rpms and tarfiles to a distribution directory from 2599 # which the federants can retrieve them 2600 linkpath = "%s/software" % expid 2601 softdir ="%s/%s" % ( self.repodir, linkpath) 2602 softmap = { } 2603 pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \ 2604 for p, t in l ]) 2605 pkgs.update([x.location for e in top.elements \ 2606 for x in e.software]) 2607 try: 2608 os.makedirs(softdir) 2609 except IOError, e: 2610 raise service_error( 2611 "Cannot create software directory: %s" % e) 2612 for pkg in pkgs: 2613 loc = pkg 2614 2615 scheme, host, path = urlparse(loc)[0:3] 2616 dest = os.path.basename(path) 2617 if not scheme: 2618 if not loc.startswith('/'): 2619 loc = "/%s" % loc 2620 loc = "file://%s" %loc 2621 try: 2622 u = urlopen(loc) 2623 except Exception, e: 2624 raise service_error(service_error.req, 2625 "Cannot open %s: %s" % (loc, e)) 2626 try: 2627 f = open("%s/%s" % (softdir, dest) , "w") 2628 data = u.read(4096) 2629 while data: 2630 f.write(data) 2631 data = u.read(4096) 2632 f.close() 2633 u.close() 2634 except Exception, e: 2635 raise service_error(service_error.internal, 2636 "Could not copy %s: %s" % (loc, e)) 2637 path = re.sub("/tmp", "", linkpath) 2638 # XXX 2639 softmap[pkg] = \ 2640 "https://users.isi.deterlab.net:23232/%s/%s" %\ 2641 ( path, dest) 2642 2643 # Allow the individual testbeds to access the software. 2644 for tb in tbparams.keys(): 2645 self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 2646 "%s/%s" % ( path, dest)) 2647 2648 # Convert the software locations in the segments into the local 2649 # copies on this host 2650 for soft in [ s for tb in topo.values() \ 2651 for e in tb.elements \ 2652 for s in e.software ]: 2653 if softmap.has_key(soft.location): 2654 soft.location = softmap[soft.location] 2655 2656 vtopo = topdl.topology_to_vtopo(top) 2657 vis = self.genviz(vtopo) 2658 2530 2659 # save federant information 2531 2660 for k in allocated.keys(): … … 2543 2672 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 2544 2673 if tbparams[tb].has_key('federant') ] 2545 if self.state_filename: self.write_state() 2674 if self.state_filename: 2675 self.write_state() 2546 2676 self.state_lock.release() 2547 2548 # Copy tarfiles and rpms needed at remote sites into a staging area2549 try:2550 if self.fedkit:2551 for t in self.fedkit:2552 parse_tarfiles.list.append(t[1])2553 if self.gatewaykit:2554 for t in self.gatewaykit:2555 parse_tarfiles.list.append(t[1])2556 for t in parse_tarfiles.list:2557 if not os.path.exists("%s/tarfiles" % tmpdir):2558 os.mkdir("%s/tarfiles" % tmpdir)2559 self.copy_file(t, "%s/tarfiles/%s" % \2560 (tmpdir, os.path.basename(t)))2561 for r in parse_rpms.list:2562 if not os.path.exists("%s/rpms" % tmpdir):2563 os.mkdir("%s/rpms" % tmpdir)2564 self.copy_file(r, "%s/rpms/%s" % \2565 (tmpdir, os.path.basename(r)))2566 # A null experiment file in case we need to create a remote2567 # experiment from scratch2568 f = open("%s/null.tcl" % tmpdir, "w")2569 print >>f, """2570 set ns [new Simulator]2571 source tb_compat.tcl2572 2573 set a [$ns node]2574 2575 $ns rtproto Session2576 $ns run2577 """2578 f.close()2579 2580 except IOError, e:2581 raise service_error(service_error.internal,2582 "Cannot stage tarfile/rpm: %s" % e.strerror)2583 2584 2677 except service_error, e: 2585 2678 # If something goes wrong in the parse (usually an access error) … … 2587 2680 # exceptions. Failing at this point returns a fault to the remote 2588 2681 # caller. 2682 2589 2683 self.state_lock.acquire() 2590 2684 del self.state[eid] … … 2616 2710 2617 2711 # Start a thread to do the resource allocation 2618 t = Thread(target=self. allocate_resources,2712 t = Thread(target=self.new_allocate_resources, 2619 2713 args=(allocated, master, eid, expid, expcert, tbparams, 2620 t mpdir, alloc_log),2714 topo, tmpdir, alloc_log), 2621 2715 name=eid) 2622 2716 t.start()
Note: See TracChangeset
for help on using the changeset viewer.