Changeset bd3e314
- Timestamp:
- Jul 24, 2009 1:22:34 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- a74ea78
- Parents:
- 728001e
- Location:
- fedd
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/experiment_control.py
r728001e rbd3e314 212 212 self.thread_with_rv = experiment_control_local.pooled_thread 213 213 self.thread_pool = experiment_control_local.thread_pool 214 self.list_log = experiment_control_local.list_log 214 215 215 216 self.cert_file = config.get("experiment_control", "cert_file") … … 1645 1646 return True 1646 1647 1648 def allocate_resources(self, allocated, master, eid, expid, expcert, 1649 tbparams, tmpdir, alloc_log=None): 1650 started = { } # Testbeds where a sub-experiment started 1651 # successfully 1652 1653 # XXX 1654 fail_soft = False 1655 1656 log = alloc_log or self.log 1657 1658 thread_pool = self.thread_pool(self.nthreads) 1659 threads = [ ] 1660 1661 for tb in [ k for k in allocated.keys() if k != master]: 1662 # Create and start a thread to start the segment, and save it to 1663 # get the return value later 1664 thread_pool.wait_for_slot() 1665 t = self.pooled_thread(\ 1666 target=self.start_segment(log=log, 1667 keyfile=self.ssh_privkey_file, debug=self.debug), 1668 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1669 pdata=thread_pool, trace_file=self.trace_file) 1670 threads.append(t) 1671 t.start() 1672 1673 # Wait until all finish 1674 thread_pool.wait_for_all_done() 1675 1676 # If none failed, start the master 1677 failed = [ t.getName() for t in threads if not t.rv ] 1678 1679 if len(failed) == 0: 1680 starter = self.start_segment(log=log, 1681 keyfile=self.ssh_privkey_file, debug=self.debug) 1682 if not starter(master, eid, tbparams, tmpdir): 1683 failed.append(master) 1684 1685 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1686 # If one failed clean up, unless fail_soft is set 1687 if failed: 1688 if not fail_soft: 1689 thread_pool.clear() 1690 for tb in succeeded: 1691 # Create and start a thread to stop the segment 1692 thread_pool.wait_for_slot() 1693 t = self.pooled_thread(\ 1694 target=self.stop_segment(log=log, 1695 keyfile=self.ssh_privkey_file, 1696 debug=self.debug), 1697 args=(tb, eid, tbparams), name=tb, 1698 pdata=thread_pool, trace_file=self.trace_file) 1699 t.start() 1700 # Wait until all finish 1701 thread_pool.wait_for_all_done() 1702 1703 # release the allocations 1704 for tb in tbparams.keys(): 1705 self.release_access(tb, tbparams[tb]['allocID']) 1706 # Remove the placeholder 1707 self.state_lock.acquire() 1708 self.state[eid]['experimentStatus'] = 'failed' 1709 if self.state_filename: self.write_state() 1710 self.state_lock.release() 1711 1712 #raise service_error(service_error.federant, 1713 # "Swap in failed on %s" % ",".join(failed)) 1714 log.error("Swap in failed on %s" % ",".join(failed)) 1715 return 1716 else: 1717 log.info("[start_segment]: Experiment %s active" % eid) 1718 1719 log.debug("[start_experiment]: removing %s" % tmpdir) 1720 1721 # Walk up tmpdir, deleting as we go 1722 for path, dirs, files in os.walk(tmpdir, topdown=False): 1723 for f in files: 1724 os.remove(os.path.join(path, f)) 1725 for d in dirs: 1726 os.rmdir(os.path.join(path, d)) 1727 os.rmdir(tmpdir) 1728 1729 # Insert the experiment into our state and update the disk copy 1730 self.state_lock.acquire() 1731 self.state[expid]['experimentStatus'] = 'active' 1732 self.state[eid] = self.state[expid] 1733 if self.state_filename: self.write_state() 1734 self.state_lock.release() 1735 return 1736 1647 1737 def create_experiment(self, req, fid): 1648 1738 """ … … 1679 1769 pid = "dummy" 1680 1770 gid = "dummy" 1681 # XXX1682 fail_soft = False1683 1684 1771 try: 1685 1772 os.mkdir(tmpdir+"/keys") … … 1710 1797 raise service_error(service_error.req, "No experiment description") 1711 1798 1799 # Generate an ID for the experiment (slice) and a certificate that the 1800 # allocator can use to prove they own it. We'll ship it back through 1801 # the encrypted connection. 1802 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1803 1712 1804 if req.has_key('experimentID') and \ 1713 1805 req['experimentID'].has_key('localname'): … … 1716 1808 while (self.state.has_key(eid)): 1717 1809 eid += random.choice(string.ascii_letters) 1718 # To avoid another thread picking this localname 1719 self.state[eid] = "placeholder" 1810 # Initial state 1811 self.state[eid] = { 1812 'experimentID' : \ 1813 [ { 'localname' : eid }, {'fedid': expid } ], 1814 'experimentStatus': 'starting', 1815 'experimentAccess': { 'X509' : expcert }, 1816 'owner': fid, 1817 'log' : [], 1818 } 1819 self.state[expid] = self.state[eid] 1820 if self.state_filename: self.write_state() 1720 1821 self.state_lock.release() 1721 1822 else: … … 1728 1829 for i in range(0,5): 1729 1830 eid += random.choice(string.ascii_letters) 1730 # To avoid another thread picking this localname 1731 self.state[eid] = "placeholder" 1831 # Initial state 1832 self.state[eid] = { 1833 'experimentID' : \ 1834 [ { 'localname' : eid }, {'fedid': expid } ], 1835 'experimentStatus': 'starting', 1836 'experimentAccess': { 'X509' : expcert }, 1837 'owner': fid, 1838 'log' : [], 1839 } 1840 self.state[expid] = self.state[eid] 1841 if self.state_filename: self.write_state() 1732 1842 self.state_lock.release() 1733 1843 … … 1774 1884 1775 1885 allocated = { } # Testbeds we can access 1776 started = { } # Testbeds where a sub-experiment started1777 # successfully1778 1779 1886 # Objects to parse the splitter output (defined above) 1780 1887 parse_current_testbed = self.current_testbed(eid, tmpdir, … … 1824 1931 raise service_error(service_error.internal, 1825 1932 "Failed to generate visualization") 1933 1826 1934 1827 1935 # save federant information … … 1834 1942 } 1835 1943 1944 self.state_lock.acquire() 1945 self.state[eid]['vtopo'] = vtopo 1946 self.state[eid]['vis'] = vis 1947 self.state[expid]['federant'] = \ 1948 [ tbparams[tb]['federant'] for tb in tbparams.keys() \ 1949 if tbparams[tb].has_key('federant') ] 1950 if self.state_filename: self.write_state() 1951 self.state_lock.release() 1836 1952 1837 1953 # Copy tarfiles and rpms needed at remote sites into a staging area … … 1874 1990 # If something goes wrong in the parse (usually an access error) 1875 1991 # clear the placeholder state. From here on out the code delays 1876 # exceptions. 1992 # exceptions. Failing at this point returns a fault to the remote 1993 # caller. 1877 1994 self.state_lock.acquire() 1878 1995 del self.state[eid] 1996 del self.state[expid] 1997 if self.state_filename: self.write_state() 1879 1998 self.state_lock.release() 1880 1999 raise e 1881 2000 1882 thread_pool = self.thread_pool(self.nthreads) 1883 threads = [ ] 1884 1885 for tb in [ k for k in allocated.keys() if k != master]: 1886 # Create and start a thread to start the segment, and save it to 1887 # get the return value later 1888 thread_pool.wait_for_slot() 1889 t = self.pooled_thread(\ 1890 target=self.start_segment(log=self.log, 1891 keyfile=self.ssh_privkey_file, debug=self.debug), 1892 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1893 pdata=thread_pool, trace_file=self.trace_file) 1894 threads.append(t) 1895 t.start() 1896 1897 # Wait until all finish 1898 thread_pool.wait_for_all_done() 1899 1900 # If none failed, start the master 1901 failed = [ t.getName() for t in threads if not t.rv ] 1902 1903 if len(failed) == 0: 1904 starter = self.start_segment(log=self.log, 1905 keyfile=self.ssh_privkey_file, debug=self.debug) 1906 if not starter(master, eid, tbparams, tmpdir): 1907 failed.append(master) 1908 1909 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1910 # If one failed clean up, unless fail_soft is set 1911 if failed: 1912 if not fail_soft: 1913 thread_pool.clear() 1914 for tb in succeeded: 1915 # Create and start a thread to stop the segment 1916 thread_pool.wait_for_slot() 1917 t = self.pooled_thread(\ 1918 target=self.stop_segment(log=self.log, 1919 keyfile=self.ssh_privkey_file, 1920 debug=self.debug), 1921 args=(tb, eid, tbparams), name=tb, 1922 pdata=thread_pool, trace_file=self.trace_file) 1923 t.start() 1924 # Wait until all finish 1925 thread_pool.wait_for_all_done() 1926 1927 # release the allocations 1928 for tb in tbparams.keys(): 1929 self.release_access(tb, tbparams[tb]['allocID']) 1930 # Remove the placeholder 1931 self.state_lock.acquire() 1932 del self.state[eid] 1933 self.state_lock.release() 1934 1935 raise service_error(service_error.federant, 1936 "Swap in failed on %s" % ",".join(failed)) 1937 else: 1938 self.log.info("[start_segment]: Experiment %s started" % eid) 1939 1940 # Generate an ID for the experiment (slice) and a certificate that the 1941 # allocator can use to prove they own it. We'll ship it back through 1942 # the encrypted connection. 1943 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1944 1945 self.log.debug("[start_experiment]: removing %s" % tmpdir) 1946 1947 # Walk up tmpdir, deleting as we go 1948 for path, dirs, files in os.walk(tmpdir, topdown=False): 1949 for f in files: 1950 os.remove(os.path.join(path, f)) 1951 for d in dirs: 1952 os.rmdir(os.path.join(path, d)) 1953 os.rmdir(tmpdir) 1954 1955 # The deepcopy prevents the allocation ID and other binaries from being 1956 # translated into other formats 1957 resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \ 1958 for tb in tbparams.keys() \ 1959 if tbparams[tb].has_key('federant') ],\ 1960 'vtopo': vtopo,\ 1961 'vis' : vis, 1962 'experimentID' : [\ 1963 { 'fedid': copy.copy(expid) }, \ 1964 { 'localname': eid },\ 1965 ],\ 1966 'experimentAccess': { 'X509' : expcert },\ 1967 } 1968 # remove the allocationID info from each federant 1969 for f in resp['federant']: 1970 if f.has_key('allocID'): del f['allocID'] 1971 1972 # Insert the experiment into our state and update the disk copy 1973 self.state_lock.acquire() 1974 self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \ 1975 for tb in tbparams.keys() \ 1976 if tbparams[tb].has_key('federant') ],\ 1977 'vtopo': vtopo,\ 1978 'vis' : vis, 1979 'owner': fid, 1980 'experimentID' : [\ 1981 { 'fedid': expid }, { 'localname': eid },\ 1982 ],\ 1983 } 1984 self.state[eid] = self.state[expid] 1985 if self.state_filename: self.write_state() 1986 self.state_lock.release() 1987 2001 2002 # Start the background swapper and return the starting state. From 2003 # here on out, the state will stick around a while. 2004 2005 # Let users touch the state 1988 2006 self.auth.set_attribute(fid, expid) 1989 2007 self.auth.set_attribute(expid, expid) 1990 2008 1991 if not failed: 1992 return resp 1993 else: 1994 raise service_error(service_error.partial, \ 1995 "Partial swap in on %s" % ",".join(succeeded)) 2009 # Create a logger that logs to the experiment's state object as well as 2010 # to the main log file. 2011 2012 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 2013 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 2014 # XXX: there should be a global one of these rather than repeating the 2015 # code. 2016 h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s", 2017 '%d %b %y %H:%M:%S')) 2018 alloc_log.addHandler(h) 2019 2020 2021 2022 2023 2024 # Start a thread to do the resource allocation 2025 t = Thread(target=self.allocate_resources, 2026 args=(allocated, master, eid, expid, expcert, tbparams, 2027 tmpdir, alloc_log), 2028 name=eid) 2029 t.start() 2030 2031 rv = { 2032 'experimentID': [ 2033 {'localname' : eid }, { 'fedid': copy.copy(expid) } 2034 ], 2035 'experimentStatus': 'started', 2036 'experimentAccess': { 'X509' : expcert } 2037 } 2038 2039 return rv 1996 2040 1997 2041 def check_experiment_access(self, fid, key): … … 2049 2093 """ 2050 2094 rv = None 2095 state = None 2051 2096 2052 2097 req = req.get('VtopoRequestBody', None) … … 2071 2116 self.state_lock.acquire() 2072 2117 if self.state.has_key(key): 2073 rv = { 'experiment' : {keytype: key },\ 2074 'vtopo': self.state[key]['vtopo'],\ 2075 } 2118 if self.state[key].has_key('vtopo'): 2119 rv = { 'experiment' : {keytype: key },\ 2120 'vtopo': self.state[key]['vtopo'],\ 2121 } 2122 else: 2123 state = self.state[key]['experimentStatus'] 2076 2124 self.state_lock.release() 2077 2125 2078 2126 if rv: return rv 2079 else: raise service_error(service_error.req, "No such experiment") 2127 else: 2128 if state: 2129 raise service_error(service_error.partial, 2130 "Not ready: %s" % state) 2131 else: 2132 raise service_error(service_error.req, "No such experiment") 2080 2133 2081 2134 def get_vis(self, req, fid): … … 2084 2137 """ 2085 2138 rv = None 2139 state = None 2086 2140 2087 2141 req = req.get('VisRequestBody', None) … … 2106 2160 self.state_lock.acquire() 2107 2161 if self.state.has_key(key): 2108 rv = { 'experiment' : {keytype: key },\ 2109 'vis': self.state[key]['vis'],\ 2110 } 2162 if self.state[key].has_key('vis'): 2163 rv = { 'experiment' : {keytype: key },\ 2164 'vis': self.state[key]['vis'],\ 2165 } 2166 else: 2167 state = self.state[key]['experimentStatus'] 2111 2168 self.state_lock.release() 2112 2169 2113 2170 if rv: return rv 2114 else: raise service_error(service_error.req, "No such experiment") 2171 else: 2172 if state: 2173 raise service_error(service_error.partial, 2174 "Not ready: %s" % state) 2175 else: 2176 raise service_error(service_error.req, "No such experiment") 2115 2177 2116 2178 def get_info(self, req, fid): … … 2146 2208 rv = copy.deepcopy(self.state[key]) 2147 2209 self.state_lock.release() 2148 # Remove the owner info 2149 del rv['owner'] 2150 # remove the allocationID info from each federant 2151 for f in rv['federant']: 2152 if f.has_key('allocID'): del f['allocID'] 2153 2154 if rv: return rv 2155 else: raise service_error(service_error.req, "No such experiment") 2210 2211 if rv: 2212 # Remove the owner info (should always be there, but...) 2213 if rv.has_key('owner'): del rv['owner'] 2214 2215 # Convert the log into the allocationLog parameter and remove the 2216 # log entry (with defensive programming) 2217 if rv.has_key('log'): 2218 rv['allocationLog'] = "".join(rv['log']) 2219 del rv['log'] 2220 2221 if rv['experimentStatus'] != 'active': 2222 if rv.has_key('federant'): del rv['federant'] 2223 else: 2224 # remove the allocationID info from each federant 2225 for f in rv.get('federant', []): 2226 if f.has_key('allocID'): del f['allocID'] 2227 2228 return rv 2229 else: 2230 raise service_error(service_error.req, "No such experiment") 2156 2231 2157 2232 … … 2189 2264 # It releases the lock to do the deallocations and reacquires it to 2190 2265 # remove the experiment state when the termination is complete. 2266 2267 # First make sure that the experiment creation is complete. 2268 if fed_exp.has_key('experimentStatus'): 2269 if fed_exp['experimentStatus'] == 'started': 2270 self.state_lock.release() 2271 raise service_error(service_error.partial, 2272 'Experiment still being created') 2273 else: 2274 # No status??? trouble 2275 self.state_lock.release() 2276 raise service_error(service_error.internal, 2277 "Experiment has no status!?") 2278 2191 2279 ids = [] 2192 2280 # experimentID is a list of dicts that are self-describing … … 2199 2287 # Construct enough of the tbparams to make the stop_segment calls 2200 2288 # work 2201 for fed in fed_exp ['federant']:2289 for fed in fed_exp.get('federant', []): 2202 2290 try: 2203 2291 for e in fed['name']: … … 2233 2321 # Create and start a thread to stop the segment 2234 2322 thread_pool.wait_for_slot() 2235 t = self.pooled_thread(target=self.stop_segment, 2323 t = self.pooled_thread(\ 2324 target=self.stop_segment(log=self.log, 2325 keyfile=self.ssh_privkey_file, debug=self.debug), 2236 2326 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2237 2327 pdata=thread_pool, trace_file=self.trace_file) -
fedd/wsdl/fedd_types.xsd
r728001e rbd3e314 52 52 <xsd:enumeration value="max"/> 53 53 <xsd:enumeration value="average"/> 54 </xsd:restriction> 55 </xsd:simpleType> 56 57 <xsd:simpleType name="statusType"> 58 <xsd:annotation> 59 <xsd:documentation> 60 The current state of the experiment. 61 </xsd:documentation> 62 </xsd:annotation> 63 <xsd:restriction base="xsd:string"> 64 <xsd:enumeration value="active"/> 65 <xsd:enumeration value="starting"/> 66 <xsd:enumeration value="terminating"/> 67 <xsd:enumeration value="failed"/> 54 68 </xsd:restriction> 55 69 </xsd:simpleType> … … 440 454 <xsd:annotation> 441 455 <xsd:documentation> 442 The reply to a successful creation request. Includes the 443 information about federants hosting sub-experiments for service 444 access as well as virtual topology and visualization 445 information. All that information is relative to the requester. 446 ExperimentAccess includes credentials with which one can access 447 the experiment. These may include a public key necessary to 448 prove possession of the credential and should be treated with 449 care. 450 </xsd:documentation> 451 </xsd:annotation> 452 <xsd:sequence> 453 <xsd:element name="federant" type="tns:federatedExperimentType" 454 minOccurs="1" maxOccurs="unbounded"/> 455 <xsd:element name="vtopo" type="tns:vtopoType" minOccurs="0" 456 maxOccurs="1"/> 457 <xsd:element name="vis" type="tns:visType" minOccurs="0" 458 maxOccurs="1"/> 456 Returned to let the caller know that the request is underway and what 457 credentials will eventauly be able to be used to access them. 458 </xsd:documentation> 459 </xsd:annotation> 460 <xsd:sequence> 459 461 <xsd:element name="experimentID" type="tns:IDType" minOccurs="1" 460 462 maxOccurs="unbounded"/> 463 <xsd:element name="experimentStatus" type="tns:statusType"/> 461 464 <xsd:element name="experimentAccess" type="tns:accessType" minOccurs="0" 462 465 maxOccurs="1"/> … … 523 526 A combined topology, visualalization, and federant request. 524 527 Different information may be returned based on the user's rights 525 to see the topology. </xsd:documentation> 528 to see the topology. 529 </xsd:documentation> 526 530 </xsd:annotation> 527 531 <xsd:sequence> … … 533 537 <xsd:annotation> 534 538 <xsd:documentation> 535 Information on an instantiated experiment. A createResponse 536 without the secret information. Different information may be 537 returned based on the user's rights to see the topology. 539 Information on an instantiated experiment. Different information may 540 be returned based on the user's rights to see the topology. Includes 541 the information about federants hosting sub-experiments for service 542 access as well as virtual topology and visualization information. All 543 that information is relative to the requester. ExperimentAccess 544 includes credentials with which one can access the experiment. These 545 may include a public key necessary to prove possession of the 546 credential and should be treated with care. 538 547 </xsd:documentation> 539 548 </xsd:annotation> 540 549 <xsd:sequence> 541 550 <xsd:element name="federant" type="tns:federatedExperimentType" 542 minOccurs=" 1" maxOccurs="unbounded"/>551 minOccurs="0" maxOccurs="unbounded"/> 543 552 <xsd:element name="vtopo" type="tns:vtopoType" minOccurs="0" 544 553 maxOccurs="1"/> … … 547 556 <xsd:element name="experimentID" type="tns:IDType" minOccurs="1" 548 557 maxOccurs="unbounded"/> 558 <xsd:element name="allocationLog" type="xsd:string" minOccurs="0" 559 maxOccurs="1"/> 560 <xsd:element name="experimentStatus" type="tns:statusType"/> 561 <xsd:element name="experimentAccess" type="tns:accessType" minOccurs="0" 562 maxOccurs="1"/> 549 563 </xsd:sequence> 550 564 </xsd:complexType>
Note: See TracChangeset
for help on using the changeset viewer.