Changeset db6b092 for fedd/federation
- Timestamp:
- Aug 5, 2009 11:12:21 AM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- c9b3f49
- Parents:
- ab37086
- Location:
- fedd/federation
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
rab37086 rdb6b092 21 21 from subprocess import * 22 22 23 from urlparse import urlparse 24 from urllib2 import urlopen 25 23 26 from util import * 24 27 from fedid import fedid, generate_fedid … … 26 29 from service_error import service_error 27 30 31 import topdl 32 from ip_allocator import ip_allocator 33 from ip_addr import ip_addr 34 28 35 29 36 class nullHandler(logging.Handler): … … 44 51 45 52 class list_log: 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 53 """ 54 Provide an interface that lets logger.StreamHandler s write to a list 55 of strings. 56 """ 57 def __init__(self, l=[]): 58 """ 59 Link to an existing list or just create a log 60 """ 61 self.ll = l 62 self.lock = Lock() 63 def write(self, str): 64 """ 65 Add the string to the log. Lock for consistency. 66 """ 67 self.lock.acquire() 68 self.ll.append(str) 69 self.lock.release() 70 71 def flush(self): 72 """ 73 No-op that StreamHandlers expect 74 """ 75 pass 69 76 70 77 … … 253 260 self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa") 254 261 255 256 257 258 259 260 261 262 self.overrides = set([]) 263 ovr = config.get('experiment_control', 'overrides') 264 if ovr: 265 for o in ovr.split(","): 266 o = o.strip() 267 if o.startswith('fedid:'): o = o[len('fedid:'):] 268 self.overrides.add(fedid(hexstr=o)) 262 269 263 270 self.state = { } … … 334 341 # Dispatch tables 335 342 self.soap_services = {\ 336 'Create': soap_handler('Create', self. create_experiment),343 'Create': soap_handler('Create', self.new_create_experiment), 337 344 'Vtopo': soap_handler('Vtopo', self.get_vtopo), 338 345 'Vis': soap_handler('Vis', self.get_vis), … … 344 351 345 352 self.xmlrpc_services = {\ 346 'Create': xmlrpc_handler('Create', self. create_experiment),353 'Create': xmlrpc_handler('Create', self.new_create_experiment), 347 354 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo), 348 355 'Vis': xmlrpc_handler('Vis', self.get_vis), … … 417 424 if f.has_key('fedid') ]: 418 425 self.auth.set_attribute(self.state[k]['owner'], eid) 419 420 421 426 # allow overrides to control experiments as well 427 for o in self.overrides: 428 self.auth.set_attribute(o, eid) 422 429 except KeyError, e: 423 430 self.log.warning("[read_state]: State ownership or identity " +\ … … 508 515 509 516 class emulab_segment: 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 517 def __init__(self, log=None, keyfile=None, debug=False): 518 self.log = log or logging.getLogger(\ 519 'fedd.experiment_control.emulab_segment') 520 self.ssh_privkey_file = keyfile 521 self.debug = debug 522 self.ssh_exec="/usr/bin/ssh" 523 self.scp_exec = "/usr/bin/scp" 524 self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout 525 526 def scp_file(self, file, user, host, dest=""): 527 """ 528 scp a file to the remote host. If debug is set the action is only 529 logged. 530 """ 531 532 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 533 '-o', 'StrictHostKeyChecking yes', '-i', 534 self.ssh_privkey_file, file, 535 "%s@%s:%s" % (user, host, dest)] 536 rv = 0 537 538 try: 539 dnull = open("/dev/null", "w") 540 except IOError: 541 self.log.debug("[ssh_file]: failed to open " + \ 542 "/dev/null for redirect") 543 dnull = Null 544 545 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 546 if not self.debug: 547 rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True, 548 close_fds=True) 549 550 return rv == 0 551 552 def ssh_cmd(self, user, host, cmd, wname=None, timeout=None): 553 """ 554 Run a remote command on host as user. If debug is set, the action 555 is only logged. Commands are run without stdin, to avoid stray 556 SIGTTINs. 557 """ 558 sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \ 559 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ 560 (self.ssh_exec, self.ssh_privkey_file, 561 user, host, cmd) 562 563 try: 564 dnull = open("/dev/null", "w") 565 except IOError: 566 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \ 567 "for redirect") 568 dnull = Null 569 570 self.log.debug("[ssh_cmd]: %s" % sh_str) 571 if not self.debug: 572 if dnull: 573 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull, 574 close_fds=True) 575 else: 576 sub = Popen(sh_str, shell=True, 577 close_fds=True) 578 if timeout: 579 i = 0 580 rv = sub.poll() 581 while i < timeout: 582 if rv is not None: break 583 else: 584 time.sleep(1) 585 rv = sub.poll() 586 i += 1 587 else: 588 self.log.debug("Process exceeded runtime: %s" % sh_str) 589 os.kill(sub.pid, signal.SIGKILL) 590 raise self.ssh_cmd_timeout(); 591 return rv == 0 592 else: 593 return sub.wait() == 0 594 else: 595 if timeout == 0: 596 self.log.debug("debug timeout raised on %s " % sh_str) 597 raise self.ssh_cmd_timeout() 598 else: 599 return True 593 600 594 601 class start_segment(emulab_segment): 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 602 def __init__(self, log=None, keyfile=None, debug=False): 603 experiment_control_local.emulab_segment.__init__(self, 604 log=log, keyfile=keyfile, debug=debug) 605 606 def create_config_tree(self, src_dir, dest_dir, script): 607 """ 608 Append commands to script that will create the directory hierarchy 609 on the remote federant. 610 """ 611 612 if os.path.isdir(src_dir): 613 print >>script, "mkdir -p %s" % dest_dir 614 print >>script, "chmod 770 %s" % dest_dir 615 616 for f in os.listdir(src_dir): 617 if os.path.isdir(f): 618 self.create_config_tree("%s/%s" % (src_dir, f), 619 "%s/%s" % (dest_dir, f), script) 620 else: 621 self.log.debug("[create_config_tree]: Not a directory: %s" \ 622 % src_dir) 623 624 def ship_configs(self, host, user, src_dir, dest_dir): 625 """ 626 Copy federant-specific configuration files to the federant. 627 """ 628 for f in os.listdir(src_dir): 629 if os.path.isdir(f): 630 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 631 "%s/%s" % (dest_dir, f)): 632 return False 633 else: 634 if not self.scp_file("%s/%s" % (src_dir, f), 635 user, host, dest_dir): 636 return False 637 return True 638 639 def get_state(self, user, host, tb, pid, eid): 640 # command to test experiment state 641 expinfo_exec = "/usr/testbed/bin/expinfo" 642 # Regular expressions to parse the expinfo response 643 state_re = re.compile("State:\s+(\w+)") 644 no_exp_re = re.compile("^No\s+such\s+experiment") 645 swapping_re = re.compile("^No\s+information\s+available.") 646 state = None # Experiment state parsed from expinfo 647 # The expinfo ssh command. Note the identity restriction to use 648 # only the identity provided in the pubkey given. 649 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 650 'StrictHostKeyChecking yes', '-i', 651 self.ssh_privkey_file, "%s@%s" % (user, host), 652 expinfo_exec, pid, eid] 653 654 dev_null = None 655 try: 656 dev_null = open("/dev/null", "a") 657 except IOError, e: 658 self.log.error("[get_state]: can't open /dev/null: %s" %e) 659 660 if self.debug: 661 state = 'swapped' 662 rv = 0 663 else: 664 status = Popen(cmd, stdout=PIPE, stderr=dev_null, 665 close_fds=True) 666 for line in status.stdout: 667 m = state_re.match(line) 668 if m: state = m.group(1) 669 else: 670 for reg, st in ((no_exp_re, "none"), 671 (swapping_re, "swapping")): 672 m = reg.match(line) 673 if m: state = st 674 rv = status.wait() 675 676 # If the experiment is not present the subcommand returns a 677 # non-zero return value. If we successfully parsed a "none" 678 # outcome, ignore the return code. 679 if rv != 0 and state != 'none': 680 raise service_error(service_error.internal, 681 "Cannot get status of segment %s:%s/%s" % \ 682 (tb, pid, eid)) 683 elif state not in ('active', 'swapped', 'swapping', 'none'): 684 raise service_error(service_error.internal, 685 "Cannot get status of segment %s:%s/%s" % \ 686 (tb, pid, eid)) 687 else: return state 688 689 690 def __call__(self, tb, eid, tbparams, tmpdir, timeout=0): 691 """ 692 Start a sub-experiment on a federant. 693 694 Get the current state, modify or create as appropriate, ship data 695 and configs and start the experiment. There are small ordering 696 differences based on the initial state of the sub-experiment. 697 """ 698 # ops node in the federant 699 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 700 user = tbparams[tb]['user'] # federant user 701 pid = tbparams[tb]['project'] # federant project 702 # XXX 703 base_confs = ( "hosts",) 704 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 705 # Configuration directories on the remote machine 706 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 707 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 708 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 709 710 state = self.get_state(user, host, tb, pid, eid) 711 712 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 713 self.log.info("[start_segment]:transferring experiment to %s" % tb) 714 715 if not self.scp_file("%s/%s/%s" % \ 716 (tmpdir, tb, tclfile), user, host): 717 return False 718 719 if state == 'none': 720 # Create a null copy of the experiment so that we capture any 721 # logs there if the modify fails. Emulab software discards the 722 # logs from a failed startexp 723 if not self.scp_file("%s/null.tcl" % tmpdir, user, host): 724 return False 725 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 726 timedout = False 727 try: 728 if not self.ssh_cmd(user, host, 729 ("/usr/testbed/bin/startexp -i -f -w -p %s " + 730 "-e %s null.tcl") % (pid, eid), "startexp", 731 timeout=60 * 10): 732 return False 733 except self.ssh_cmd_timeout: 734 timedout = True 735 736 if timedout: 737 state = self.get_state(user, host, tb, pid, eid) 738 if state != "swapped": 739 return False 740 741 742 # Open up a temporary file to contain a script for setting up the 743 # filespace for the new experiment. 744 self.log.info("[start_segment]: creating script file") 745 try: 746 sf, scriptname = tempfile.mkstemp() 747 scriptfile = os.fdopen(sf, 'w') 748 except IOError: 749 return False 750 751 scriptbase = os.path.basename(scriptname) 752 753 # Script the filesystem changes 754 print >>scriptfile, "/bin/rm -rf %s" % proj_dir 755 # Clear and create the tarfiles and rpm directories 756 for d in (tarfiles_dir, rpms_dir): 757 print >>scriptfile, "/bin/rm -rf %s/*" % d 758 print >>scriptfile, "mkdir -p %s" % d 759 print >>scriptfile, 'mkdir -p %s' % proj_dir 760 self.create_config_tree("%s/%s" % (tmpdir, tb), 761 proj_dir, scriptfile) 762 if os.path.isdir("%s/tarfiles" % tmpdir): 763 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir, 764 scriptfile) 765 if os.path.isdir("%s/rpms" % tmpdir): 766 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir, 767 scriptfile) 768 print >>scriptfile, "rm -f %s" % scriptbase 769 scriptfile.close() 770 771 # Move the script to the remote machine 772 # XXX: could collide tempfile names on the remote host 773 if self.scp_file(scriptname, user, host, scriptbase): 774 os.remove(scriptname) 775 else: 776 return False 777 778 # Execute the script (and the script's last line deletes it) 779 if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase): 780 return False 781 782 for f in base_confs: 783 if not self.scp_file("%s/%s" % (tmpdir, f), user, host, 784 "%s/%s" % (proj_dir, f)): 785 return False 786 if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), 787 proj_dir): 788 return False 789 if os.path.isdir("%s/tarfiles" % tmpdir): 790 if not self.ship_configs(host, user, 791 "%s/tarfiles" % tmpdir, tarfiles_dir): 792 return False 793 if os.path.isdir("%s/rpms" % tmpdir): 794 if not self.ship_configs(host, user, 795 "%s/rpms" % tmpdir, tarfiles_dir): 796 return False 797 # Stage the new configuration (active experiments will stay swapped 798 # in now) 799 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 800 try: 801 if not self.ssh_cmd(user, host, 802 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 803 (pid, eid, tclfile), 804 "modexp", timeout= 60 * 10): 805 return False 806 except self.ssh_cmd_timeout: 807 self.log.error("Modify command failed to complete in time") 808 # There's really no way to see if this succeeded or failed, so 809 # if it hangs, assume the worst. 810 return False 811 # Active experiments are still swapped, this swaps the others in. 812 if state != 'active': 813 self.log.info("[start_segment]: Swapping %s in on %s" % \ 814 (eid, tb)) 815 timedout = False 816 try: 817 if not self.ssh_cmd(user, host, 818 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 819 "swapexp", timeout=10*60): 820 return False 821 except self.ssh_cmd_timeout: 822 timedout = True 823 824 # If the command was terminated, but completed successfully, 825 # report success. 826 if timedout: 827 self.log.debug("[start_segment]: swapin timed out " +\ 828 "checking state") 829 state = self.get_state(user, host, tb, pid, eid) 830 self.log.debug("[start_segment]: state is %s" % state) 831 return state == 'active' 832 # Everything has gone OK. 833 return True 827 834 828 835 class stop_segment(emulab_segment): 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 836 def __init__(self, log=None, keyfile=None, debug=False): 837 experiment_control_local.emulab_segment.__init__(self, 838 log=log, keyfile=keyfile, debug=debug) 839 840 def __call__(self, tb, eid, tbparams): 841 """ 842 Stop a sub experiment by calling swapexp on the federant 843 """ 844 user = tbparams[tb]['user'] 845 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 846 pid = tbparams[tb]['project'] 847 848 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 849 rv = False 850 try: 851 # Clean out tar files: we've gone over quota in the past 852 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid)) 853 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \ 854 (pid, eid)) 855 rv = self.ssh_cmd(user, host, 856 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) 857 except self.ssh_cmd_timeout: 858 rv = False 859 return rv 853 860 854 861 … … 996 1003 "Failed to open file in genviz") 997 1004 998 999 1000 1001 1005 try: 1006 dnull = open('/dev/null', 'w') 1007 except IOError: 1008 service_error(service_error.internal, 1002 1009 "Failed to open /dev/null in genviz") 1003 1010 … … 1023 1030 dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000', 1024 1031 '-Gpack=true', dotname], stdout=PIPE, stderr=dnull, 1025 1026 1032 close_fds=True) 1033 dnull.close() 1027 1034 1028 1035 # Translate dot to vis format … … 1687 1694 1688 1695 def allocate_resources(self, allocated, master, eid, expid, expcert, 1689 1690 1691 1696 tbparams, tmpdir, alloc_log=None): 1697 started = { } # Testbeds where a sub-experiment started 1698 # successfully 1692 1699 1693 1700 # XXX 1694 1701 fail_soft = False 1695 1702 1696 1703 log = alloc_log or self.log 1697 1704 1698 1705 thread_pool = self.thread_pool(self.nthreads) … … 1704 1711 thread_pool.wait_for_slot() 1705 1712 t = self.pooled_thread(\ 1706 1707 1713 target=self.start_segment(log=log, 1714 keyfile=self.ssh_privkey_file, debug=self.debug), 1708 1715 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1709 1716 pdata=thread_pool, trace_file=self.trace_file) … … 1718 1725 1719 1726 if len(failed) == 0: 1720 1721 1727 starter = self.start_segment(log=log, 1728 keyfile=self.ssh_privkey_file, debug=self.debug) 1722 1729 if not starter(master, eid, tbparams, tmpdir): 1723 1730 failed.append(master) … … 1732 1739 thread_pool.wait_for_slot() 1733 1740 t = self.pooled_thread(\ 1734 1735 1736 1741 target=self.stop_segment(log=log, 1742 keyfile=self.ssh_privkey_file, 1743 debug=self.debug), 1737 1744 args=(tb, eid, tbparams), name=tb, 1738 1745 pdata=thread_pool, trace_file=self.trace_file) … … 1747 1754 self.state_lock.acquire() 1748 1755 self.state[eid]['experimentStatus'] = 'failed' 1749 1756 if self.state_filename: self.write_state() 1750 1757 self.state_lock.release() 1751 1758 … … 1753 1760 # "Swap in failed on %s" % ",".join(failed)) 1754 1761 log.error("Swap in failed on %s" % ",".join(failed)) 1755 1762 return 1756 1763 else: 1757 1764 log.info("[start_segment]: Experiment %s active" % eid) … … 1769 1776 # Insert the experiment into our state and update the disk copy 1770 1777 self.state_lock.acquire() 1771 1778 self.state[expid]['experimentStatus'] = 'active' 1772 1779 self.state[eid] = self.state[expid] 1773 1780 if self.state_filename: self.write_state() 1774 1781 self.state_lock.release() 1775 1782 return 1776 1783 1777 1784 def create_experiment(self, req, fid): … … 1844 1851 if req.has_key('experimentID') and \ 1845 1852 req['experimentID'].has_key('localname'): 1846 1853 overwrite = False 1847 1854 eid = req['experimentID']['localname'] 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1855 # If there's an old failed experiment here with the same local name 1856 # and accessible by this user, we'll overwrite it, otherwise we'll 1857 # fall through and do the collision avoidance. 1858 old_expid = self.get_experiment_fedid(eid) 1859 if old_expid and self.check_experiment_access(fid, old_expid): 1860 self.state_lock.acquire() 1861 status = self.state[eid].get('experimentStatus', None) 1862 if status and status == 'failed': 1863 # remove the old access attribute 1864 self.auth.unset_attribute(fid, old_expid) 1865 overwrite = True 1866 del self.state[eid] 1867 del self.state[old_expid] 1868 self.state_lock.release() 1862 1869 self.state_lock.acquire() 1863 1870 while (self.state.has_key(eid) and not overwrite): 1864 1871 eid += random.choice(string.ascii_letters) 1865 1872 # Initial state 1866 1873 self.state[eid] = { 1867 1868 1869 1870 1871 1872 1873 1874 1874 'experimentID' : \ 1875 [ { 'localname' : eid }, {'fedid': expid } ], 1876 'experimentStatus': 'starting', 1877 'experimentAccess': { 'X509' : expcert }, 1878 'owner': fid, 1879 'log' : [], 1880 } 1881 self.state[expid] = self.state[eid] 1875 1882 if self.state_filename: self.write_state() 1876 1883 self.state_lock.release() … … 1884 1891 for i in range(0,5): 1885 1892 eid += random.choice(string.ascii_letters) 1886 1893 # Initial state 1887 1894 self.state[eid] = { 1888 1889 1890 1891 1892 1893 1894 1895 1895 'experimentID' : \ 1896 [ { 'localname' : eid }, {'fedid': expid } ], 1897 'experimentStatus': 'starting', 1898 'experimentAccess': { 'X509' : expcert }, 1899 'owner': fid, 1900 'log' : [], 1901 } 1902 self.state[expid] = self.state[eid] 1896 1903 if self.state_filename: self.write_state() 1897 1904 self.state_lock.release() … … 1935 1942 1936 1943 self.log.debug("running local splitter %s", " ".join(tclcmd)) 1937 1938 1939 1940 1944 # This is just fantastic. As a side effect the parser copies 1945 # tb_compat.tcl into the current directory, so that directory 1946 # must be writable by the fedd user. Doing this in the 1947 # temporary subdir ensures this is the case. 1941 1948 tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 1942 1949 cwd=tmpdir) 1943 1950 split_data = tclparser.stdout 1944 1951 … … 2002 2009 } 2003 2010 2004 2005 2006 2007 2008 2009 2011 self.state_lock.acquire() 2012 self.state[eid]['vtopo'] = vtopo 2013 self.state[eid]['vis'] = vis 2014 self.state[expid]['federant'] = \ 2015 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 2016 if tbparams[tb].has_key('federant') ] 2010 2017 if self.state_filename: self.write_state() 2011 2018 self.state_lock.release() 2012 2019 2013 2020 # Copy tarfiles and rpms needed at remote sites into a staging area … … 2050 2057 # If something goes wrong in the parse (usually an access error) 2051 2058 # clear the placeholder state. From here on out the code delays 2052 2053 2059 # exceptions. Failing at this point returns a fault to the remote 2060 # caller. 2054 2061 self.state_lock.acquire() 2055 2062 del self.state[eid] … … 2060 2067 2061 2068 2062 2063 2064 2065 2069 # Start the background swapper and return the starting state. From 2070 # here on out, the state will stick around a while. 2071 2072 # Let users touch the state 2066 2073 self.auth.set_attribute(fid, expid) 2067 2074 self.auth.set_attribute(expid, expid) 2068 # Override fedids can manipulate state as well 2069 for o in self.overrides: 2070 self.auth.set_attribute(o, expid) 2071 2072 # Create a logger that logs to the experiment's state object as well as 2073 # to the main log file. 2074 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2075 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 2076 # XXX: there should be a global one of these rather than repeating the 2077 # code. 2078 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2079 '%d %b %y %H:%M:%S')) 2080 alloc_log.addHandler(h) 2081 2082 # Start a thread to do the resource allocation 2083 t = Thread(target=self.allocate_resources, 2084 args=(allocated, master, eid, expid, expcert, tbparams, 2085 tmpdir, alloc_log), 2086 name=eid) 2087 t.start() 2088 2089 rv = { 2090 'experimentID': [ 2091 {'localname' : eid }, { 'fedid': copy.copy(expid) } 2092 ], 2093 'experimentStatus': 'starting', 2094 'experimentAccess': { 'X509' : expcert } 2095 } 2096 2097 return rv 2075 # Override fedids can manipulate state as well 2076 for o in self.overrides: 2077 self.auth.set_attribute(o, expid) 2078 2079 # Create a logger that logs to the experiment's state object as well as 2080 # to the main log file. 2081 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2082 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 2083 # XXX: there should be a global one of these rather than repeating the 2084 # code. 2085 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2086 '%d %b %y %H:%M:%S')) 2087 alloc_log.addHandler(h) 2088 2089 # Start a thread to do the resource allocation 2090 t = Thread(target=self.allocate_resources, 2091 args=(allocated, master, eid, expid, expcert, tbparams, 2092 tmpdir, alloc_log), 2093 name=eid) 2094 t.start() 2095 2096 rv = { 2097 'experimentID': [ 2098 {'localname' : eid }, { 'fedid': copy.copy(expid) } 2099 ], 2100 'experimentStatus': 'starting', 2101 'experimentAccess': { 'X509' : expcert } 2102 } 2103 2104 return rv 2105 2106 2107 def new_create_experiment(self, req, fid): 2108 """ 2109 The external interface to experiment creation called from the 2110 dispatcher. 2111 2112 Creates a working directory, splits the incoming description using the 2113 splitter script and parses out the avrious subsections using the 2114 lcasses above. Once each sub-experiment is created, use pooled threads 2115 to instantiate them and start it all up. 2116 """ 2117 2118 if not self.auth.check_attribute(fid, 'create'): 2119 raise service_error(service_error.access, "Create access denied") 2120 2121 try: 2122 tmpdir = tempfile.mkdtemp(prefix="split-") 2123 except IOError: 2124 raise service_error(service_error.internal, "Cannot create tmp dir") 2125 2126 gw_pubkey_base = "fed.%s.pub" % self.ssh_type 2127 gw_secretkey_base = "fed.%s" % self.ssh_type 2128 gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base 2129 gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base 2130 tclfile = tmpdir + "/experiment.tcl" 2131 tbparams = { } 2132 try: 2133 access_user = self.accessdb[fid] 2134 except KeyError: 2135 raise service_error(service_error.internal, 2136 "Access map and authorizer out of sync in " + \ 2137 "create_experiment for fedid %s" % fid) 2138 2139 pid = "dummy" 2140 gid = "dummy" 2141 try: 2142 os.mkdir(tmpdir+"/keys") 2143 except OSError: 2144 raise service_error(service_error.internal, 2145 "Can't make temporary dir") 2146 2147 req = req.get('CreateRequestBody', None) 2148 if not req: 2149 raise service_error(service_error.req, 2150 "Bad request format (no CreateRequestBody)") 2151 # The tcl parser needs to read a file so put the content into that file 2152 descr=req.get('experimentdescription', None) 2153 if descr: 2154 file_content=descr.get('ns2description', None) 2155 if file_content: 2156 try: 2157 f = open(tclfile, 'w') 2158 f.write(file_content) 2159 f.close() 2160 except IOError: 2161 raise service_error(service_error.internal, 2162 "Cannot write temp experiment description") 2163 else: 2164 raise service_error(service_error.req, 2165 "Only ns2descriptions supported") 2166 else: 2167 raise service_error(service_error.req, "No experiment description") 2168 2169 # Generate an ID for the experiment (slice) and a certificate that the 2170 # allocator can use to prove they own it. We'll ship it back through 2171 # the encrypted connection. 2172 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 2173 2174 if req.has_key('experimentID') and \ 2175 req['experimentID'].has_key('localname'): 2176 overwrite = False 2177 eid = req['experimentID']['localname'] 2178 # If there's an old failed experiment here with the same local name 2179 # and accessible by this user, we'll overwrite it, otherwise we'll 2180 # fall through and do the collision avoidance. 2181 old_expid = self.get_experiment_fedid(eid) 2182 if old_expid and self.check_experiment_access(fid, old_expid): 2183 self.state_lock.acquire() 2184 status = self.state[eid].get('experimentStatus', None) 2185 if status and status == 'failed': 2186 # remove the old access attribute 2187 self.auth.unset_attribute(fid, old_expid) 2188 overwrite = True 2189 del self.state[eid] 2190 del self.state[old_expid] 2191 self.state_lock.release() 2192 self.state_lock.acquire() 2193 while (self.state.has_key(eid) and not overwrite): 2194 eid += random.choice(string.ascii_letters) 2195 # Initial state 2196 self.state[eid] = { 2197 'experimentID' : \ 2198 [ { 'localname' : eid }, {'fedid': expid } ], 2199 'experimentStatus': 'starting', 2200 'experimentAccess': { 'X509' : expcert }, 2201 'owner': fid, 2202 'log' : [], 2203 } 2204 self.state[expid] = self.state[eid] 2205 if self.state_filename: self.write_state() 2206 self.state_lock.release() 2207 else: 2208 eid = self.exp_stem 2209 for i in range(0,5): 2210 eid += random.choice(string.ascii_letters) 2211 self.state_lock.acquire() 2212 while (self.state.has_key(eid)): 2213 eid = self.exp_stem 2214 for i in range(0,5): 2215 eid += random.choice(string.ascii_letters) 2216 # Initial state 2217 self.state[eid] = { 2218 'experimentID' : \ 2219 [ { 'localname' : eid }, {'fedid': expid } ], 2220 'experimentStatus': 'starting', 2221 'experimentAccess': { 'X509' : expcert }, 2222 'owner': fid, 2223 'log' : [], 2224 } 2225 self.state[expid] = self.state[eid] 2226 if self.state_filename: self.write_state() 2227 self.state_lock.release() 2228 2229 try: 2230 # This catches exceptions to clear the placeholder if necessary 2231 try: 2232 self.generate_ssh_keys(gw_secretkey, self.ssh_type) 2233 except ValueError: 2234 raise service_error(service_error.server_config, 2235 "Bad key type (%s)" % self.ssh_type) 2236 2237 user = req.get('user', None) 2238 if user == None: 2239 raise service_error(service_error.req, "No user") 2240 2241 master = req.get('master', None) 2242 if not master: 2243 raise service_error(service_error.req, 2244 "No master testbed label") 2245 export_project = req.get('exportProject', None) 2246 if not export_project: 2247 raise service_error(service_error.req, "No export project") 2248 2249 if self.splitter_url: 2250 self.log.debug("Calling remote splitter at %s" % \ 2251 self.splitter_url) 2252 split_data = self.remote_splitter(self.splitter_url, 2253 file_content, master) 2254 else: 2255 tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 2256 str(self.muxmax), '-m', master] 2257 2258 if self.fedkit: 2259 tclcmd.append('-k') 2260 2261 if self.gatewaykit: 2262 tclcmd.append('-K') 2263 2264 tclcmd.extend([pid, gid, eid, tclfile]) 2265 2266 self.log.debug("running local splitter %s", " ".join(tclcmd)) 2267 # This is just fantastic. As a side effect the parser copies 2268 # tb_compat.tcl into the current directory, so that directory 2269 # must be writable by the fedd user. Doing this in the 2270 # temporary subdir ensures this is the case. 2271 tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 2272 cwd=tmpdir) 2273 split_data = tclparser.stdout 2274 2275 allocated = { } # Testbeds we can access 2276 # XXX here's where we're working 2277 def out_topo(filename, t): 2278 try: 2279 f = open("/tmp/%s" % filename, "w") 2280 print >> f, "%s" % \ 2281 topdl.topology_to_xml(t, top="experiment") 2282 f.close() 2283 except IOError, e: 2284 raise service_error(service_error.internal, "Can't open file") 2285 2286 try: 2287 2288 top = topdl.topology_from_xml(file=split_data, top="experiment") 2289 subs = sorted(top.substrates, 2290 cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)), 2291 reverse=True) 2292 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24) 2293 for s in subs: 2294 a = ips.allocate(len(s.interfaces)+2) 2295 if a : 2296 base, num = a 2297 if num < len(s.interfaces) +2 : 2298 raise service_error(service_error.internal, 2299 "Allocator returned wrong number of IPs??") 2300 else: 2301 raise service_error(service_error.req, 2302 "Cannot allocate IP addresses") 2303 2304 base += 1 2305 for i in s.interfaces: 2306 i.attribute.append( 2307 topdl.Attribute('ip4_address', 2308 "%s" % ip_addr(base))) 2309 base += 1 2310 2311 testbeds = set([ a.value for e in top.elements \ 2312 for a in e.attribute \ 2313 if a.attribute == 'testbed'] ) 2314 topo ={ } 2315 for tb in testbeds: 2316 self.get_access(tb, None, user, tbparams, master, 2317 export_project, access_user) 2318 topo[tb] = top.clone() 2319 to_delete = [ ] 2320 for e in topo[tb].elements: 2321 etb = e.get_attribute('testbed') 2322 if etb and etb != tb: 2323 for i in e.interface: 2324 for s in i.subs: 2325 try: 2326 s.interfaces.remove(i) 2327 except ValueError: 2328 raise service_error(service_error.internal, 2329 "Can't remove interface??") 2330 to_delete.append(e) 2331 for e in to_delete: 2332 topo[tb].elements.remove(e) 2333 topo[tb].make_indices() 2334 2335 2336 2337 for s in top.substrates: 2338 tests = { } 2339 for i in s.interfaces: 2340 e = i.element 2341 tb = e.get_attribute('testbed') 2342 if tb and not tests.has_key(tb): 2343 for i in e.interface: 2344 if s in i.subs: 2345 tests[tb]= \ 2346 i.get_attribute('ip4_address') 2347 if len(tests) < 2: 2348 continue 2349 2350 # More than one testbed is on this substrate. Insert 2351 # some gateways into the subtopologies. 2352 2353 for st in tests.keys(): 2354 for dt in [ t for t in tests.keys() if t != st]: 2355 myname = "%stunnel" % dt 2356 desthost = "%stunnel" % st 2357 sproject = tbparams[st].get('project', 'project') 2358 dproject = tbparams[dt].get('project', 'project') 2359 sdomain = ".%s.%s%s" % (eid, sproject, 2360 tbparams[st].get('domain', ".example.com")) 2361 ddomain = ".%s.%s%s" % (eid, dproject, 2362 tbparams[dt].get('domain', ".example.com")) 2363 boss = tbparams[master].get('boss', "boss") 2364 fs = tbparams[master].get('fs', "fs") 2365 event_server = "%s%s" % \ 2366 (tbparams[st].get('eventserver', "event_server"), 2367 tbparams[dt].get('domain', "example.com")) 2368 remote_event_server = "%s%s" % \ 2369 (tbparams[dt].get('eventserver', "event_server"), 2370 tbparams[dt].get('domain', "example.com")) 2371 seer_control = "%s%s" % \ 2372 (tbparams[st].get('control', "control"), sdomain) 2373 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 2374 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 2375 conf_file = "%s%s.gw.conf" % (myname, sdomain) 2376 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 2377 # translate to lower case so the `hostname` hack for specifying 2378 # configuration files works. 2379 conf_file = conf_file.lower(); 2380 remote_conf_file = remote_conf_file.lower(); 2381 active = ("%s" % (st == master)) 2382 portal = topdl.Computer(**{ 2383 'name': "%stunnel" % dt, 2384 'attribute' : [{ 2385 'attribute': n, 2386 'value': v, 2387 } for n, v in (\ 2388 ('gateway', 'true'), 2389 ('boss', boss), 2390 ('fs', fs), 2391 ('event_server', event_server), 2392 ('remote_event_server', remote_event_server), 2393 ('seer_control', seer_control), 2394 ('local_key_dir', local_key_dir), 2395 ('remote_conf_dir', remote_conf_dir), 2396 ('conf_file', conf_file), 2397 ('remote_conf_file', remote_conf_file), 2398 ('remote_script_dir', "/usr/local/federation/bin"), 2399 ('local_script_dir', "/usr/local/federation/bin"), 2400 )], 2401 'interface': [{ 2402 'substrate': s.name, 2403 'attribute': [ { 2404 'attribute': 'ip4_addreess', 2405 'value': tests[dt], 2406 }, ], 2407 }, ], 2408 }) 2409 topo[st].elements.append(portal) 2410 # Connect the gateway nodes into the topologies and clear out 2411 # substrates that are not in the topologies 2412 for tb in testbeds: 2413 topo[tb].incorporate_elements() 2414 topo[tb].substrates = \ 2415 [s for s in topo[tb].substrates \ 2416 if len(s.interfaces) >0] 2417 2418 softdir ="%s/software" % tmpdir 2419 softmap = { } 2420 os.mkdir(softdir) 2421 pkgs = set([fedkit, gatewaykit]) 2422 pkgs.update([x.location for e in top.elements \ 2423 for x in e.software]) 2424 for pkg in pkgs: 2425 loc = pkg 2426 2427 scheme, host, path = urlparse(loc)[0:3] 2428 dest = os.path.basename(path) 2429 if not scheme: 2430 if not loc.startswith('/'): 2431 loc = "/%s" % loc 2432 loc = "file://%s" %loc 2433 try: 2434 u = urlopen(loc) 2435 except Exception, e: 2436 raise service_error(service_error.req, 2437 "Cannot open %s: %s" % (loc, e)) 2438 try: 2439 f = open("%s/%s" % (softdir, dest) , "w") 2440 data = u.read(4096) 2441 while data: 2442 f.write(data) 2443 data = u.read(4096) 2444 f.close() 2445 u.close() 2446 except Exception, e: 2447 raise service_error(service_error.internal, 2448 "Could not copy %s: %s" % (loc, e)) 2449 path = re.sub("/tmp", "", softdir) 2450 # XXX 2451 softmap[pkg] = \ 2452 "https://users.isi.deterlab.net:23232/%s/%s" %\ 2453 ( path, dest) 2454 2455 # Convert the software locations in the segments into the local 2456 # copies on this host 2457 for soft in [ s for tb in topo.values() \ 2458 for e in tb.elements \ 2459 for s in e.software ]: 2460 if softmap.has_key(soft.location): 2461 soft.location = softmap[soft.location] 2462 for tb in testbeds: 2463 out_topo("%s.xml" %tb, topo[tb]) 2464 2465 vtopo = topdl.topology_to_vtopo(top) 2466 vis = self.genviz(vtopo) 2467 2468 except Exception, e: 2469 traceback.print_exc() 2470 raise service_error(service_error.internal, "%s" % e) 2471 2472 2473 2474 # Build the testbed topologies: 2475 2476 2477 if True: 2478 raise service_error(service_error.internal, "Developing") 2479 2480 # XXX old code 2481 # Objects to parse the splitter output (defined above) 2482 parse_current_testbed = self.current_testbed(eid, tmpdir, 2483 self.fedkit, self.gatewaykit) 2484 parse_allbeds = self.allbeds(self.get_access) 2485 parse_gateways = self.gateways(eid, master, tmpdir, 2486 gw_pubkey_base, gw_secretkey_base, self.copy_file, 2487 self.fedkit) 2488 parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", 2489 "^#\s+End\s+Vtopo") 2490 parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", 2491 "^#\s+End\s+hostnames", tmpdir + "/hosts") 2492 parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", 2493 "^#\s+End\s+tarfiles") 2494 parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", 2495 "^#\s+End\s+rpms") 2496 2497 # Working on the split data 2498 for line in split_data: 2499 line = line.rstrip() 2500 if parse_current_testbed(line, master, allocated, tbparams): 2501 continue 2502 elif parse_allbeds(line, user, tbparams, master, export_project, 2503 access_user): 2504 continue 2505 elif parse_gateways(line, allocated, tbparams): 2506 continue 2507 elif parse_vtopo(line): 2508 continue 2509 elif parse_hostnames(line): 2510 continue 2511 elif parse_tarfiles(line): 2512 continue 2513 elif parse_rpms(line): 2514 continue 2515 else: 2516 raise service_error(service_error.internal, 2517 "Bad tcl parse? %s" % line) 2518 # Virtual topology and visualization 2519 vtopo = self.gentopo(parse_vtopo.str) 2520 if not vtopo: 2521 raise service_error(service_error.internal, 2522 "Failed to generate virtual topology") 2523 2524 vis = self.genviz(vtopo) 2525 if not vis: 2526 raise service_error(service_error.internal, 2527 "Failed to generate visualization") 2528 2529 2530 # save federant information 2531 for k in allocated.keys(): 2532 tbparams[k]['federant'] = {\ 2533 'name': [ { 'localname' : eid} ],\ 2534 'emulab': tbparams[k]['emulab'],\ 2535 'allocID' : tbparams[k]['allocID'],\ 2536 'master' : k == master,\ 2537 } 2538 2539 self.state_lock.acquire() 2540 self.state[eid]['vtopo'] = vtopo 2541 self.state[eid]['vis'] = vis 2542 self.state[expid]['federant'] = \ 2543 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 2544 if tbparams[tb].has_key('federant') ] 2545 if self.state_filename: self.write_state() 2546 self.state_lock.release() 2547 2548 # Copy tarfiles and rpms needed at remote sites into a staging area 2549 try: 2550 if self.fedkit: 2551 for t in self.fedkit: 2552 parse_tarfiles.list.append(t[1]) 2553 if self.gatewaykit: 2554 for t in self.gatewaykit: 2555 parse_tarfiles.list.append(t[1]) 2556 for t in parse_tarfiles.list: 2557 if not os.path.exists("%s/tarfiles" % tmpdir): 2558 os.mkdir("%s/tarfiles" % tmpdir) 2559 self.copy_file(t, "%s/tarfiles/%s" % \ 2560 (tmpdir, os.path.basename(t))) 2561 for r in parse_rpms.list: 2562 if not os.path.exists("%s/rpms" % tmpdir): 2563 os.mkdir("%s/rpms" % tmpdir) 2564 self.copy_file(r, "%s/rpms/%s" % \ 2565 (tmpdir, os.path.basename(r))) 2566 # A null experiment file in case we need to create a remote 2567 # experiment from scratch 2568 f = open("%s/null.tcl" % tmpdir, "w") 2569 print >>f, """ 2570 set ns [new Simulator] 2571 source tb_compat.tcl 2572 2573 set a [$ns node] 2574 2575 $ns rtproto Session 2576 $ns run 2577 """ 2578 f.close() 2579 2580 except IOError, e: 2581 raise service_error(service_error.internal, 2582 "Cannot stage tarfile/rpm: %s" % e.strerror) 2583 2584 except service_error, e: 2585 # If something goes wrong in the parse (usually an access error) 2586 # clear the placeholder state. From here on out the code delays 2587 # exceptions. Failing at this point returns a fault to the remote 2588 # caller. 2589 self.state_lock.acquire() 2590 del self.state[eid] 2591 del self.state[expid] 2592 if self.state_filename: self.write_state() 2593 self.state_lock.release() 2594 raise e 2595 2596 2597 # Start the background swapper and return the starting state. From 2598 # here on out, the state will stick around a while. 2599 2600 # Let users touch the state 2601 self.auth.set_attribute(fid, expid) 2602 self.auth.set_attribute(expid, expid) 2603 # Override fedids can manipulate state as well 2604 for o in self.overrides: 2605 self.auth.set_attribute(o, expid) 2606 2607 # Create a logger that logs to the experiment's state object as well as 2608 # to the main log file. 2609 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2610 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 2611 # XXX: there should be a global one of these rather than repeating the 2612 # code. 2613 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2614 '%d %b %y %H:%M:%S')) 2615 alloc_log.addHandler(h) 2616 2617 # Start a thread to do the resource allocation 2618 t = Thread(target=self.allocate_resources, 2619 args=(allocated, master, eid, expid, expcert, tbparams, 2620 tmpdir, alloc_log), 2621 name=eid) 2622 t.start() 2623 2624 rv = { 2625 'experimentID': [ 2626 {'localname' : eid }, { 'fedid': copy.copy(expid) } 2627 ], 2628 'experimentStatus': 'starting', 2629 'experimentAccess': { 'X509' : expcert } 2630 } 2631 2632 return rv 2098 2633 2099 2634 def get_experiment_fedid(self, key): 2100 2635 """ 2101 2102 """ 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2636 find the fedid associated with the localname key in the state database. 2637 """ 2638 2639 rv = None 2640 self.state_lock.acquire() 2641 if self.state.has_key(key): 2642 if isinstance(self.state[key], dict): 2643 try: 2644 kl = [ f['fedid'] for f in \ 2645 self.state[key]['experimentID']\ 2646 if f.has_key('fedid') ] 2647 except KeyError: 2648 self.state_lock.release() 2649 raise service_error(service_error.internal, 2650 "No fedid for experiment %s when getting "+\ 2651 "fedid(!?)" % key) 2652 if len(kl) == 1: 2653 rv = kl[0] 2654 else: 2655 self.state_lock.release() 2656 raise service_error(service_error.internal, 2657 "multiple fedids for experiment %s when " +\ 2658 "getting fedid(!?)" % key) 2659 else: 2660 self.state_lock.release() 2661 raise service_error(service_error.internal, 2662 "Unexpected state for %s" % key) 2663 self.state_lock.release() 2664 return rv 2130 2665 2131 2666 def check_experiment_access(self, fid, key): … … 2136 2671 """ 2137 2672 if not isinstance(key, fedid): 2138 2673 key = self.get_experiment_fedid(key) 2139 2674 2140 2675 if self.auth.check_attribute(fid, key): … … 2144 2679 2145 2680 2681 def get_handler(self, path, fid): 2682 print "in get_handler %s %s" % (path, fid) 2683 return ("/users/faber/test.html", "text/html") 2146 2684 2147 2685 def get_vtopo(self, req, fid): … … 2150 2688 """ 2151 2689 rv = None 2152 2690 state = None 2153 2691 2154 2692 req = req.get('VtopoRequestBody', None) … … 2173 2711 self.state_lock.acquire() 2174 2712 if self.state.has_key(key): 2175 2176 2177 2178 2179 2180 2713 if self.state[key].has_key('vtopo'): 2714 rv = { 'experiment' : {keytype: key },\ 2715 'vtopo': self.state[key]['vtopo'],\ 2716 } 2717 else: 2718 state = self.state[key]['experimentStatus'] 2181 2719 self.state_lock.release() 2182 2720 2183 2721 if rv: return rv 2184 2722 else: 2185 2186 2187 2188 2189 2723 if state: 2724 raise service_error(service_error.partial, 2725 "Not ready: %s" % state) 2726 else: 2727 raise service_error(service_error.req, "No such experiment") 2190 2728 2191 2729 def get_vis(self, req, fid): … … 2194 2732 """ 2195 2733 rv = None 2196 2734 state = None 2197 2735 2198 2736 req = req.get('VisRequestBody', None) … … 2217 2755 self.state_lock.acquire() 2218 2756 if self.state.has_key(key): 2219 2220 2221 2222 2223 2224 2757 if self.state[key].has_key('vis'): 2758 rv = { 'experiment' : {keytype: key },\ 2759 'vis': self.state[key]['vis'],\ 2760 } 2761 else: 2762 state = self.state[key]['experimentStatus'] 2225 2763 self.state_lock.release() 2226 2764 2227 2765 if rv: return rv 2228 2766 else: 2229 2230 2231 2232 2233 2767 if state: 2768 raise service_error(service_error.partial, 2769 "Not ready: %s" % state) 2770 else: 2771 raise service_error(service_error.req, "No such experiment") 2234 2772 2235 2773 def clean_info_response(self, rv): 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2774 """ 2775 Remove the information in the experiment's state object that is not in 2776 the info response. 2777 """ 2778 # Remove the owner info (should always be there, but...) 2779 if rv.has_key('owner'): del rv['owner'] 2780 2781 # Convert the log into the allocationLog parameter and remove the 2782 # log entry (with defensive programming) 2783 if rv.has_key('log'): 2784 rv['allocationLog'] = "".join(rv['log']) 2785 del rv['log'] 2786 else: 2787 rv['allocationLog'] = "" 2788 2789 if rv['experimentStatus'] != 'active': 2790 if rv.has_key('federant'): del rv['federant'] 2791 else: 2792 # remove the allocationID info from each federant 2793 for f in rv.get('federant', []): 2794 if f.has_key('allocID'): del f['allocID'] 2795 return rv 2258 2796 2259 2797 def get_info(self, req, fid): … … 2290 2828 self.state_lock.release() 2291 2829 2292 2293 2830 if rv: 2831 return self.clean_info_response(rv) 2294 2832 else: 2295 2833 raise service_error(service_error.req, "No such experiment") 2296 2834 2297 2835 def get_multi_info(self, req, fid): … … 2299 2837 Return all the stored info that this fedid can access 2300 2838 """ 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2839 rv = { 'info': [ ] } 2840 2841 self.state_lock.acquire() 2842 for key in [ k for k in self.state.keys() if isinstance(k, fedid)]: 2843 self.check_experiment_access(fid, key) 2844 2845 if self.state.has_key(key): 2846 e = copy.deepcopy(self.state[key]) 2847 e = self.clean_info_response(e) 2848 rv['info'].append(e) 2311 2849 self.state_lock.release() 2312 2850 return rv 2313 2851 2314 2852 … … 2323 2861 raise service_error(service_error.req, 2324 2862 "Bad request format (no TerminateRequestBody)") 2325 2863 force = req.get('force', False) 2326 2864 exp = req.get('experiment', None) 2327 2865 if exp: … … 2339 2877 self.check_experiment_access(fid, key) 2340 2878 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2879 dealloc_list = [ ] 2880 2881 2882 # Create a logger that logs to the dealloc_list as well as to the main 2883 # log file. 2884 dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key) 2885 h = logging.StreamHandler(self.list_log(dealloc_list)) 2886 # XXX: there should be a global one of these rather than repeating the 2887 # code. 2888 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2889 '%d %b %y %H:%M:%S')) 2890 dealloc_log.addHandler(h) 2353 2891 2354 2892 self.state_lock.acquire() … … 2361 2899 # remove the experiment state when the termination is complete. 2362 2900 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2901 # First make sure that the experiment creation is complete. 2902 status = fed_exp.get('experimentStatus', None) 2903 2904 if status: 2905 if status in ('starting', 'terminating'): 2906 if not force: 2907 self.state_lock.release() 2908 raise service_error(service_error.partial, 2909 'Experiment still being created or destroyed') 2910 else: 2911 self.log.warning('Experiment in %s state ' % status + \ 2912 'being terminated by force.') 2913 else: 2914 # No status??? trouble 2915 self.state_lock.release() 2916 raise service_error(service_error.internal, 2917 "Experiment has no status!?") 2380 2918 2381 2919 ids = [] … … 2416 2954 'aid': aid,\ 2417 2955 } 2418 2956 fed_exp['experimentStatus'] = 'terminating' 2419 2957 if self.state_filename: self.write_state() 2420 2958 self.state_lock.release() 2421 2959 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2960 # Stop everyone. NB, wait_for_all waits until a thread starts and 2961 # then completes, so we can't wait if nothing starts. So, no 2962 # tbparams, no start. 2963 if len(tbparams) > 0: 2964 thread_pool = self.thread_pool(self.nthreads) 2965 for tb in tbparams.keys(): 2966 # Create and start a thread to stop the segment 2967 thread_pool.wait_for_slot() 2968 t = self.pooled_thread(\ 2969 target=self.stop_segment(log=dealloc_log, 2970 keyfile=self.ssh_privkey_file, debug=self.debug), 2971 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2972 pdata=thread_pool, trace_file=self.trace_file) 2973 t.start() 2974 # Wait for completions 2975 thread_pool.wait_for_all_done() 2976 2977 # release the allocations (failed experiments have done this 2978 # already, and starting experiments may be in odd states, so we 2979 # ignore errors releasing those allocations 2980 try: 2981 for tb in tbparams.keys(): 2982 self.release_access(tb, tbparams[tb]['aid']) 2983 except service_error, e: 2984 if status != 'failed' and not force: 2985 raise e 2448 2986 2449 2987 # Remove the terminated experiment … … 2455 2993 self.state_lock.release() 2456 2994 2457 2458 2459 2460 2995 return { 2996 'experiment': exp , 2997 'deallocationLog': "".join(dealloc_list), 2998 } 2461 2999 else: 2462 3000 # Don't forget to release the lock -
fedd/federation/topdl.py
rab37086 rdb6b092 23 23 def get_attribute(self, key): 24 24 rv = None 25 attrs = getattr(self, 'attribute s', None)25 attrs = getattr(self, 'attribute', None) 26 26 if attrs: 27 27 for a in attrs: … … 40 40 self.value = value 41 41 42 def clone(self): 43 return Attribute(attribute=self.attribute, value=self.value) 44 42 45 def to_dict(self): 43 46 return { 'attribute': self.attribute, 'value': self.value } … … 48 51 self.kind = kind 49 52 53 def clone(self): 54 return Capacity(rate=self.rate, kind=self.kind) 55 50 56 def to_dict(self): 51 57 return { 'rate': self.rate, 'kind': self.kind } … … 55 61 self.time = time 56 62 self.kind = kind 63 64 def clone(self): 65 return Latency(time=self.time, kind=self.kind) 57 66 58 67 def to_dict(self): … … 68 77 self.interfaces = [ ] 69 78 79 def clone(self): 80 if self.capacity: c = self.capacity.clone() 81 else: c = None 82 83 if self.latency: l = self.latency.clone() 84 else: l = None 85 86 return Substrate(name=self.name, 87 capacity=c, 88 latency=l, 89 attribute = [a.clone() for a in self.attribute]) 90 70 91 def to_dict(self): 71 92 rv = { 'name': self.name } … … 83 104 self.attribute = [ self.init_class(Attribute, a) for a in \ 84 105 self.make_list(attribute) ] 106 107 def clone(self): 108 return CPU(type=self.type, 109 attribute = [a.clone() for a in self.attribute]) 85 110 86 111 def to_dict(self): … … 100 125 for a in self.make_list(attribute) ] 101 126 127 def clone(self): 128 return Storage(amount=self.amount, persistence=self.persistence, 129 attribute = [a.clone() for a in self.attribute]) 130 102 131 def to_dict(self): 103 132 rv = { 'amount': float(self.amount), 'persistence': self.persistence } … … 116 145 for a in self.make_list(attribute) ] 117 146 147 def clone(self): 148 return OperatingSystem(name=self.name, 149 version=self.version, 150 distribution=self.distribution, 151 distributionversion=self.distributionversion, 152 attribute = [ a.clone() for a in self.attribute]) 153 118 154 def to_dict(self): 119 155 rv = { } … … 132 168 self.attribute = [ self.init_class(Attribute, a)\ 133 169 for a in self.make_list(attribute) ] 170 171 def clone(self): 172 return Software(location=self.location, install=self.install, 173 attribute=[a.clone() for a in self.attribute]) 134 174 135 175 def to_dict(self): … … 150 190 self.element = element 151 191 self.subs = [ ] 192 193 def clone(self): 194 if self.capacity: c = self.capacity.clone() 195 else: c = None 196 197 if self.latency: l = self.latency.clone() 198 else: l = None 199 200 return Interface(substrate=self.substrate, 201 capacity=c, latency=l, 202 attribute = [ a.clone() for a in self.attribute]) 152 203 153 204 def to_dict(self): … … 181 232 map(assign_element, self.interface) 182 233 234 def clone(self): 235 return Computer(name=self.name, 236 cpu=[x.clone() for x in self.cpu], 237 os=[x.clone() for x in self.os], 238 software=[x.clone() for x in self.software], 239 storage=[x.clone() for x in self.storage], 240 interface=[x.clone() for x in self.interface], 241 attribute=[x.clone() for x in self.attribute]) 242 183 243 def to_dict(self): 184 244 rv = { } 245 if self.name: 246 rv['name'] = self.name 185 247 if self.cpu: 186 248 rv['cpu'] = [ c.to_dict() for c in self.cpu ] … … 203 265 self.attribute = [ self.init_class(Attribute, c) \ 204 266 for c in self.make_list(attribute) ] 267 268 def clone(self): 269 return Other(interface=[i.clone() for i in self.interface], 270 attribute=[a.clone() for a in attribute]) 205 271 206 272 def to_dict(self): … … 225 291 } 226 292 227 for k in e.keys(): 228 cl = classmap.get(k, None) 229 if cl: return cl(**e[k]) 293 if isinstance(e, dict): 294 for k in e.keys(): 295 cl = classmap.get(k, None) 296 if cl: return cl(**e[k]) 297 else: 298 return e 230 299 231 300 def __init__(self, substrates=[], elements=[]): … … 234 303 self.elements = [ self.init_element(e) \ 235 304 for e in self.make_list(elements) ] 305 self.incorporate_elements() 306 307 def incorporate_elements(self): 308 236 309 # Could to this init in one gulp, but we want to look for duplicate 237 310 # substrate names 238 311 substrate_map = { } 239 312 for s in self.substrates: 313 s.interfaces = [ ] 240 314 if not substrate_map.has_key(s.name): 241 315 substrate_map[s.name] = s … … 243 317 raise ConsistencyError("Duplicate substrate name %s" % s.name) 244 318 245 substrate_map = dict([ (s.name, s) for s in self.substrates ])246 319 for e in self.elements: 247 320 for i in e.interface: 321 i.element = e 322 i.subs = [ ] 248 323 for sn in i.substrate: 249 324 # NB, interfaces have substrate names in their substrate … … 256 331 raise ConsistencyError("No such substrate for %s" % sn) 257 332 333 def clone(self): 334 return Topology(substrates=[s.clone() for s in self.substrates], 335 elements=[e.clone() for e in self.elements]) 336 337 338 def make_indices(self): 339 sub_index = dict([(s.name, s) for s in self.substrates]) 340 elem_index = dict([(n, e) for e in self.elements for n in e.name]) 341 258 342 def to_dict(self): 259 343 rv = { } … … 264 348 return rv 265 349 266 def topology_from_xml(string=None, file=None, top="topology"):350 def topology_from_xml(string=None, file=None, filename=None, top="topology"): 267 351 import xml.parsers.expat 268 352 … … 318 402 xp.CharacterDataHandler = p.char_data 319 403 320 if file and string: 321 raise RuntimeError("Only one of file and string") 404 num_set = len([ x for x in (string, filename, file)\ 405 if x is not None ]) 406 407 if num_set != 1: 408 raise RuntimeError("Exactly one one of file, filename and string " + \ 409 "must be set") 410 elif filename: 411 f = open(filename, "r") 412 xp.ParseFile(f) 322 413 elif file: 323 f = open(file, "r") 324 xp.ParseFile(f) 414 xp.ParseFile(file) 325 415 elif string: 326 416 xp.Parse(string, isfinal=True) … … 360 450 361 451 362 452 def topology_to_vtopo(t): 453 nodes = [ ] 454 lans = [ ] 455 456 for eidx, e in enumerate(t.elements): 457 n = { } 458 if e.name: name = e.name[0] 459 else: name = "unnamed_node%d" % eidx 460 461 ips = [ ] 462 for i in e.interfaces: 463 ip = i.get_attribute('ip4_address') 464 ips.append(ip) 465 port = "%s:%d" % (name, idx) 466 for idx, s in enumerate(i.subs): 467 bw = 100000 468 delay = 0.0 469 if s.capacity: 470 bw = s.capacity.rate 471 if i.capacity: 472 bw = i.capacity.rate 473 474 if s.latency: 475 delay = s.latency.time 476 if i.latency: 477 bw = i.latency.time 478 479 lans.append({ 480 'member': port, 481 'vname': s.name, 482 'ip': ip, 483 'vnode': name, 484 'delay': delay, 485 'bandwidth': bw, 486 }) 487 nodes.append({ 488 'ips': ":".join(ips), 489 'vname': name, 490 }) 491 492 nodes.append(n) 493 return { 'vtopo': { 'node': node, 'lan': lan } }
Note: See TracChangeset
for help on using the changeset viewer.