Changeset cf0ff4f for fedd/federation
- Timestamp:
- Dec 6, 2010 4:50:57 PM (14 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master
- Children:
- 35a5879
- Parents:
- 5ecb9a3
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r5ecb9a3 rcf0ff4f 509 509 self.log.error("Pickling problem (TypeError): %s" % e) 510 510 511 512 def remove_dirs(self, dir): 513 """ 514 Remove the directory tree and all files rooted at dir. Log any errors, 515 but continue. 516 """ 517 self.log.debug("[removedirs]: removing %s" % dir) 518 try: 519 for path, dirs, files in os.walk(dir, topdown=False): 520 for f in files: 521 os.remove(os.path.join(path, f)) 522 for d in dirs: 523 os.rmdir(os.path.join(path, d)) 524 os.rmdir(dir) 525 except EnvironmentError, e: 526 self.log.error("Error deleting directory tree in %s" % e); 527 528 @staticmethod 529 def make_temp_certfile(expcert, tmpdir): 530 """ 531 make a protected copy of the access certificate so the experiment 532 controller can act as the experiment principal. mkstemp is the most 533 secure way to do that. The directory should be created by 534 mkdtemp. Return the filename. 535 """ 536 if expcert and tmpdir: 537 try: 538 certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir) 539 f = os.fdopen(certf, 'w') 540 print >> f, expcert 541 f.close() 542 except EnvironmentError, e: 543 raise service_error(service_error.internal, 544 "Cannot create temp cert file?") 545 return certfn 546 else: 547 return None 548 511 549 512 550 def generate_ssh_keys(self, dest, type="rsa" ): … … 847 885 (self.testbed, e)) 848 886 return False 849 850 887 851 888 def allocate_resources(self, allocated, masters, eid, expid, … … 1406 1443 "new_experiment for fedid %s" % fid) 1407 1444 1408 pid = "dummy"1409 gid = "dummy"1410 1411 1445 # Generate an ID for the experiment (slice) and a certificate that the 1412 1446 # allocator can use to prove they own it. We'll ship it back through … … 1452 1486 return rv 1453 1487 1488 # create_experiment sub-functions 1489 1454 1490 @staticmethod 1455 def get_ create_key(req):1491 def get_experiment_key(req, field='experimentID'): 1456 1492 """ 1457 1493 Parse the experiment identifiers out of the request (the request body … … 1462 1498 """ 1463 1499 # Get the experiment access 1464 exp = req.get( 'experimentID', None)1500 exp = req.get(field, None) 1465 1501 if exp: 1466 1502 if exp.has_key('fedid'): … … 1611 1647 return masters, pmasters 1612 1648 1613 1614 def create_experiment(self, req, fid): 1615 """ 1616 The external interface to experiment creation called from the 1617 dispatcher. 1618 1619 Creates a working directory, splits the incoming description using the 1620 splitter script and parses out the various subsections using the 1621 classes above. Once each sub-experiment is created, use pooled threads 1622 to instantiate them and start it all up. 1623 """ 1624 1625 req = req.get('CreateRequestBody', None) 1626 if req: 1627 key = self.get_create_key(req) 1628 else: 1629 raise service_error(service_error.req, 1630 "Bad request format (no CreateRequestBody)") 1631 1632 # Import information from the requester 1633 if self.auth.import_credentials(data_list=req.get('credential', [])): 1634 self.auth.save() 1635 1636 # Make sure that the caller can talk to us 1637 self.check_experiment_access(fid, key) 1638 1639 # Install the testbed map entries supplied with the request into a copy 1640 # of the testbed map. 1641 tbmap = dict(self.tbmap) 1642 for m in req.get('testbedmap', []): 1643 if 'testbed' in m and 'uri' in m: 1644 tbmap[m['testbed']] = m['uri'] 1645 1646 # a place to work 1647 try: 1648 tmpdir = tempfile.mkdtemp(prefix="split-") 1649 os.mkdir(tmpdir+"/keys") 1650 except EnvironmentError: 1651 raise service_error(service_error.internal, "Cannot create tmp dir") 1652 1649 def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams): 1650 """ 1651 Create the ssh keys necessary for interconnecting the potral nodes and 1652 the global hosts file for letting each segment know about the IP 1653 addresses in play. Save these into the repo. Add attributes to the 1654 autorizer allowing access controllers to download them and return a set 1655 of attributes that inform the segments where to find this stuff. Mau 1656 raise service_errors in if there are problems. 1657 """ 1653 1658 gw_pubkey_base = "fed.%s.pub" % self.ssh_type 1654 1659 gw_secretkey_base = "fed.%s" % self.ssh_type 1655 1660 gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base 1656 1661 gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base 1657 tbparams = { } 1658 1659 eid, expid, expcert_file = \ 1660 self.get_experiment_ids_and_start(key, tmpdir) 1661 1662 # This catches exceptions to clear the placeholder if necessary 1663 try: 1664 if not (eid and expid): 1665 raise service_error(service_error.internal, 1666 "Cannot find local experiment info!?") 1667 try: 1668 self.generate_ssh_keys(gw_secretkey, self.ssh_type) 1669 except ValueError: 1670 raise service_error(service_error.server_config, 1671 "Bad key type (%s)" % self.ssh_type) 1672 1673 top = self.get_topology(req, tmpdir) 1674 1675 # Assign the IPs 1676 hosts, ip_allocator = self.allocate_ips_to_topo(top) 1677 # Find the testbeds to look up 1678 tb_hosts = { } 1679 testbeds = [ ] 1680 for e in top.elements: 1681 if isinstance(e, topdl.Computer): 1682 tb = e.get_attribute('testbed') or 'default' 1683 if tb in tb_hosts: tb_hosts[tb].append(e.name) 1684 else: 1685 tb_hosts[tb] = [ e.name ] 1686 testbeds.append(tb) 1687 1688 masters, pmasters = self.get_testbed_services(req) 1689 allocated = { } # Testbeds we can access 1690 topo ={ } # Sub topologies 1691 connInfo = { } # Connection information 1692 1693 self.get_access_to_testbeds(testbeds, fid, allocated, 1694 tbparams, masters, tbmap, expid, expcert_file) 1695 1696 self.split_topology(top, topo, testbeds) 1697 1698 # Copy configuration files into the remote file store 1699 # The config urlpath 1700 configpath = "/%s/config" % expid 1701 # The config file system location 1702 configdir ="%s%s" % ( self.repodir, configpath) 1703 try: 1704 os.makedirs(configdir) 1705 except EnvironmentError, e: 1706 raise service_error(service_error.internal, 1707 "Cannot create config directory: %s" % e) 1708 try: 1709 f = open("%s/hosts" % configdir, "w") 1710 f.write('\n'.join(hosts)) 1711 f.close() 1712 except EnvironmentError, e: 1713 raise service_error(service_error.internal, 1714 "Cannot write hosts file: %s" % e) 1715 try: 1716 copy_file("%s" % gw_pubkey, "%s/%s" % \ 1717 (configdir, gw_pubkey_base)) 1718 copy_file("%s" % gw_secretkey, "%s/%s" % \ 1719 (configdir, gw_secretkey_base)) 1720 except EnvironmentError, e: 1721 raise service_error(service_error.internal, 1722 "Cannot copy keyfiles: %s" % e) 1723 1724 # Allow the individual testbeds to access the configuration files. 1725 for tb in tbparams.keys(): 1726 asignee = tbparams[tb]['allocID']['fedid'] 1727 for f in ("hosts", gw_secretkey_base, gw_pubkey_base): 1728 self.auth.set_attribute(asignee, "%s/%s" % \ 1729 (configpath, f)) 1730 1731 part = experiment_partition(self.auth, self.store_url, tbmap, 1732 self.muxmax, self.direct_transit) 1733 part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator, 1734 connInfo, expid) 1735 # Now get access to the dynamic testbeds (those added above) 1736 for tb in [ t for t in topo if t not in allocated]: 1737 #XXX: ABAC 1738 if self.auth_type =='legacy': 1739 self.get_legacy_access(tb, None, tbparams, access_user, 1740 masters, tbmap) 1741 elif self.auth_type == 'abac': 1742 self.get_access(tb, tbparams, fid, masters, tbmap, 1743 expid, expcert_file) 1744 else: 1745 raise service_error(service_error.internal, 1746 "Unknown auth_type %s" % self.auth_type) 1747 allocated[tb] = 1 1748 store_keys = topo[tb].get_attribute('store_keys') 1749 # Give the testbed access to keys it exports or imports 1750 if store_keys: 1751 for sk in store_keys.split(" "): 1752 self.auth.set_attribute(\ 1753 tbparams[tb]['allocID']['fedid'], sk) 1662 1663 try: 1664 self.generate_ssh_keys(gw_secretkey, self.ssh_type) 1665 except ValueError: 1666 raise service_error(service_error.server_config, 1667 "Bad key type (%s)" % self.ssh_type) 1668 1669 1670 # Copy configuration files into the remote file store 1671 # The config urlpath 1672 configpath = "/%s/config" % expid 1673 # The config file system location 1674 configdir ="%s%s" % ( self.repodir, configpath) 1675 try: 1676 os.makedirs(configdir) 1677 except EnvironmentError, e: 1678 raise service_error(service_error.internal, 1679 "Cannot create config directory: %s" % e) 1680 try: 1681 f = open("%s/hosts" % configdir, "w") 1682 print >> f, string.join(hosts, '\n') 1683 f.close() 1684 except EnvironmentError, e: 1685 raise service_error(service_error.internal, 1686 "Cannot write hosts file: %s" % e) 1687 try: 1688 copy_file("%s" % gw_pubkey, "%s/%s" % \ 1689 (configdir, gw_pubkey_base)) 1690 copy_file("%s" % gw_secretkey, "%s/%s" % \ 1691 (configdir, gw_secretkey_base)) 1692 except EnvironmentError, e: 1693 raise service_error(service_error.internal, 1694 "Cannot copy keyfiles: %s" % e) 1695 1696 # Allow the individual testbeds to access the configuration files. 1697 for tb in tbparams.keys(): 1698 asignee = tbparams[tb]['allocID']['fedid'] 1699 for f in ("hosts", gw_secretkey_base, gw_pubkey_base): 1700 self.auth.set_attribute(asignee, "%s/%s" % \ 1701 (configpath, f)) 1754 1702 self.auth.save() 1755 1703 1756 self.wrangle_software(expid, top, topo, tbparams)1757 1758 vtopo = topdl.topology_to_vtopo(top)1759 vis = self.genviz(vtopo)1760 1761 # save federant information1762 for k in allocated.keys():1763 tbparams[k]['federant'] = {1764 'name': [ { 'localname' : eid} ],1765 'allocID' : tbparams[k]['allocID'],1766 'uri': tbparams[k]['uri'],1767 }1768 1769 self.state_lock.acquire()1770 self.state[eid]['vtopo'] = vtopo1771 self.state[eid]['vis'] = vis1772 self.state[eid]['experimentdescription'] = \1773 { 'topdldescription': top.to_dict() }1774 self.state[eid]['federant'] = \1775 [ tbparams[tb]['federant'] for tb in tbparams.keys() \1776 if tbparams[tb].has_key('federant') ]1777 if self.state_filename:1778 self.write_state()1779 self.state_lock.release()1780 except service_error, e:1781 # If something goes wrong in the parse (usually an access error)1782 # clear the placeholder state. From here on out the code delays1783 # exceptions. Failing at this point returns a fault to the remote1784 # caller.1785 1786 self.state_lock.acquire()1787 del self.state[eid]1788 del self.state[expid]1789 if self.state_filename: self.write_state()1790 self.state_lock.release()1791 if tmpdir and self.cleanup:1792 self.remove_dirs(tmpdir)1793 raise e1794 1795 1796 # Start the background swapper and return the starting state. From1797 # here on out, the state will stick around a while.1798 1799 # Let users touch the state1800 self.auth.set_attribute(fid, expid)1801 self.auth.set_attribute(expid, expid)1802 # Override fedids can manipulate state as well1803 for o in self.overrides:1804 self.auth.set_attribute(o, expid)1805 self.auth.save()1806 1807 # Create a logger that logs to the experiment's state object as well as1808 # to the main log file.1809 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)1810 alloc_collector = self.list_log(self.state[eid]['log'])1811 h = logging.StreamHandler(alloc_collector)1812 # XXX: there should be a global one of these rather than repeating the1813 # code.1814 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",1815 '%d %b %y %H:%M:%S'))1816 alloc_log.addHandler(h)1817 1818 1704 attrs = [ 1819 1705 { … … 1833 1719 }, 1834 1720 ] 1835 1836 # transit and disconnected testbeds may not have a connInfo entry. 1837 # Fill in the blanks. 1838 for t in allocated.keys(): 1839 if not connInfo.has_key(t): 1840 connInfo[t] = { } 1721 return attrs 1722 1723 1724 def get_vtopo(self, req, fid): 1725 """ 1726 Return the stored virtual topology for this experiment 1727 """ 1728 rv = None 1729 state = None 1730 1731 req = req.get('VtopoRequestBody', None) 1732 if not req: 1733 raise service_error(service_error.req, 1734 "Bad request format (no VtopoRequestBody)") 1735 exp = req.get('experiment', None) 1736 if exp: 1737 if exp.has_key('fedid'): 1738 key = exp['fedid'] 1739 keytype = "fedid" 1740 elif exp.has_key('localname'): 1741 key = exp['localname'] 1742 keytype = "localname" 1743 else: 1744 raise service_error(service_error.req, "Unknown lookup type") 1745 else: 1746 raise service_error(service_error.req, "No request?") 1747 1748 self.check_experiment_access(fid, key) 1749 1750 self.state_lock.acquire() 1751 if self.state.has_key(key): 1752 if self.state[key].has_key('vtopo'): 1753 rv = { 'experiment' : {keytype: key },\ 1754 'vtopo': self.state[key]['vtopo'],\ 1755 } 1756 else: 1757 state = self.state[key]['experimentStatus'] 1758 self.state_lock.release() 1759 1760 if rv: return rv 1761 else: 1762 if state: 1763 raise service_error(service_error.partial, 1764 "Not ready: %s" % state) 1765 else: 1766 raise service_error(service_error.req, "No such experiment") 1767 1768 def get_vis(self, req, fid): 1769 """ 1770 Return the stored visualization for this experiment 1771 """ 1772 rv = None 1773 state = None 1774 1775 req = req.get('VisRequestBody', None) 1776 if not req: 1777 raise service_error(service_error.req, 1778 "Bad request format (no VisRequestBody)") 1779 exp = req.get('experiment', None) 1780 if exp: 1781 if exp.has_key('fedid'): 1782 key = exp['fedid'] 1783 keytype = "fedid" 1784 elif exp.has_key('localname'): 1785 key = exp['localname'] 1786 keytype = "localname" 1787 else: 1788 raise service_error(service_error.req, "Unknown lookup type") 1789 else: 1790 raise service_error(service_error.req, "No request?") 1791 1792 self.check_experiment_access(fid, key) 1793 1794 self.state_lock.acquire() 1795 if self.state.has_key(key): 1796 if self.state[key].has_key('vis'): 1797 rv = { 'experiment' : {keytype: key },\ 1798 'vis': self.state[key]['vis'],\ 1799 } 1800 else: 1801 state = self.state[key]['experimentStatus'] 1802 self.state_lock.release() 1803 1804 if rv: return rv 1805 else: 1806 if state: 1807 raise service_error(service_error.partial, 1808 "Not ready: %s" % state) 1809 else: 1810 raise service_error(service_error.req, "No such experiment") 1811 1812 1813 def save_federant_information(self, allocated, tbparams, eid, vtopo, vis, 1814 top): 1815 """ 1816 Store the various data that have changed in the experiment state 1817 between when it was started and the beginning of resource allocation. 1818 This is basically the information about each local allocation. This 1819 fills in the values of the placeholder allocation in the state. 1820 """ 1821 # save federant information 1822 for k in allocated.keys(): 1823 tbparams[k]['federant'] = { 1824 'name': [ { 'localname' : eid} ], 1825 'allocID' : tbparams[k]['allocID'], 1826 'uri': tbparams[k]['uri'], 1827 } 1828 1829 self.state_lock.acquire() 1830 self.state[eid]['vtopo'] = vtopo 1831 self.state[eid]['vis'] = vis 1832 self.state[eid]['experimentdescription'] = \ 1833 { 'topdldescription': top.to_dict() } 1834 self.state[eid]['federant'] = \ 1835 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 1836 if tbparams[tb].has_key('federant') ] 1837 if self.state_filename: 1838 self.write_state() 1839 self.state_lock.release() 1840 1841 def clear_placeholder(self, eid, expid, tmpdir): 1842 """ 1843 Clear the placeholder and remove any allocated temporary dir. 1844 """ 1845 1846 self.state_lock.acquire() 1847 del self.state[eid] 1848 del self.state[expid] 1849 if self.state_filename: self.write_state() 1850 self.state_lock.release() 1851 if tmpdir and self.cleanup: 1852 self.remove_dirs(tmpdir) 1853 1854 # end of create_experiment sub-functions 1855 1856 def create_experiment(self, req, fid): 1857 """ 1858 The external interface to experiment creation called from the 1859 dispatcher. 1860 1861 Creates a working directory, splits the incoming description using the 1862 splitter script and parses out the various subsections using the 1863 classes above. Once each sub-experiment is created, use pooled threads 1864 to instantiate them and start it all up. 1865 """ 1866 1867 req = req.get('CreateRequestBody', None) 1868 if req: 1869 key = self.get_experiment_key(req) 1870 else: 1871 raise service_error(service_error.req, 1872 "Bad request format (no CreateRequestBody)") 1873 1874 # Import information from the requester 1875 if self.auth.import_credentials(data_list=req.get('credential', [])): 1876 self.auth.save() 1877 1878 # Make sure that the caller can talk to us 1879 self.check_experiment_access(fid, key) 1880 1881 # Install the testbed map entries supplied with the request into a copy 1882 # of the testbed map. 1883 tbmap = dict(self.tbmap) 1884 for m in req.get('testbedmap', []): 1885 if 'testbed' in m and 'uri' in m: 1886 tbmap[m['testbed']] = m['uri'] 1887 1888 # a place to work 1889 try: 1890 tmpdir = tempfile.mkdtemp(prefix="split-") 1891 os.mkdir(tmpdir+"/keys") 1892 except EnvironmentError: 1893 raise service_error(service_error.internal, "Cannot create tmp dir") 1894 1895 tbparams = { } 1896 1897 eid, expid, expcert_file = \ 1898 self.get_experiment_ids_and_start(key, tmpdir) 1899 1900 # This catches exceptions to clear the placeholder if necessary 1901 try: 1902 if not (eid and expid): 1903 raise service_error(service_error.internal, 1904 "Cannot find local experiment info!?") 1905 1906 top = self.get_topology(req, tmpdir) 1907 # Assign the IPs 1908 hosts, ip_allocator = self.allocate_ips_to_topo(top) 1909 # Find the testbeds to look up 1910 tb_hosts = { } 1911 testbeds = [ ] 1912 for e in top.elements: 1913 if isinstance(e, topdl.Computer): 1914 tb = e.get_attribute('testbed') or 'default' 1915 if tb in tb_hosts: tb_hosts[tb].append(e.name) 1916 else: 1917 tb_hosts[tb] = [ e.name ] 1918 testbeds.append(tb) 1919 1920 masters, pmasters = self.get_testbed_services(req) 1921 allocated = { } # Testbeds we can access 1922 topo ={ } # Sub topologies 1923 connInfo = { } # Connection information 1924 1925 self.get_access_to_testbeds(testbeds, fid, allocated, 1926 tbparams, masters, tbmap, expid, expcert_file) 1927 1928 self.split_topology(top, topo, testbeds) 1929 1930 attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams) 1931 1932 part = experiment_partition(self.auth, self.store_url, tbmap, 1933 self.muxmax, self.direct_transit) 1934 part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator, 1935 connInfo, expid) 1936 # Now get access to the dynamic testbeds (those added above) 1937 for tb in [ t for t in topo if t not in allocated]: 1938 self.get_access(tb, tbparams, fid, masters, tbmap, 1939 expid, expcert_file) 1940 allocated[tb] = 1 1941 store_keys = topo[tb].get_attribute('store_keys') 1942 # Give the testbed access to keys it exports or imports 1943 if store_keys: 1944 for sk in store_keys.split(" "): 1945 self.auth.set_attribute(\ 1946 tbparams[tb]['allocID']['fedid'], sk) 1947 self.auth.save() 1948 1949 # transit and disconnected testbeds may not have a connInfo entry. 1950 # Fill in the blanks. 1951 for t in allocated.keys(): 1952 if not connInfo.has_key(t): 1953 connInfo[t] = { } 1954 1955 self.wrangle_software(expid, top, topo, tbparams) 1956 1957 vtopo = topdl.topology_to_vtopo(top) 1958 vis = self.genviz(vtopo) 1959 self.save_federant_information(allocated, tbparams, eid, vtopo, 1960 vis, top) 1961 except service_error, e: 1962 # If something goes wrong in the parse (usually an access error) 1963 # clear the placeholder state. From here on out the code delays 1964 # exceptions. Failing at this point returns a fault to the remote 1965 # caller. 1966 self.clear_placeholder(eid, expid, tmpdir) 1967 raise e 1968 1969 # Start the background swapper and return the starting state. From 1970 # here on out, the state will stick around a while. 1971 1972 # Let users touch the state 1973 self.auth.set_attribute(fid, expid) 1974 self.auth.set_attribute(expid, expid) 1975 # Override fedids can manipulate state as well 1976 for o in self.overrides: 1977 self.auth.set_attribute(o, expid) 1978 self.auth.save() 1979 1980 # Create a logger that logs to the experiment's state object as well as 1981 # to the main log file. 1982 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 1983 alloc_collector = self.list_log(self.state[eid]['log']) 1984 h = logging.StreamHandler(alloc_collector) 1985 # XXX: there should be a global one of these rather than repeating the 1986 # code. 1987 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 1988 '%d %b %y %H:%M:%S')) 1989 alloc_log.addHandler(h) 1841 1990 1842 1991 # Start a thread to do the resource allocation … … 1905 2054 1906 2055 def get_handler(self, path, fid): 2056 """ 2057 Perhaps surprisingly named, this function handles HTTP GET requests to 2058 this server (SOAP requests are POSTs). 2059 """ 1907 2060 self.log.info("Get handler %s %s" % (path, fid)) 1908 2061 if self.auth.check_attribute(fid, path): … … 1910 2063 else: 1911 2064 return (None, None) 1912 1913 def get_vtopo(self, req, fid):1914 """1915 Return the stored virtual topology for this experiment1916 """1917 rv = None1918 state = None1919 1920 req = req.get('VtopoRequestBody', None)1921 if not req:1922 raise service_error(service_error.req,1923 "Bad request format (no VtopoRequestBody)")1924 exp = req.get('experiment', None)1925 if exp:1926 if exp.has_key('fedid'):1927 key = exp['fedid']1928 keytype = "fedid"1929 elif exp.has_key('localname'):1930 key = exp['localname']1931 keytype = "localname"1932 else:1933 raise service_error(service_error.req, "Unknown lookup type")1934 else:1935 raise service_error(service_error.req, "No request?")1936 1937 self.check_experiment_access(fid, key)1938 1939 self.state_lock.acquire()1940 if self.state.has_key(key):1941 if self.state[key].has_key('vtopo'):1942 rv = { 'experiment' : {keytype: key },\1943 'vtopo': self.state[key]['vtopo'],\1944 }1945 else:1946 state = self.state[key]['experimentStatus']1947 self.state_lock.release()1948 1949 if rv: return rv1950 else:1951 if state:1952 raise service_error(service_error.partial,1953 "Not ready: %s" % state)1954 else:1955 raise service_error(service_error.req, "No such experiment")1956 1957 def get_vis(self, req, fid):1958 """1959 Return the stored visualization for this experiment1960 """1961 rv = None1962 state = None1963 1964 req = req.get('VisRequestBody', None)1965 if not req:1966 raise service_error(service_error.req,1967 "Bad request format (no VisRequestBody)")1968 exp = req.get('experiment', None)1969 if exp:1970 if exp.has_key('fedid'):1971 key = exp['fedid']1972 keytype = "fedid"1973 elif exp.has_key('localname'):1974 key = exp['localname']1975 keytype = "localname"1976 else:1977 raise service_error(service_error.req, "Unknown lookup type")1978 else:1979 raise service_error(service_error.req, "No request?")1980 1981 self.check_experiment_access(fid, key)1982 1983 self.state_lock.acquire()1984 if self.state.has_key(key):1985 if self.state[key].has_key('vis'):1986 rv = { 'experiment' : {keytype: key },\1987 'vis': self.state[key]['vis'],\1988 }1989 else:1990 state = self.state[key]['experimentStatus']1991 self.state_lock.release()1992 1993 if rv: return rv1994 else:1995 if state:1996 raise service_error(service_error.partial,1997 "Not ready: %s" % state)1998 else:1999 raise service_error(service_error.req, "No such experiment")2000 2065 2001 2066 def clean_info_response(self, rv): … … 2087 2152 return rv 2088 2153 2089 def remove_dirs(self, dir): 2090 """ 2091 Remove the directory tree and all files rooted at dir. Log any errors, 2092 but continue. 2093 """ 2094 self.log.debug("[removedirs]: removing %s" % dir) 2154 def check_termination_status(self, fed_exp, force): 2155 """ 2156 Confirm that the experiment is sin a valid state to stop (or force it) 2157 return the state - invalid states for deletion and force settings cause 2158 exceptions. 2159 """ 2160 self.state_lock.acquire() 2161 status = fed_exp.get('experimentStatus', None) 2162 2163 if status: 2164 if status in ('starting', 'terminating'): 2165 if not force: 2166 self.state_lock.release() 2167 raise service_error(service_error.partial, 2168 'Experiment still being created or destroyed') 2169 else: 2170 self.log.warning('Experiment in %s state ' % status + \ 2171 'being terminated by force.') 2172 self.state_lock.release() 2173 return status 2174 else: 2175 # No status??? trouble 2176 self.state_lock.release() 2177 raise service_error(service_error.internal, 2178 "Experiment has no status!?") 2179 2180 2181 def get_termination_info(self, fed_exp): 2182 ids = [] 2183 term_params = { } 2184 self.state_lock.acquire() 2185 # experimentID is a list of dicts that are self-describing 2186 # identifiers. This finds all the fedids and localnames - the 2187 # keys of self.state - and puts them into ids, which is used to delete 2188 # the state after everything is swapped out. 2189 for id in fed_exp.get('experimentID', []): 2190 if 'fedid' in id: 2191 ids.append(id['fedid']) 2192 repo = "%s" % id['fedid'] 2193 if 'localname' in id: ids.append(id['localname']) 2194 2195 # Get the experimentAccess - the principal for this experiment. It 2196 # is this principal to which credentials have been delegated, and 2197 # as which the experiment controller must act. 2198 if 'experimentAccess' in fed_exp and \ 2199 'X509' in fed_exp['experimentAccess']: 2200 expcert = fed_exp['experimentAccess']['X509'] 2201 else: 2202 expcert = None 2203 2204 # Collect the allocation/segment ids into a dict keyed by the fedid 2205 # of the allocation (or a monotonically increasing integer) that 2206 # contains a tuple of uri, aid (which is a dict...) 2207 for i, fed in enumerate(fed_exp.get('federant', [])): 2208 try: 2209 uri = fed['uri'] 2210 aid = fed['allocID'] 2211 k = fed['allocID'].get('fedid', i) 2212 except KeyError, e: 2213 continue 2214 term_params[k] = (uri, aid) 2215 # Change the experiment state 2216 fed_exp['experimentStatus'] = 'terminating' 2217 if self.state_filename: self.write_state() 2218 self.state_lock.release() 2219 2220 return ids, term_params, expcert, repo 2221 2222 2223 def deallocate_resources(self, term_params, expcert, status, force, 2224 dealloc_log): 2225 tmpdir = None 2226 # This try block makes sure the tempdir is cleared 2095 2227 try: 2096 for path, dirs, files in os.walk(dir, topdown=False): 2097 for f in files: 2098 os.remove(os.path.join(path, f)) 2099 for d in dirs: 2100 os.rmdir(os.path.join(path, d)) 2101 os.rmdir(dir) 2102 except EnvironmentError, e: 2103 self.log.error("Error deleting directory tree in %s" % e); 2104 2105 @staticmethod 2106 def make_temp_certfile(expcert, tmpdir): 2107 """ 2108 make a protected copy of the access certificate so the experiment 2109 controller can act as the experiment principal. mkstemp is the most 2110 secure way to do that. The directory should be created by 2111 mkdtemp. Return the filename. 2112 """ 2113 if expcert and tmpdir: 2114 try: 2115 certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir) 2116 f = os.fdopen(certf, 'w') 2117 print >> f, expcert 2118 f.close() 2119 except EnvironmentError, e: 2120 raise service_error(service_error.internal, 2121 "Cannot create temp cert file?") 2122 return certfn 2123 else: 2124 return None 2228 # If no expcert, try the deallocation as the experiment 2229 # controller instance. 2230 if expcert and self.auth_type != 'legacy': 2231 try: 2232 tmpdir = tempfile.mkdtemp(prefix="term-") 2233 except EnvironmentError: 2234 raise service_error(service_error.internal, 2235 "Cannot create tmp dir") 2236 cert_file = self.make_temp_certfile(expcert, tmpdir) 2237 pw = None 2238 else: 2239 cert_file = self.cert_file 2240 pw = self.cert_pwd 2241 2242 # Stop everyone. NB, wait_for_all waits until a thread starts 2243 # and then completes, so we can't wait if nothing starts. So, 2244 # no tbparams, no start. 2245 if len(term_params) > 0: 2246 tp = thread_pool(self.nthreads) 2247 for k, (uri, aid) in term_params.items(): 2248 # Create and start a thread to stop the segment 2249 tp.wait_for_slot() 2250 t = pooled_thread(\ 2251 target=self.terminate_segment(log=dealloc_log, 2252 testbed=uri, 2253 cert_file=cert_file, 2254 cert_pwd=pw, 2255 trusted_certs=self.trusted_certs, 2256 caller=self.call_TerminateSegment), 2257 args=(uri, aid), name=k, 2258 pdata=tp, trace_file=self.trace_file) 2259 t.start() 2260 # Wait for completions 2261 tp.wait_for_all_done() 2262 2263 # release the allocations (failed experiments have done this 2264 # already, and starting experiments may be in odd states, so we 2265 # ignore errors releasing those allocations 2266 try: 2267 for k, (uri, aid) in term_params.items(): 2268 self.release_access(None, aid, uri=uri, 2269 cert_file=cert_file, cert_pwd=pw) 2270 except service_error, e: 2271 if status != 'failed' and not force: 2272 raise e 2273 2274 # Clean up the tmpdir no matter what 2275 finally: 2276 if tmpdir: self.remove_dirs(tmpdir) 2277 2125 2278 2126 2279 def terminate_experiment(self, req, fid): … … 2134 2287 raise service_error(service_error.req, 2135 2288 "Bad request format (no TerminateRequestBody)") 2289 2290 key = self.get_experiment_key(req, 'experiment') 2291 self.check_experiment_access(fid, key) 2292 exp = req.get('experiment', False) 2136 2293 force = req.get('force', False) 2137 exp = req.get('experiment', None)2138 if exp:2139 if exp.has_key('fedid'):2140 key = exp['fedid']2141 keytype = "fedid"2142 elif exp.has_key('localname'):2143 key = exp['localname']2144 keytype = "localname"2145 else:2146 raise service_error(service_error.req, "Unknown lookup type")2147 else:2148 raise service_error(service_error.req, "No request?")2149 2150 self.check_experiment_access(fid, key)2151 2294 2152 2295 dealloc_list = [ ] … … 2165 2308 self.state_lock.acquire() 2166 2309 fed_exp = self.state.get(key, None) 2310 self.state_lock.release() 2167 2311 repo = None 2168 2312 2169 2313 if fed_exp: 2170 # This branch of the conditional holds the lock to generate a 2171 # consistent temporary tbparams variable to deallocate experiments. 2172 # It releases the lock to do the deallocations and reacquires it to 2173 # remove the experiment state when the termination is complete. 2174 2175 # First make sure that the experiment creation is complete. 2176 status = fed_exp.get('experimentStatus', None) 2177 2178 if status: 2179 if status in ('starting', 'terminating'): 2180 if not force: 2181 self.state_lock.release() 2182 raise service_error(service_error.partial, 2183 'Experiment still being created or destroyed') 2184 else: 2185 self.log.warning('Experiment in %s state ' % status + \ 2186 'being terminated by force.') 2187 else: 2188 # No status??? trouble 2189 self.state_lock.release() 2190 raise service_error(service_error.internal, 2191 "Experiment has no status!?") 2192 2193 ids = [] 2194 # experimentID is a list of dicts that are self-describing 2195 # identifiers. This finds all the fedids and localnames - the 2196 # keys of self.state - and puts them into ids. 2197 for id in fed_exp.get('experimentID', []): 2198 if id.has_key('fedid'): 2199 ids.append(id['fedid']) 2200 repo = "%s" % id['fedid'] 2201 if id.has_key('localname'): ids.append(id['localname']) 2202 2203 # Get the experimentAccess - the principal for this experiment. It 2204 # is this principal to which credentials have been delegated, and 2205 # as which the experiment controller must act. 2206 if 'experimentAccess' in self.state[key] and \ 2207 'X509' in self.state[key]['experimentAccess']: 2208 expcert = self.state[key]['experimentAccess']['X509'] 2209 else: 2210 expcert = None 2211 # Collect the allocation/segment ids into a dict keyed by the fedid 2212 # of the allocation (or a monotonically increasing integer) that 2213 # contains a tuple of uri, aid (which is a dict...) 2214 for i, fed in enumerate(fed_exp.get('federant', [])): 2215 try: 2216 uri = fed['uri'] 2217 aid = fed['allocID'] 2218 k = fed['allocID'].get('fedid', i) 2219 except KeyError, e: 2220 continue 2221 tbparams[k] = (uri, aid) 2222 fed_exp['experimentStatus'] = 'terminating' 2223 if self.state_filename: self.write_state() 2224 self.state_lock.release() 2225 2226 try: 2227 tmpdir = tempfile.mkdtemp(prefix="split-") 2228 except EnvironmentError: 2229 raise service_error(service_error.internal, 2230 "Cannot create tmp dir") 2231 # This try block makes sure the tempdir is cleared 2232 try: 2233 # If no expcert, try the deallocation as the experiment 2234 # controller instance. 2235 if expcert and self.auth_type != 'legacy': 2236 cert_file = self.make_temp_certfile(expcert, tmpdir) 2237 pw = None 2238 else: 2239 cert_file = self.cert_file 2240 pw = self.cert_pwd 2241 2242 # Stop everyone. NB, wait_for_all waits until a thread starts 2243 # and then completes, so we can't wait if nothing starts. So, 2244 # no tbparams, no start. 2245 if len(tbparams) > 0: 2246 tp = thread_pool(self.nthreads) 2247 for k in tbparams.keys(): 2248 # Create and start a thread to stop the segment 2249 tp.wait_for_slot() 2250 uri, aid = tbparams[k] 2251 t = pooled_thread(\ 2252 target=self.terminate_segment(log=dealloc_log, 2253 testbed=uri, 2254 cert_file=cert_file, 2255 cert_pwd=pw, 2256 trusted_certs=self.trusted_certs, 2257 caller=self.call_TerminateSegment), 2258 args=(uri, aid), name=k, 2259 pdata=tp, trace_file=self.trace_file) 2260 t.start() 2261 # Wait for completions 2262 tp.wait_for_all_done() 2263 2264 # release the allocations (failed experiments have done this 2265 # already, and starting experiments may be in odd states, so we 2266 # ignore errors releasing those allocations 2267 try: 2268 for k in tbparams.keys(): 2269 # This releases access by uri 2270 uri, aid = tbparams[k] 2271 self.release_access(None, aid, uri=uri, 2272 cert_file=cert_file, cert_pwd=pw) 2273 except service_error, e: 2274 if status != 'failed' and not force: 2275 raise e 2276 2277 # Clean up the tmpdir no matter what 2278 finally: 2279 self.remove_dirs(tmpdir) 2314 status = self.check_termination_status(fed_exp, force) 2315 ids, term_params, expcert, repo = self.get_termination_info(fed_exp) 2316 self.deallocate_resources(term_params, expcert, status, force, 2317 dealloc_log) 2280 2318 2281 2319 # Remove the terminated experiment 2282 2320 self.state_lock.acquire() 2283 2321 for id in ids: 2284 if self.state.has_key(id): del self.state[id]2322 if id in self.state: del self.state[id] 2285 2323 2286 2324 if self.state_filename: self.write_state() … … 2305 2343 return { 2306 2344 'experiment': exp , 2307 'deallocationLog': "".join(dealloc_list),2345 'deallocationLog': string.join(dealloc_list, ''), 2308 2346 } 2309 2347 else: 2310 # Don't forget to release the lock2311 self.state_lock.release()2312 2348 raise service_error(service_error.req, "No saved state") 2313 2349 … … 2322 2358 "Bad request format (no GetValueRequestBody)") 2323 2359 2324 name = req ['name']2325 wait = req ['wait']2360 name = req.get('name', None) 2361 wait = req.get('wait', False) 2326 2362 rv = { 'name': name } 2327 2363 2328 if self.auth.check_attribute(fid, name):2364 if name and self.auth.check_attribute(fid, name): 2329 2365 self.log.debug("[GetValue] asking for %s " % name) 2330 2366 try: … … 2351 2387 "Bad request format (no SetValueRequestBody)") 2352 2388 2353 name = req ['name']2354 v = req ['value']2355 2356 if self.auth.check_attribute(fid, name):2389 name = req.get('name', None) 2390 v = req.get('value', '') 2391 2392 if name and self.auth.check_attribute(fid, name): 2357 2393 try: 2358 2394 self.synch_store.set_value(name, v)
Note: See TracChangeset
for help on using the changeset viewer.