Changeset 6c57fe9 for fedd/federation/experiment_control.py
- Timestamp:
- Sep 2, 2009 10:36:18 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- f9ef40b
- Parents:
- cc8d8e9
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
rcc8d8e9 r6c57fe9 18 18 import xml.parsers.expat 19 19 20 from threading import *21 from subprocess import *20 from threading import Lock, Thread, Condition 21 from subprocess import call, Popen, PIPE 22 22 23 23 from urlparse import urlparse … … 232 232 or config.get("globals", "trusted_certs") 233 233 234 # XXX: 235 self.repodir = '/usr/local/etc/fedd/repo' 234 self.repodir = config.get("experiment_control", "repodir") 236 235 237 236 self.exp_stem = "fed-stem" … … 2152 2151 self.caller = caller 2153 2152 2154 def __call__(self, uri, aid, topo ):2153 def __call__(self, uri, aid, topo, attrs=None): 2155 2154 req = { 2156 2155 'allocID': { 'fedid' : aid }, … … 2159 2158 }, 2160 2159 } 2161 2162 print "calling %s" % uri 2160 if attrs: 2161 req['fedAttr'] = attrs 2162 2163 2163 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 2164 2164 self.trusted_certs) … … 2170 2170 2171 2171 def new_allocate_resources(self, allocated, master, eid, expid, expcert, 2172 tbparams, topo, tmpdir, alloc_log=None ):2172 tbparams, topo, tmpdir, alloc_log=None, attrs=None): 2173 2173 started = { } # Testbeds where a sub-experiment started 2174 2174 # successfully … … 2203 2203 trusted_certs=self.trusted_certs, 2204 2204 caller=self.call_StartSegment), 2205 args=(uri, aid, topo[tb] ), name=tb,2205 args=(uri, aid, topo[tb], attrs), name=tb, 2206 2206 pdata=thread_pool, trace_file=self.trace_file) 2207 2207 threads.append(t) … … 2294 2294 to instantiate them and start it all up. 2295 2295 """ 2296 2297 def add_kit(e, kit): 2298 """ 2299 Add a Software object created from the list of (install, location) 2300 tuples passed as kit to the software attribute of an object e. We 2301 do this enough to break out the code, but it's kind of a hack to 2302 avoid changing the old tuple rep. 2303 """ 2304 2305 s = [ topdl.Software(install=i, location=l) for i, l in kit] 2306 2307 if isinstance(e.software, list): e.software.extend(s) 2308 else: e.software = s 2309 2296 2310 2297 2311 if not self.auth.check_attribute(fid, 'create'): … … 2463 2477 reverse=True) 2464 2478 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) 2465 for s in subs: 2479 ifs = { } 2480 hosts = [ ] 2481 # The config urlpath 2482 configpath = "/%s/config" % expid 2483 # The config file system location 2484 configdir ="%s%s" % ( self.repodir, configpath) 2485 2486 for idx, s in enumerate(subs): 2466 2487 a = ips.allocate(len(s.interfaces)+2) 2467 2488 if a : … … 2479 2500 topdl.Attribute('ip4_address', 2480 2501 "%s" % ip_addr(base))) 2502 hname = i.element.name[0] 2503 if ifs.has_key(hname): 2504 hosts.append("%s\t%s-%s %s-%d" % \ 2505 (ip_addr(base), hname, s.name, hname, 2506 ifs[hname])) 2507 else: 2508 ifs[hname] = 0 2509 hosts.append("%s\t%s-%s %s-%d %s" % \ 2510 (ip_addr(base), hname, s.name, hname, 2511 ifs[hname], hname)) 2512 2513 ifs[hname] += 1 2481 2514 base += 1 2482 2515 # save config files 2516 try: 2517 os.makedirs(configdir) 2518 except IOError, e: 2519 raise service_error( 2520 "Cannot create config directory: %s" % e) 2483 2521 # Find the testbeds to look up 2484 2522 testbeds = set([ a.value for e in top.elements \ 2485 2523 for a in e.attribute \ 2486 2524 if a.attribute == 'testbed'] ) 2525 2487 2526 2488 2527 # Make per testbed topologies. Copy the main topo and remove … … 2510 2549 topo[tb].make_indices() 2511 2550 2512 2551 for e in topo[tb].elements: 2552 if tb == master: 2553 cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid) 2554 else: 2555 cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate" 2556 scmd = e.get_attribute('startup') 2557 if scmd: 2558 cmd = "%s \\$USER '%s'" % (cmd, scmd) 2559 2560 e.set_attribute('startup', cmd) 2561 if self.fedkit: add_kit(e, self.fedkit) 2562 2563 # Copy configuration files into the remote file store 2564 try: 2565 f = open("%s/hosts" % configdir, "w") 2566 f.write('\n'.join(hosts)) 2567 f.close() 2568 except IOError, e: 2569 raise service_error(service_error.internal, 2570 "Cannot write hosts file: %s" % e) 2571 try: 2572 self.copy_file("%s" % gw_pubkey, "%s/%s" % \ 2573 (configdir, gw_pubkey_base)) 2574 self.copy_file("%s" % gw_secretkey, "%s/%s" % \ 2575 (configdir, gw_secretkey_base)) 2576 except IOError, e: 2577 raise service_error(service_error.internal, 2578 "Cannot copy keyfiles: %s" % e) 2579 2580 # Allow the individual testbeds to access the configuration files. 2581 for tb in tbparams.keys(): 2582 asignee = tbparams[tb]['allocID']['fedid'] 2583 for f in ("hosts", gw_secretkey_base, gw_pubkey_base): 2584 self.auth.set_attribute(asignee, "%s/%s" % (configpath, f)) 2585 print "assigned %s/%s" % (configpath, f) 2513 2586 2514 2587 # Now, for each substrate in the main topology, find those that … … 2516 2589 # into the copies of those substrates on the sub topologies. 2517 2590 for s in top.substrates: 2518 tests = { } 2591 # tbs will contain an ip address on this subsrate that is in 2592 # each testbed. 2593 tbs = { } 2519 2594 for i in s.interfaces: 2520 2595 e = i.element 2521 2596 tb = e.get_attribute('testbed') 2522 if tb and not t ests.has_key(tb):2597 if tb and not tbs.has_key(tb): 2523 2598 for i in e.interface: 2524 2599 if s in i.subs: 2525 tests[tb]= \ 2526 i.get_attribute('ip4_address') 2527 if len(tests) < 2: 2600 tbs[tb]= i.get_attribute('ip4_address') 2601 if len(tbs) < 2: 2528 2602 continue 2529 2603 2530 2604 # More than one testbed is on this substrate. Insert 2531 # some portals into the subtopologies. 2532 2533 for st in tests.keys(): 2534 for dt in [ t for t in tests.keys() if t != st]: 2605 # some portals into the subtopologies. st == source testbed, 2606 # dt == destination testbed. 2607 segment_substrate = { } 2608 for st in tbs.keys(): 2609 segment_substrate[st] = { } 2610 for dt in [ t for t in tbs.keys() if t != st]: 2535 2611 myname = "%stunnel" % dt 2536 2612 desthost = "%stunnel" % st … … 2541 2617 ddomain = ".%s.%s%s" % (eid, dproject, 2542 2618 tbparams[dt].get('domain', ".example.com")) 2543 boss = tbparams[master].get('boss', "boss") 2544 fs = tbparams[master].get('fs', "fs") 2545 event_server = "%s%s" % \ 2546 (tbparams[st].get('eventserver', "event_server"), 2547 tbparams[dt].get('domain', "example.com")) 2548 remote_event_server = "%s%s" % \ 2549 (tbparams[dt].get('eventserver', "event_server"), 2550 tbparams[dt].get('domain', "example.com")) 2551 seer_control = "%s%s" % \ 2552 (tbparams[st].get('control', "control"), sdomain) 2553 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 2554 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 2555 conf_file = "%s%s.gw.conf" % (myname, sdomain) 2556 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 2557 # translate to lower case so the `hostname` hack for specifying 2558 # configuration files works. 2559 conf_file = conf_file.lower(); 2560 remote_conf_file = remote_conf_file.lower(); 2619 mdomain = "%s.%s%s" % (eid, 2620 tbparams[master].get('project', 'project'), 2621 tbparams[master].get('domain', '.example.com')) 2622 # XXX: active and type need to be unkludged 2561 2623 active = ("%s" % (st == master)) 2562 portal = topdl.Computer(**{ 2563 'name': "%stunnel" % dt, 2564 'attribute' : [{ 2565 'attribute': n, 2566 'value': v, 2567 } for n, v in (\ 2568 ('gateway', 'true'), 2569 ('boss', boss), 2570 ('fs', fs), 2571 ('event_server', event_server), 2572 ('remote_event_server', remote_event_server), 2573 ('seer_control', seer_control), 2574 ('local_key_dir', local_key_dir), 2575 ('remote_conf_dir', remote_conf_dir), 2576 ('conf_file', conf_file), 2577 ('remote_conf_file', remote_conf_file), 2578 ('remote_script_dir', "/usr/local/federation/bin"), 2579 ('local_script_dir', "/usr/local/federation/bin"), 2580 )], 2581 'interface': [{ 2582 'substrate': s.name, 2583 'attribute': [ { 2584 'attribute': 'ip4_addreess', 2585 'value': tests[dt], 2586 }, ], 2587 }, ], 2588 }) 2624 if not segment_substrate[st].has_key(dt): 2625 # Put a substrate and a segment for the connected 2626 # testbed in there. 2627 tsubstrate = \ 2628 topdl.Substrate(name='%s-%s' % (st, dt)) 2629 segment_element = topdl.Segment( 2630 id= tbparams[dt]['allocID'], 2631 type='emulab', 2632 uri = self.tbmap.get(dt, None), 2633 interface=[ 2634 topdl.Interface( 2635 substrate=tsubstrate.name), 2636 ], 2637 attribute = [ 2638 topdl.Attribute(attribute=n, value=v) 2639 for n, v in (\ 2640 ('domain', ddomain), 2641 ('experiment', "%s/%s" % \ 2642 (dproject, eid)),) 2643 ], 2644 ) 2645 segment_substrate[st][dt] = tsubstrate 2646 topo[st].substrates.append(tsubstrate) 2647 topo[st].elements.append(segment_element) 2648 portal = topdl.Computer( 2649 name="%stunnel" % dt, 2650 attribute=[ 2651 topdl.Attribute(attribute=n,value=v) 2652 for n, v in (\ 2653 ('portal', 'true'), 2654 ('masterdomain', mdomain), 2655 ('experiment', "%s/%s" % \ 2656 (sproject, eid)), 2657 ('peer', "%s.%s" % \ 2658 (desthost, ddomain)), 2659 ('scriptdir', 2660 "/usr/local/federation/bin"), 2661 ('active', "%s" % active), 2662 ('type', 'both'), 2663 ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), sdomain.lower()))) 2664 ], 2665 interface=[ 2666 topdl.Interface( 2667 substrate=s.name, 2668 attribute=[ 2669 topdl.Attribute( 2670 attribute='ip4_addreess', 2671 value=tbs[dt] 2672 ) 2673 ]), 2674 topdl.Interface( 2675 substrate=\ 2676 segment_substrate[st][dt].name 2677 ), 2678 ], 2679 ) 2680 if self.fedkit: add_kit(portal, self.fedkit) 2681 if self.gatewaykit: add_kit(portal, self.gatewaykit) 2682 2589 2683 topo[st].elements.append(portal) 2684 2590 2685 # Connect the gateway nodes into the topologies and clear out 2591 2686 # substrates that are not in the topologies … … 2626 2721 try: 2627 2722 f = open("%s/%s" % (softdir, dest) , "w") 2723 self.log.debug("Writing %s/%s" % (softdir,dest) ) 2628 2724 data = u.read(4096) 2629 2725 while data: … … 2644 2740 for tb in tbparams.keys(): 2645 2741 self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 2646 " %s/%s" % ( path, dest))2742 "/%s/%s" % ( path, dest)) 2647 2743 2648 2744 # Convert the software locations in the segments into the local … … 2650 2746 for soft in [ s for tb in topo.values() \ 2651 2747 for e in tb.elements \ 2652 for s in e.software ]: 2748 if getattr(e, 'software', False) \ 2749 for s in e.software ]: 2653 2750 if softmap.has_key(soft.location): 2654 2751 soft.location = softmap[soft.location] … … 2709 2806 alloc_log.addHandler(h) 2710 2807 2808 # XXX 2809 url_base = 'https://users.isi.deterlab.net:23232' 2810 attrs = [ 2811 { 2812 'attribute': 'ssh_pubkey', 2813 'value': '%s/%s/config/%s' % \ 2814 (url_base, expid, gw_pubkey_base) 2815 }, 2816 { 2817 'attribute': 'ssh_secretkey', 2818 'value': '%s/%s/config/%s' % \ 2819 (url_base, expid, gw_secretkey_base) 2820 }, 2821 { 2822 'attribute': 'hosts', 2823 'value': '%s/%s/config/hosts' % \ 2824 (url_base, expid) 2825 }, 2826 ] 2827 2711 2828 # Start a thread to do the resource allocation 2712 2829 t = Thread(target=self.new_allocate_resources, 2713 2830 args=(allocated, master, eid, expid, expcert, tbparams, 2714 topo, tmpdir, alloc_log ),2831 topo, tmpdir, alloc_log, attrs), 2715 2832 name=eid) 2716 2833 t.start() … … 2774 2891 2775 2892 def get_handler(self, path, fid): 2776 print "in get_handler %s %s" % (path, fid) 2777 return ("/users/faber/test.html", "text/html") 2893 print "%s" % path 2894 if self.auth.check_attribute(fid, path): 2895 return ("%s/%s" % (self.repodir, path), "application/binary") 2896 else: 2897 return (None, None) 2778 2898 2779 2899 def get_vtopo(self, req, fid):
Note: See TracChangeset
for help on using the changeset viewer.