Changeset bd3e314 for fedd/federation
- Timestamp:
- Jul 24, 2009 1:22:34 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- a74ea78
- Parents:
- 728001e
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r728001e rbd3e314 212 212 self.thread_with_rv = experiment_control_local.pooled_thread 213 213 self.thread_pool = experiment_control_local.thread_pool 214 self.list_log = experiment_control_local.list_log 214 215 215 216 self.cert_file = config.get("experiment_control", "cert_file") … … 1645 1646 return True 1646 1647 1648 def allocate_resources(self, allocated, master, eid, expid, expcert, 1649 tbparams, tmpdir, alloc_log=None): 1650 started = { } # Testbeds where a sub-experiment started 1651 # successfully 1652 1653 # XXX 1654 fail_soft = False 1655 1656 log = alloc_log or self.log 1657 1658 thread_pool = self.thread_pool(self.nthreads) 1659 threads = [ ] 1660 1661 for tb in [ k for k in allocated.keys() if k != master]: 1662 # Create and start a thread to start the segment, and save it to 1663 # get the return value later 1664 thread_pool.wait_for_slot() 1665 t = self.pooled_thread(\ 1666 target=self.start_segment(log=log, 1667 keyfile=self.ssh_privkey_file, debug=self.debug), 1668 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1669 pdata=thread_pool, trace_file=self.trace_file) 1670 threads.append(t) 1671 t.start() 1672 1673 # Wait until all finish 1674 thread_pool.wait_for_all_done() 1675 1676 # If none failed, start the master 1677 failed = [ t.getName() for t in threads if not t.rv ] 1678 1679 if len(failed) == 0: 1680 starter = self.start_segment(log=log, 1681 keyfile=self.ssh_privkey_file, debug=self.debug) 1682 if not starter(master, eid, tbparams, tmpdir): 1683 failed.append(master) 1684 1685 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1686 # If one failed clean up, unless fail_soft is set 1687 if failed: 1688 if not fail_soft: 1689 thread_pool.clear() 1690 for tb in succeeded: 1691 # Create and start a thread to stop the segment 1692 thread_pool.wait_for_slot() 1693 t = self.pooled_thread(\ 1694 target=self.stop_segment(log=log, 1695 keyfile=self.ssh_privkey_file, 1696 debug=self.debug), 1697 args=(tb, eid, tbparams), name=tb, 1698 pdata=thread_pool, trace_file=self.trace_file) 1699 t.start() 1700 # Wait until all finish 1701 thread_pool.wait_for_all_done() 1702 1703 # release the allocations 1704 for tb in tbparams.keys(): 1705 self.release_access(tb, tbparams[tb]['allocID']) 1706 # Remove the placeholder 1707 self.state_lock.acquire() 1708 self.state[eid]['experimentStatus'] = 'failed' 1709 if self.state_filename: self.write_state() 1710 self.state_lock.release() 1711 1712 #raise service_error(service_error.federant, 1713 # "Swap in failed on %s" % ",".join(failed)) 1714 log.error("Swap in failed on %s" % ",".join(failed)) 1715 return 1716 else: 1717 log.info("[start_segment]: Experiment %s active" % eid) 1718 1719 log.debug("[start_experiment]: removing %s" % tmpdir) 1720 1721 # Walk up tmpdir, deleting as we go 1722 for path, dirs, files in os.walk(tmpdir, topdown=False): 1723 for f in files: 1724 os.remove(os.path.join(path, f)) 1725 for d in dirs: 1726 os.rmdir(os.path.join(path, d)) 1727 os.rmdir(tmpdir) 1728 1729 # Insert the experiment into our state and update the disk copy 1730 self.state_lock.acquire() 1731 self.state[expid]['experimentStatus'] = 'active' 1732 self.state[eid] = self.state[expid] 1733 if self.state_filename: self.write_state() 1734 self.state_lock.release() 1735 return 1736 1647 1737 def create_experiment(self, req, fid): 1648 1738 """ … … 1679 1769 pid = "dummy" 1680 1770 gid = "dummy" 1681 # XXX1682 fail_soft = False1683 1684 1771 try: 1685 1772 os.mkdir(tmpdir+"/keys") … … 1710 1797 raise service_error(service_error.req, "No experiment description") 1711 1798 1799 # Generate an ID for the experiment (slice) and a certificate that the 1800 # allocator can use to prove they own it. We'll ship it back through 1801 # the encrypted connection. 1802 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1803 1712 1804 if req.has_key('experimentID') and \ 1713 1805 req['experimentID'].has_key('localname'): … … 1716 1808 while (self.state.has_key(eid)): 1717 1809 eid += random.choice(string.ascii_letters) 1718 # To avoid another thread picking this localname 1719 self.state[eid] = "placeholder" 1810 # Initial state 1811 self.state[eid] = { 1812 'experimentID' : \ 1813 [ { 'localname' : eid }, {'fedid': expid } ], 1814 'experimentStatus': 'starting', 1815 'experimentAccess': { 'X509' : expcert }, 1816 'owner': fid, 1817 'log' : [], 1818 } 1819 self.state[expid] = self.state[eid] 1820 if self.state_filename: self.write_state() 1720 1821 self.state_lock.release() 1721 1822 else: … … 1728 1829 for i in range(0,5): 1729 1830 eid += random.choice(string.ascii_letters) 1730 # To avoid another thread picking this localname 1731 self.state[eid] = "placeholder" 1831 # Initial state 1832 self.state[eid] = { 1833 'experimentID' : \ 1834 [ { 'localname' : eid }, {'fedid': expid } ], 1835 'experimentStatus': 'starting', 1836 'experimentAccess': { 'X509' : expcert }, 1837 'owner': fid, 1838 'log' : [], 1839 } 1840 self.state[expid] = self.state[eid] 1841 if self.state_filename: self.write_state() 1732 1842 self.state_lock.release() 1733 1843 … … 1774 1884 1775 1885 allocated = { } # Testbeds we can access 1776 started = { } # Testbeds where a sub-experiment started1777 # successfully1778 1779 1886 # Objects to parse the splitter output (defined above) 1780 1887 parse_current_testbed = self.current_testbed(eid, tmpdir, … … 1824 1931 raise service_error(service_error.internal, 1825 1932 "Failed to generate visualization") 1933 1826 1934 1827 1935 # save federant information … … 1834 1942 } 1835 1943 1944 self.state_lock.acquire() 1945 self.state[eid]['vtopo'] = vtopo 1946 self.state[eid]['vis'] = vis 1947 self.state[expid]['federant'] = \ 1948 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 1949 if tbparams[tb].has_key('federant') ] 1950 if self.state_filename: self.write_state() 1951 self.state_lock.release() 1836 1952 1837 1953 # Copy tarfiles and rpms needed at remote sites into a staging area … … 1874 1990 # If something goes wrong in the parse (usually an access error) 1875 1991 # clear the placeholder state. From here on out the code delays 1876 # exceptions. 1992 # exceptions. Failing at this point returns a fault to the remote 1993 # caller. 1877 1994 self.state_lock.acquire() 1878 1995 del self.state[eid] 1996 del self.state[expid] 1997 if self.state_filename: self.write_state() 1879 1998 self.state_lock.release() 1880 1999 raise e 1881 2000 1882 thread_pool = self.thread_pool(self.nthreads) 1883 threads = [ ] 1884 1885 for tb in [ k for k in allocated.keys() if k != master]: 1886 # Create and start a thread to start the segment, and save it to 1887 # get the return value later 1888 thread_pool.wait_for_slot() 1889 t = self.pooled_thread(\ 1890 target=self.start_segment(log=self.log, 1891 keyfile=self.ssh_privkey_file, debug=self.debug), 1892 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1893 pdata=thread_pool, trace_file=self.trace_file) 1894 threads.append(t) 1895 t.start() 1896 1897 # Wait until all finish 1898 thread_pool.wait_for_all_done() 1899 1900 # If none failed, start the master 1901 failed = [ t.getName() for t in threads if not t.rv ] 1902 1903 if len(failed) == 0: 1904 starter = self.start_segment(log=self.log, 1905 keyfile=self.ssh_privkey_file, debug=self.debug) 1906 if not starter(master, eid, tbparams, tmpdir): 1907 failed.append(master) 1908 1909 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1910 # If one failed clean up, unless fail_soft is set 1911 if failed: 1912 if not fail_soft: 1913 thread_pool.clear() 1914 for tb in succeeded: 1915 # Create and start a thread to stop the segment 1916 thread_pool.wait_for_slot() 1917 t = self.pooled_thread(\ 1918 target=self.stop_segment(log=self.log, 1919 keyfile=self.ssh_privkey_file, 1920 debug=self.debug), 1921 args=(tb, eid, tbparams), name=tb, 1922 pdata=thread_pool, trace_file=self.trace_file) 1923 t.start() 1924 # Wait until all finish 1925 thread_pool.wait_for_all_done() 1926 1927 # release the allocations 1928 for tb in tbparams.keys(): 1929 self.release_access(tb, tbparams[tb]['allocID']) 1930 # Remove the placeholder 1931 self.state_lock.acquire() 1932 del self.state[eid] 1933 self.state_lock.release() 1934 1935 raise service_error(service_error.federant, 1936 "Swap in failed on %s" % ",".join(failed)) 1937 else: 1938 self.log.info("[start_segment]: Experiment %s started" % eid) 1939 1940 # Generate an ID for the experiment (slice) and a certificate that the 1941 # allocator can use to prove they own it. We'll ship it back through 1942 # the encrypted connection. 1943 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1944 1945 self.log.debug("[start_experiment]: removing %s" % tmpdir) 1946 1947 # Walk up tmpdir, deleting as we go 1948 for path, dirs, files in os.walk(tmpdir, topdown=False): 1949 for f in files: 1950 os.remove(os.path.join(path, f)) 1951 for d in dirs: 1952 os.rmdir(os.path.join(path, d)) 1953 os.rmdir(tmpdir) 1954 1955 # The deepcopy prevents the allocation ID and other binaries from being 1956 # translated into other formats 1957 resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \ 1958 for tb in tbparams.keys() \ 1959 if tbparams[tb].has_key('federant') ],\ 1960 'vtopo': vtopo,\ 1961 'vis' : vis, 1962 'experimentID' : [\ 1963 { 'fedid': copy.copy(expid) }, \ 1964 { 'localname': eid },\ 1965 ],\ 1966 'experimentAccess': { 'X509' : expcert },\ 1967 } 1968 # remove the allocationID info from each federant 1969 for f in resp['federant']: 1970 if f.has_key('allocID'): del f['allocID'] 1971 1972 # Insert the experiment into our state and update the disk copy 1973 self.state_lock.acquire() 1974 self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \ 1975 for tb in tbparams.keys() \ 1976 if tbparams[tb].has_key('federant') ],\ 1977 'vtopo': vtopo,\ 1978 'vis' : vis, 1979 'owner': fid, 1980 'experimentID' : [\ 1981 { 'fedid': expid }, { 'localname': eid },\ 1982 ],\ 1983 } 1984 self.state[eid] = self.state[expid] 1985 if self.state_filename: self.write_state() 1986 self.state_lock.release() 1987 2001 2002 # Start the background swapper and return the starting state. From 2003 # here on out, the state will stick around a while. 2004 2005 # Let users touch the state 1988 2006 self.auth.set_attribute(fid, expid) 1989 2007 self.auth.set_attribute(expid, expid) 1990 2008 1991 if not failed: 1992 return resp 1993 else: 1994 raise service_error(service_error.partial, \ 1995 "Partial swap in on %s" % ",".join(succeeded)) 2009 # Create a logger that logs to the experiment's state object as well as 2010 # to the main log file. 2011 2012 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2013 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 2014 # XXX: there should be a global one of these rather than repeating the 2015 # code. 2016 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2017 '%d %b %y %H:%M:%S')) 2018 alloc_log.addHandler(h) 2019 2020 2021 2022 2023 2024 # Start a thread to do the resource allocation 2025 t = Thread(target=self.allocate_resources, 2026 args=(allocated, master, eid, expid, expcert, tbparams, 2027 tmpdir, alloc_log), 2028 name=eid) 2029 t.start() 2030 2031 rv = { 2032 'experimentID': [ 2033 {'localname' : eid }, { 'fedid': copy.copy(expid) } 2034 ], 2035 'experimentStatus': 'started', 2036 'experimentAccess': { 'X509' : expcert } 2037 } 2038 2039 return rv 1996 2040 1997 2041 def check_experiment_access(self, fid, key): … … 2049 2093 """ 2050 2094 rv = None 2095 state = None 2051 2096 2052 2097 req = req.get('VtopoRequestBody', None) … … 2071 2116 self.state_lock.acquire() 2072 2117 if self.state.has_key(key): 2073 rv = { 'experiment' : {keytype: key },\ 2074 'vtopo': self.state[key]['vtopo'],\ 2075 } 2118 if self.state[key].has_key('vtopo'): 2119 rv = { 'experiment' : {keytype: key },\ 2120 'vtopo': self.state[key]['vtopo'],\ 2121 } 2122 else: 2123 state = self.state[key]['experimentStatus'] 2076 2124 self.state_lock.release() 2077 2125 2078 2126 if rv: return rv 2079 else: raise service_error(service_error.req, "No such experiment") 2127 else: 2128 if state: 2129 raise service_error(service_error.partial, 2130 "Not ready: %s" % state) 2131 else: 2132 raise service_error(service_error.req, "No such experiment") 2080 2133 2081 2134 def get_vis(self, req, fid): … … 2084 2137 """ 2085 2138 rv = None 2139 state = None 2086 2140 2087 2141 req = req.get('VisRequestBody', None) … … 2106 2160 self.state_lock.acquire() 2107 2161 if self.state.has_key(key): 2108 rv = { 'experiment' : {keytype: key },\ 2109 'vis': self.state[key]['vis'],\ 2110 } 2162 if self.state[key].has_key('vis'): 2163 rv = { 'experiment' : {keytype: key },\ 2164 'vis': self.state[key]['vis'],\ 2165 } 2166 else: 2167 state = self.state[key]['experimentStatus'] 2111 2168 self.state_lock.release() 2112 2169 2113 2170 if rv: return rv 2114 else: raise service_error(service_error.req, "No such experiment") 2171 else: 2172 if state: 2173 raise service_error(service_error.partial, 2174 "Not ready: %s" % state) 2175 else: 2176 raise service_error(service_error.req, "No such experiment") 2115 2177 2116 2178 def get_info(self, req, fid): … … 2146 2208 rv = copy.deepcopy(self.state[key]) 2147 2209 self.state_lock.release() 2148 # Remove the owner info 2149 del rv['owner'] 2150 # remove the allocationID info from each federant 2151 for f in rv['federant']: 2152 if f.has_key('allocID'): del f['allocID'] 2153 2154 if rv: return rv 2155 else: raise service_error(service_error.req, "No such experiment") 2210 2211 if rv: 2212 # Remove the owner info (should always be there, but...) 2213 if rv.has_key('owner'): del rv['owner'] 2214 2215 # Convert the log into the allocationLog parameter and remove the 2216 # log entry (with defensive programming) 2217 if rv.has_key('log'): 2218 rv['allocationLog'] = "".join(rv['log']) 2219 del rv['log'] 2220 2221 if rv['experimentStatus'] != 'active': 2222 if rv.has_key('federant'): del rv['federant'] 2223 else: 2224 # remove the allocationID info from each federant 2225 for f in rv.get('federant', []): 2226 if f.has_key('allocID'): del f['allocID'] 2227 2228 return rv 2229 else: 2230 raise service_error(service_error.req, "No such experiment") 2156 2231 2157 2232 … … 2189 2264 # It releases the lock to do the deallocations and reacquires it to 2190 2265 # remove the experiment state when the termination is complete. 2266 2267 # First make sure that the experiment creation is complete. 2268 if fed_exp.has_key('experimentStatus'): 2269 if fed_exp['experimentStatus'] == 'started': 2270 self.state_lock.release() 2271 raise service_error(service_error.partial, 2272 'Experiment still being created') 2273 else: 2274 # No status??? trouble 2275 self.state_lock.release() 2276 raise service_error(service_error.internal, 2277 "Experiment has no status!?") 2278 2191 2279 ids = [] 2192 2280 # experimentID is a list of dicts that are self-describing … … 2199 2287 # Construct enough of the tbparams to make the stop_segment calls 2200 2288 # work 2201 for fed in fed_exp ['federant']:2289 for fed in fed_exp.get('federant', []): 2202 2290 try: 2203 2291 for e in fed['name']: … … 2233 2321 # Create and start a thread to stop the segment 2234 2322 thread_pool.wait_for_slot() 2235 t = self.pooled_thread(target=self.stop_segment, 2323 t = self.pooled_thread(\ 2324 target=self.stop_segment(log=self.log, 2325 keyfile=self.ssh_privkey_file, debug=self.debug), 2236 2326 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2237 2327 pdata=thread_pool, trace_file=self.trace_file)
Note: See TracChangeset
for help on using the changeset viewer.