Ignore:
Timestamp:
Jul 24, 2009 1:22:34 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
a74ea78
Parents:
728001e
Message:

Asynchronous creation and logging. These are the fedd changes. Fedd_client next.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r728001e rbd3e314  
    212212        self.thread_with_rv = experiment_control_local.pooled_thread
    213213        self.thread_pool = experiment_control_local.thread_pool
     214        self.list_log = experiment_control_local.list_log
    214215
    215216        self.cert_file = config.get("experiment_control", "cert_file")
     
    16451646                return True
    16461647
     1648    def allocate_resources(self, allocated, master, eid, expid, expcert,
     1649            tbparams, tmpdir, alloc_log=None):
     1650        started = { }           # Testbeds where a sub-experiment started
     1651                                # successfully
     1652
     1653        # XXX
     1654        fail_soft = False
     1655
     1656        log = alloc_log or self.log
     1657
     1658        thread_pool = self.thread_pool(self.nthreads)
     1659        threads = [ ]
     1660
     1661        for tb in [ k for k in allocated.keys() if k != master]:
     1662            # Create and start a thread to start the segment, and save it to
     1663            # get the return value later
     1664            thread_pool.wait_for_slot()
     1665            t  = self.pooled_thread(\
     1666                    target=self.start_segment(log=log,
     1667                        keyfile=self.ssh_privkey_file, debug=self.debug),
     1668                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
     1669                    pdata=thread_pool, trace_file=self.trace_file)
     1670            threads.append(t)
     1671            t.start()
     1672
     1673        # Wait until all finish
     1674        thread_pool.wait_for_all_done()
     1675
     1676        # If none failed, start the master
     1677        failed = [ t.getName() for t in threads if not t.rv ]
     1678
     1679        if len(failed) == 0:
     1680            starter = self.start_segment(log=log,
     1681                    keyfile=self.ssh_privkey_file, debug=self.debug)
     1682            if not starter(master, eid, tbparams, tmpdir):
     1683                failed.append(master)
     1684
     1685        succeeded = [tb for tb in allocated.keys() if tb not in failed]
     1686        # If one failed clean up, unless fail_soft is set
     1687        if failed:
     1688            if not fail_soft:
     1689                thread_pool.clear()
     1690                for tb in succeeded:
     1691                    # Create and start a thread to stop the segment
     1692                    thread_pool.wait_for_slot()
     1693                    t  = self.pooled_thread(\
     1694                            target=self.stop_segment(log=log,
     1695                                keyfile=self.ssh_privkey_file,
     1696                                debug=self.debug),
     1697                            args=(tb, eid, tbparams), name=tb,
     1698                            pdata=thread_pool, trace_file=self.trace_file)
     1699                    t.start()
     1700                # Wait until all finish
     1701                thread_pool.wait_for_all_done()
     1702
     1703                # release the allocations
     1704                for tb in tbparams.keys():
     1705                    self.release_access(tb, tbparams[tb]['allocID'])
     1706                # Remove the placeholder
     1707                self.state_lock.acquire()
     1708                self.state[eid]['experimentStatus'] = 'failed'
     1709                if self.state_filename: self.write_state()
     1710                self.state_lock.release()
     1711
     1712                #raise service_error(service_error.federant,
     1713                #    "Swap in failed on %s" % ",".join(failed))
     1714                log.error("Swap in failed on %s" % ",".join(failed))
     1715                return
     1716        else:
     1717            log.info("[start_segment]: Experiment %s active" % eid)
     1718
     1719        log.debug("[start_experiment]: removing %s" % tmpdir)
     1720
     1721        # Walk up tmpdir, deleting as we go
     1722        for path, dirs, files in os.walk(tmpdir, topdown=False):
     1723            for f in files:
     1724                os.remove(os.path.join(path, f))
     1725            for d in dirs:
     1726                os.rmdir(os.path.join(path, d))
     1727        os.rmdir(tmpdir)
     1728
     1729        # Insert the experiment into our state and update the disk copy
     1730        self.state_lock.acquire()
     1731        self.state[expid]['experimentStatus'] = 'active'
     1732        self.state[eid] = self.state[expid]
     1733        if self.state_filename: self.write_state()
     1734        self.state_lock.release()
     1735        return
     1736
    16471737    def create_experiment(self, req, fid):
    16481738        """
     
    16791769        pid = "dummy"
    16801770        gid = "dummy"
    1681         # XXX
    1682         fail_soft = False
    1683 
    16841771        try:
    16851772            os.mkdir(tmpdir+"/keys")
     
    17101797            raise service_error(service_error.req, "No experiment description")
    17111798
     1799        # Generate an ID for the experiment (slice) and a certificate that the
     1800        # allocator can use to prove they own it.  We'll ship it back through
     1801        # the encrypted connection.
     1802        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
     1803
    17121804        if req.has_key('experimentID') and \
    17131805                req['experimentID'].has_key('localname'):
     
    17161808            while (self.state.has_key(eid)):
    17171809                eid += random.choice(string.ascii_letters)
    1718             # To avoid another thread picking this localname
    1719             self.state[eid] = "placeholder"
     1810            # Initial state
     1811            self.state[eid] = {
     1812                    'experimentID' : \
     1813                            [ { 'localname' : eid }, {'fedid': expid } ],
     1814                    'experimentStatus': 'starting',
     1815                    'experimentAccess': { 'X509' : expcert },
     1816                    'owner': fid,
     1817                    'log' : [],
     1818                }
     1819            self.state[expid] = self.state[eid]
     1820            if self.state_filename: self.write_state()
    17201821            self.state_lock.release()
    17211822        else:
     
    17281829                for i in range(0,5):
    17291830                    eid += random.choice(string.ascii_letters)
    1730             # To avoid another thread picking this localname
    1731             self.state[eid] = "placeholder"
     1831            # Initial state
     1832            self.state[eid] = {
     1833                    'experimentID' : \
     1834                            [ { 'localname' : eid }, {'fedid': expid } ],
     1835                    'experimentStatus': 'starting',
     1836                    'experimentAccess': { 'X509' : expcert },
     1837                    'owner': fid,
     1838                    'log' : [],
     1839                }
     1840            self.state[expid] = self.state[eid]
     1841            if self.state_filename: self.write_state()
    17321842            self.state_lock.release()
    17331843
     
    17741884
    17751885            allocated = { }         # Testbeds we can access
    1776             started = { }           # Testbeds where a sub-experiment started
    1777                                     # successfully
    1778 
    17791886            # Objects to parse the splitter output (defined above)
    17801887            parse_current_testbed = self.current_testbed(eid, tmpdir,
     
    18241931                raise service_error(service_error.internal,
    18251932                        "Failed to generate visualization")
     1933
    18261934           
    18271935            # save federant information
     
    18341942                    }
    18351943
     1944            self.state_lock.acquire()
     1945            self.state[eid]['vtopo'] = vtopo
     1946            self.state[eid]['vis'] = vis
     1947            self.state[expid]['federant'] = \
     1948                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
     1949                        if tbparams[tb].has_key('federant') ]
     1950            if self.state_filename: self.write_state()
     1951            self.state_lock.release()
    18361952
    18371953            # Copy tarfiles and rpms needed at remote sites into a staging area
     
    18741990            # If something goes wrong in the parse (usually an access error)
    18751991            # clear the placeholder state.  From here on out the code delays
    1876             # exceptions.
     1992            # exceptions.  Failing at this point returns a fault to the remote
     1993            # caller.
    18771994            self.state_lock.acquire()
    18781995            del self.state[eid]
     1996            del self.state[expid]
     1997            if self.state_filename: self.write_state()
    18791998            self.state_lock.release()
    18801999            raise e
    18812000
    1882         thread_pool = self.thread_pool(self.nthreads)
    1883         threads = [ ]
    1884 
    1885         for tb in [ k for k in allocated.keys() if k != master]:
    1886             # Create and start a thread to start the segment, and save it to
    1887             # get the return value later
    1888             thread_pool.wait_for_slot()
    1889             t  = self.pooled_thread(\
    1890                     target=self.start_segment(log=self.log,
    1891                         keyfile=self.ssh_privkey_file, debug=self.debug),
    1892                     args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    1893                     pdata=thread_pool, trace_file=self.trace_file)
    1894             threads.append(t)
    1895             t.start()
    1896 
    1897         # Wait until all finish
    1898         thread_pool.wait_for_all_done()
    1899 
    1900         # If none failed, start the master
    1901         failed = [ t.getName() for t in threads if not t.rv ]
    1902 
    1903         if len(failed) == 0:
    1904             starter = self.start_segment(log=self.log,
    1905                     keyfile=self.ssh_privkey_file, debug=self.debug)
    1906             if not starter(master, eid, tbparams, tmpdir):
    1907                 failed.append(master)
    1908 
    1909         succeeded = [tb for tb in allocated.keys() if tb not in failed]
    1910         # If one failed clean up, unless fail_soft is set
    1911         if failed:
    1912             if not fail_soft:
    1913                 thread_pool.clear()
    1914                 for tb in succeeded:
    1915                     # Create and start a thread to stop the segment
    1916                     thread_pool.wait_for_slot()
    1917                     t  = self.pooled_thread(\
    1918                             target=self.stop_segment(log=self.log,
    1919                                 keyfile=self.ssh_privkey_file,
    1920                                 debug=self.debug),
    1921                             args=(tb, eid, tbparams), name=tb,
    1922                             pdata=thread_pool, trace_file=self.trace_file)
    1923                     t.start()
    1924                 # Wait until all finish
    1925                 thread_pool.wait_for_all_done()
    1926 
    1927                 # release the allocations
    1928                 for tb in tbparams.keys():
    1929                     self.release_access(tb, tbparams[tb]['allocID'])
    1930                 # Remove the placeholder
    1931                 self.state_lock.acquire()
    1932                 del self.state[eid]
    1933                 self.state_lock.release()
    1934 
    1935                 raise service_error(service_error.federant,
    1936                     "Swap in failed on %s" % ",".join(failed))
    1937         else:
    1938             self.log.info("[start_segment]: Experiment %s started" % eid)
    1939 
    1940         # Generate an ID for the experiment (slice) and a certificate that the
    1941         # allocator can use to prove they own it.  We'll ship it back through
    1942         # the encrypted connection.
    1943         (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
    1944 
    1945         self.log.debug("[start_experiment]: removing %s" % tmpdir)
    1946 
    1947         # Walk up tmpdir, deleting as we go
    1948         for path, dirs, files in os.walk(tmpdir, topdown=False):
    1949             for f in files:
    1950                 os.remove(os.path.join(path, f))
    1951             for d in dirs:
    1952                 os.rmdir(os.path.join(path, d))
    1953         os.rmdir(tmpdir)
    1954 
    1955         # The deepcopy prevents the allocation ID and other binaries from being
    1956         # translated into other formats
    1957         resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
    1958                 for tb in tbparams.keys() \
    1959                     if tbparams[tb].has_key('federant') ],\
    1960                     'vtopo': vtopo,\
    1961                     'vis' : vis,
    1962                     'experimentID' : [\
    1963                             { 'fedid': copy.copy(expid) }, \
    1964                             { 'localname': eid },\
    1965                         ],\
    1966                     'experimentAccess': { 'X509' : expcert },\
    1967                 }
    1968         # remove the allocationID info from each federant
    1969         for f in resp['federant']:
    1970             if f.has_key('allocID'): del f['allocID']
    1971 
    1972         # Insert the experiment into our state and update the disk copy
    1973         self.state_lock.acquire()
    1974         self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
    1975                 for tb in tbparams.keys() \
    1976                     if tbparams[tb].has_key('federant') ],\
    1977                     'vtopo': vtopo,\
    1978                     'vis' : vis,
    1979                     'owner': fid,
    1980                     'experimentID' : [\
    1981                             { 'fedid': expid }, { 'localname': eid },\
    1982                         ],\
    1983                 }
    1984         self.state[eid] = self.state[expid]
    1985         if self.state_filename: self.write_state()
    1986         self.state_lock.release()
    1987 
     2001
     2002        # Start the background swapper and return the starting state.  From
     2003        # here on out, the state will stick around a while.
     2004
     2005        # Let users touch the state
    19882006        self.auth.set_attribute(fid, expid)
    19892007        self.auth.set_attribute(expid, expid)
    19902008
    1991         if not failed:
    1992             return resp
    1993         else:
    1994             raise service_error(service_error.partial, \
    1995                     "Partial swap in on %s" % ",".join(succeeded))
     2009        # Create a logger that logs to the experiment's state object as well as
     2010        # to the main log file.
     2011
     2012        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
     2013        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     2014        # XXX: there should be a global one of these rather than repeating the
     2015        # code.
     2016        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2017                    '%d %b %y %H:%M:%S'))
     2018        alloc_log.addHandler(h)
     2019       
     2020
     2021
     2022
     2023
     2024        # Start a thread to do the resource allocation
     2025        t  = Thread(target=self.allocate_resources,
     2026                args=(allocated, master, eid, expid, expcert, tbparams,
     2027                    tmpdir, alloc_log),
     2028                name=eid)
     2029        t.start()
     2030
     2031        rv = {
     2032                'experimentID': [
     2033                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
     2034                ],
     2035                'experimentStatus': 'started',
     2036                'experimentAccess': { 'X509' : expcert }
     2037            }
     2038
     2039        return rv
    19962040
    19972041    def check_experiment_access(self, fid, key):
     
    20492093        """
    20502094        rv = None
     2095        state = None
    20512096
    20522097        req = req.get('VtopoRequestBody', None)
     
    20712116        self.state_lock.acquire()
    20722117        if self.state.has_key(key):
    2073             rv = { 'experiment' : {keytype: key },\
    2074                     'vtopo': self.state[key]['vtopo'],\
    2075                 }
     2118            if self.state[key].has_key('vtopo'):
     2119                rv = { 'experiment' : {keytype: key },\
     2120                        'vtopo': self.state[key]['vtopo'],\
     2121                    }
     2122            else:
     2123                state = self.state[key]['experimentStatus']
    20762124        self.state_lock.release()
    20772125
    20782126        if rv: return rv
    2079         else: raise service_error(service_error.req, "No such experiment")
     2127        else:
     2128            if state:
     2129                raise service_error(service_error.partial,
     2130                        "Not ready: %s" % state)
     2131            else:
     2132                raise service_error(service_error.req, "No such experiment")
    20802133
    20812134    def get_vis(self, req, fid):
     
    20842137        """
    20852138        rv = None
     2139        state = None
    20862140
    20872141        req = req.get('VisRequestBody', None)
     
    21062160        self.state_lock.acquire()
    21072161        if self.state.has_key(key):
    2108             rv =  { 'experiment' : {keytype: key },\
    2109                     'vis': self.state[key]['vis'],\
    2110                     }
     2162            if self.state[key].has_key('vis'):
     2163                rv =  { 'experiment' : {keytype: key },\
     2164                        'vis': self.state[key]['vis'],\
     2165                        }
     2166            else:
     2167                state = self.state[key]['experimentStatus']
    21112168        self.state_lock.release()
    21122169
    21132170        if rv: return rv
    2114         else: raise service_error(service_error.req, "No such experiment")
     2171        else:
     2172            if state:
     2173                raise service_error(service_error.partial,
     2174                        "Not ready: %s" % state)
     2175            else:
     2176                raise service_error(service_error.req, "No such experiment")
    21152177
    21162178    def get_info(self, req, fid):
     
    21462208            rv = copy.deepcopy(self.state[key])
    21472209        self.state_lock.release()
    2148         # Remove the owner info
    2149         del rv['owner']
    2150         # remove the allocationID info from each federant
    2151         for f in rv['federant']:
    2152             if f.has_key('allocID'): del f['allocID']
    2153 
    2154         if rv: return rv
    2155         else: raise service_error(service_error.req, "No such experiment")
     2210
     2211        if rv:
     2212            # Remove the owner info (should always be there, but...)
     2213            if rv.has_key('owner'): del rv['owner']
     2214
     2215            # Convert the log into the allocationLog parameter and remove the
     2216            # log entry (with defensive programming)
     2217            if rv.has_key('log'):
     2218                rv['allocationLog'] = "".join(rv['log'])
     2219                del rv['log']
     2220
     2221            if rv['experimentStatus'] != 'active':
     2222                if rv.has_key('federant'): del rv['federant']
     2223            else:
     2224                # remove the allocationID info from each federant
     2225                for f in rv.get('federant', []):
     2226                    if f.has_key('allocID'): del f['allocID']
     2227
     2228            return rv
     2229        else:
     2230            raise service_error(service_error.req, "No such experiment")
    21562231
    21572232
     
    21892264            # It releases the lock to do the deallocations and reacquires it to
    21902265            # remove the experiment state when the termination is complete.
     2266
     2267            # First make sure that the experiment creation is complete.
     2268            if fed_exp.has_key('experimentStatus'):
     2269                if fed_exp['experimentStatus'] == 'started':
     2270                    self.state_lock.release()
     2271                    raise service_error(service_error.partial,
     2272                            'Experiment still being created')
     2273            else:
     2274                # No status??? trouble
     2275                self.state_lock.release()
     2276                raise service_error(service_error.internal,
     2277                        "Experiment has no status!?")
     2278
    21912279            ids = []
    21922280            #  experimentID is a list of dicts that are self-describing
     
    21992287            # Construct enough of the tbparams to make the stop_segment calls
    22002288            # work
    2201             for fed in fed_exp['federant']:
     2289            for fed in fed_exp.get('federant', []):
    22022290                try:
    22032291                    for e in fed['name']:
     
    22332321                # Create and start a thread to stop the segment
    22342322                thread_pool.wait_for_slot()
    2235                 t  = self.pooled_thread(target=self.stop_segment,
     2323                t  = self.pooled_thread(\
     2324                        target=self.stop_segment(log=self.log,
     2325                            keyfile=self.ssh_privkey_file, debug=self.debug),
    22362326                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    22372327                        pdata=thread_pool, trace_file=self.trace_file)
Note: See TracChangeset for help on using the changeset viewer.