Changeset bd3e314


Ignore:
Timestamp:
Jul 24, 2009 1:22:34 PM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
a74ea78
Parents:
728001e
Message:

Asynchronous creation and logging. These are the fedd changes. Fedd_client next.

Location:
fedd
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r728001e rbd3e314  
    212212        self.thread_with_rv = experiment_control_local.pooled_thread
    213213        self.thread_pool = experiment_control_local.thread_pool
     214        self.list_log = experiment_control_local.list_log
    214215
    215216        self.cert_file = config.get("experiment_control", "cert_file")
     
    16451646                return True
    16461647
     1648    def allocate_resources(self, allocated, master, eid, expid, expcert,
     1649            tbparams, tmpdir, alloc_log=None):
     1650        started = { }           # Testbeds where a sub-experiment started
     1651                                # successfully
     1652
     1653        # XXX
     1654        fail_soft = False
     1655
     1656        log = alloc_log or self.log
     1657
     1658        thread_pool = self.thread_pool(self.nthreads)
     1659        threads = [ ]
     1660
     1661        for tb in [ k for k in allocated.keys() if k != master]:
     1662            # Create and start a thread to start the segment, and save it to
     1663            # get the return value later
     1664            thread_pool.wait_for_slot()
     1665            t  = self.pooled_thread(\
     1666                    target=self.start_segment(log=log,
     1667                        keyfile=self.ssh_privkey_file, debug=self.debug),
     1668                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
     1669                    pdata=thread_pool, trace_file=self.trace_file)
     1670            threads.append(t)
     1671            t.start()
     1672
     1673        # Wait until all finish
     1674        thread_pool.wait_for_all_done()
     1675
     1676        # If none failed, start the master
     1677        failed = [ t.getName() for t in threads if not t.rv ]
     1678
     1679        if len(failed) == 0:
     1680            starter = self.start_segment(log=log,
     1681                    keyfile=self.ssh_privkey_file, debug=self.debug)
     1682            if not starter(master, eid, tbparams, tmpdir):
     1683                failed.append(master)
     1684
     1685        succeeded = [tb for tb in allocated.keys() if tb not in failed]
     1686        # If one failed clean up, unless fail_soft is set
     1687        if failed:
     1688            if not fail_soft:
     1689                thread_pool.clear()
     1690                for tb in succeeded:
     1691                    # Create and start a thread to stop the segment
     1692                    thread_pool.wait_for_slot()
     1693                    t  = self.pooled_thread(\
     1694                            target=self.stop_segment(log=log,
     1695                                keyfile=self.ssh_privkey_file,
     1696                                debug=self.debug),
     1697                            args=(tb, eid, tbparams), name=tb,
     1698                            pdata=thread_pool, trace_file=self.trace_file)
     1699                    t.start()
     1700                # Wait until all finish
     1701                thread_pool.wait_for_all_done()
     1702
     1703                # release the allocations
     1704                for tb in tbparams.keys():
     1705                    self.release_access(tb, tbparams[tb]['allocID'])
     1706                # Remove the placeholder
     1707                self.state_lock.acquire()
     1708                self.state[eid]['experimentStatus'] = 'failed'
     1709                if self.state_filename: self.write_state()
     1710                self.state_lock.release()
     1711
     1712                #raise service_error(service_error.federant,
     1713                #    "Swap in failed on %s" % ",".join(failed))
     1714                log.error("Swap in failed on %s" % ",".join(failed))
     1715                return
     1716        else:
     1717            log.info("[start_segment]: Experiment %s active" % eid)
     1718
     1719        log.debug("[start_experiment]: removing %s" % tmpdir)
     1720
     1721        # Walk up tmpdir, deleting as we go
     1722        for path, dirs, files in os.walk(tmpdir, topdown=False):
     1723            for f in files:
     1724                os.remove(os.path.join(path, f))
     1725            for d in dirs:
     1726                os.rmdir(os.path.join(path, d))
     1727        os.rmdir(tmpdir)
     1728
     1729        # Insert the experiment into our state and update the disk copy
     1730        self.state_lock.acquire()
     1731        self.state[expid]['experimentStatus'] = 'active'
     1732        self.state[eid] = self.state[expid]
     1733        if self.state_filename: self.write_state()
     1734        self.state_lock.release()
     1735        return
     1736
    16471737    def create_experiment(self, req, fid):
    16481738        """
     
    16791769        pid = "dummy"
    16801770        gid = "dummy"
    1681         # XXX
    1682         fail_soft = False
    1683 
    16841771        try:
    16851772            os.mkdir(tmpdir+"/keys")
     
    17101797            raise service_error(service_error.req, "No experiment description")
    17111798
     1799        # Generate an ID for the experiment (slice) and a certificate that the
     1800        # allocator can use to prove they own it.  We'll ship it back through
     1801        # the encrypted connection.
     1802        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
     1803
    17121804        if req.has_key('experimentID') and \
    17131805                req['experimentID'].has_key('localname'):
     
    17161808            while (self.state.has_key(eid)):
    17171809                eid += random.choice(string.ascii_letters)
    1718             # To avoid another thread picking this localname
    1719             self.state[eid] = "placeholder"
     1810            # Initial state
     1811            self.state[eid] = {
     1812                    'experimentID' : \
     1813                            [ { 'localname' : eid }, {'fedid': expid } ],
     1814                    'experimentStatus': 'starting',
     1815                    'experimentAccess': { 'X509' : expcert },
     1816                    'owner': fid,
     1817                    'log' : [],
     1818                }
     1819            self.state[expid] = self.state[eid]
     1820            if self.state_filename: self.write_state()
    17201821            self.state_lock.release()
    17211822        else:
     
    17281829                for i in range(0,5):
    17291830                    eid += random.choice(string.ascii_letters)
    1730             # To avoid another thread picking this localname
    1731             self.state[eid] = "placeholder"
     1831            # Initial state
     1832            self.state[eid] = {
     1833                    'experimentID' : \
     1834                            [ { 'localname' : eid }, {'fedid': expid } ],
     1835                    'experimentStatus': 'starting',
     1836                    'experimentAccess': { 'X509' : expcert },
     1837                    'owner': fid,
     1838                    'log' : [],
     1839                }
     1840            self.state[expid] = self.state[eid]
     1841            if self.state_filename: self.write_state()
    17321842            self.state_lock.release()
    17331843
     
    17741884
    17751885            allocated = { }         # Testbeds we can access
    1776             started = { }           # Testbeds where a sub-experiment started
    1777                                     # successfully
    1778 
    17791886            # Objects to parse the splitter output (defined above)
    17801887            parse_current_testbed = self.current_testbed(eid, tmpdir,
     
    18241931                raise service_error(service_error.internal,
    18251932                        "Failed to generate visualization")
     1933
    18261934           
    18271935            # save federant information
     
    18341942                    }
    18351943
     1944            self.state_lock.acquire()
     1945            self.state[eid]['vtopo'] = vtopo
     1946            self.state[eid]['vis'] = vis
     1947            self.state[expid]['federant'] = \
     1948                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
     1949                        if tbparams[tb].has_key('federant') ]
     1950            if self.state_filename: self.write_state()
     1951            self.state_lock.release()
    18361952
    18371953            # Copy tarfiles and rpms needed at remote sites into a staging area
     
    18741990            # If something goes wrong in the parse (usually an access error)
    18751991            # clear the placeholder state.  From here on out the code delays
    1876             # exceptions.
     1992            # exceptions.  Failing at this point returns a fault to the remote
     1993            # caller.
    18771994            self.state_lock.acquire()
    18781995            del self.state[eid]
     1996            del self.state[expid]
     1997            if self.state_filename: self.write_state()
    18791998            self.state_lock.release()
    18801999            raise e
    18812000
    1882         thread_pool = self.thread_pool(self.nthreads)
    1883         threads = [ ]
    1884 
    1885         for tb in [ k for k in allocated.keys() if k != master]:
    1886             # Create and start a thread to start the segment, and save it to
    1887             # get the return value later
    1888             thread_pool.wait_for_slot()
    1889             t  = self.pooled_thread(\
    1890                     target=self.start_segment(log=self.log,
    1891                         keyfile=self.ssh_privkey_file, debug=self.debug),
    1892                     args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    1893                     pdata=thread_pool, trace_file=self.trace_file)
    1894             threads.append(t)
    1895             t.start()
    1896 
    1897         # Wait until all finish
    1898         thread_pool.wait_for_all_done()
    1899 
    1900         # If none failed, start the master
    1901         failed = [ t.getName() for t in threads if not t.rv ]
    1902 
    1903         if len(failed) == 0:
    1904             starter = self.start_segment(log=self.log,
    1905                     keyfile=self.ssh_privkey_file, debug=self.debug)
    1906             if not starter(master, eid, tbparams, tmpdir):
    1907                 failed.append(master)
    1908 
    1909         succeeded = [tb for tb in allocated.keys() if tb not in failed]
    1910         # If one failed clean up, unless fail_soft is set
    1911         if failed:
    1912             if not fail_soft:
    1913                 thread_pool.clear()
    1914                 for tb in succeeded:
    1915                     # Create and start a thread to stop the segment
    1916                     thread_pool.wait_for_slot()
    1917                     t  = self.pooled_thread(\
    1918                             target=self.stop_segment(log=self.log,
    1919                                 keyfile=self.ssh_privkey_file,
    1920                                 debug=self.debug),
    1921                             args=(tb, eid, tbparams), name=tb,
    1922                             pdata=thread_pool, trace_file=self.trace_file)
    1923                     t.start()
    1924                 # Wait until all finish
    1925                 thread_pool.wait_for_all_done()
    1926 
    1927                 # release the allocations
    1928                 for tb in tbparams.keys():
    1929                     self.release_access(tb, tbparams[tb]['allocID'])
    1930                 # Remove the placeholder
    1931                 self.state_lock.acquire()
    1932                 del self.state[eid]
    1933                 self.state_lock.release()
    1934 
    1935                 raise service_error(service_error.federant,
    1936                     "Swap in failed on %s" % ",".join(failed))
    1937         else:
    1938             self.log.info("[start_segment]: Experiment %s started" % eid)
    1939 
    1940         # Generate an ID for the experiment (slice) and a certificate that the
    1941         # allocator can use to prove they own it.  We'll ship it back through
    1942         # the encrypted connection.
    1943         (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
    1944 
    1945         self.log.debug("[start_experiment]: removing %s" % tmpdir)
    1946 
    1947         # Walk up tmpdir, deleting as we go
    1948         for path, dirs, files in os.walk(tmpdir, topdown=False):
    1949             for f in files:
    1950                 os.remove(os.path.join(path, f))
    1951             for d in dirs:
    1952                 os.rmdir(os.path.join(path, d))
    1953         os.rmdir(tmpdir)
    1954 
    1955         # The deepcopy prevents the allocation ID and other binaries from being
    1956         # translated into other formats
    1957         resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
    1958                 for tb in tbparams.keys() \
    1959                     if tbparams[tb].has_key('federant') ],\
    1960                     'vtopo': vtopo,\
    1961                     'vis' : vis,
    1962                     'experimentID' : [\
    1963                             { 'fedid': copy.copy(expid) }, \
    1964                             { 'localname': eid },\
    1965                         ],\
    1966                     'experimentAccess': { 'X509' : expcert },\
    1967                 }
    1968         # remove the allocationID info from each federant
    1969         for f in resp['federant']:
    1970             if f.has_key('allocID'): del f['allocID']
    1971 
    1972         # Insert the experiment into our state and update the disk copy
    1973         self.state_lock.acquire()
    1974         self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
    1975                 for tb in tbparams.keys() \
    1976                     if tbparams[tb].has_key('federant') ],\
    1977                     'vtopo': vtopo,\
    1978                     'vis' : vis,
    1979                     'owner': fid,
    1980                     'experimentID' : [\
    1981                             { 'fedid': expid }, { 'localname': eid },\
    1982                         ],\
    1983                 }
    1984         self.state[eid] = self.state[expid]
    1985         if self.state_filename: self.write_state()
    1986         self.state_lock.release()
    1987 
     2001
     2002        # Start the background swapper and return the starting state.  From
     2003        # here on out, the state will stick around a while.
     2004
     2005        # Let users touch the state
    19882006        self.auth.set_attribute(fid, expid)
    19892007        self.auth.set_attribute(expid, expid)
    19902008
    1991         if not failed:
    1992             return resp
    1993         else:
    1994             raise service_error(service_error.partial, \
    1995                     "Partial swap in on %s" % ",".join(succeeded))
     2009        # Create a logger that logs to the experiment's state object as well as
     2010        # to the main log file.
     2011
     2012        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
     2013        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     2014        # XXX: there should be a global one of these rather than repeating the
     2015        # code.
     2016        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2017                    '%d %b %y %H:%M:%S'))
     2018        alloc_log.addHandler(h)
     2019       
     2020
     2021
     2022
     2023
     2024        # Start a thread to do the resource allocation
     2025        t  = Thread(target=self.allocate_resources,
     2026                args=(allocated, master, eid, expid, expcert, tbparams,
     2027                    tmpdir, alloc_log),
     2028                name=eid)
     2029        t.start()
     2030
     2031        rv = {
     2032                'experimentID': [
     2033                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
     2034                ],
     2035                'experimentStatus': 'started',
     2036                'experimentAccess': { 'X509' : expcert }
     2037            }
     2038
     2039        return rv
    19962040
    19972041    def check_experiment_access(self, fid, key):
     
    20492093        """
    20502094        rv = None
     2095        state = None
    20512096
    20522097        req = req.get('VtopoRequestBody', None)
     
    20712116        self.state_lock.acquire()
    20722117        if self.state.has_key(key):
    2073             rv = { 'experiment' : {keytype: key },\
    2074                     'vtopo': self.state[key]['vtopo'],\
    2075                 }
     2118            if self.state[key].has_key('vtopo'):
     2119                rv = { 'experiment' : {keytype: key },\
     2120                        'vtopo': self.state[key]['vtopo'],\
     2121                    }
     2122            else:
     2123                state = self.state[key]['experimentStatus']
    20762124        self.state_lock.release()
    20772125
    20782126        if rv: return rv
    2079         else: raise service_error(service_error.req, "No such experiment")
     2127        else:
     2128            if state:
     2129                raise service_error(service_error.partial,
     2130                        "Not ready: %s" % state)
     2131            else:
     2132                raise service_error(service_error.req, "No such experiment")
    20802133
    20812134    def get_vis(self, req, fid):
     
    20842137        """
    20852138        rv = None
     2139        state = None
    20862140
    20872141        req = req.get('VisRequestBody', None)
     
    21062160        self.state_lock.acquire()
    21072161        if self.state.has_key(key):
    2108             rv =  { 'experiment' : {keytype: key },\
    2109                     'vis': self.state[key]['vis'],\
    2110                     }
     2162            if self.state[key].has_key('vis'):
     2163                rv =  { 'experiment' : {keytype: key },\
     2164                        'vis': self.state[key]['vis'],\
     2165                        }
     2166            else:
     2167                state = self.state[key]['experimentStatus']
    21112168        self.state_lock.release()
    21122169
    21132170        if rv: return rv
    2114         else: raise service_error(service_error.req, "No such experiment")
     2171        else:
     2172            if state:
     2173                raise service_error(service_error.partial,
     2174                        "Not ready: %s" % state)
     2175            else:
     2176                raise service_error(service_error.req, "No such experiment")
    21152177
    21162178    def get_info(self, req, fid):
     
    21462208            rv = copy.deepcopy(self.state[key])
    21472209        self.state_lock.release()
    2148         # Remove the owner info
    2149         del rv['owner']
    2150         # remove the allocationID info from each federant
    2151         for f in rv['federant']:
    2152             if f.has_key('allocID'): del f['allocID']
    2153 
    2154         if rv: return rv
    2155         else: raise service_error(service_error.req, "No such experiment")
     2210
     2211        if rv:
     2212            # Remove the owner info (should always be there, but...)
     2213            if rv.has_key('owner'): del rv['owner']
     2214
     2215            # Convert the log into the allocationLog parameter and remove the
     2216            # log entry (with defensive programming)
     2217            if rv.has_key('log'):
     2218                rv['allocationLog'] = "".join(rv['log'])
     2219                del rv['log']
     2220
     2221            if rv['experimentStatus'] != 'active':
     2222                if rv.has_key('federant'): del rv['federant']
     2223            else:
     2224                # remove the allocationID info from each federant
     2225                for f in rv.get('federant', []):
     2226                    if f.has_key('allocID'): del f['allocID']
     2227
     2228            return rv
     2229        else:
     2230            raise service_error(service_error.req, "No such experiment")
    21562231
    21572232
     
    21892264            # It releases the lock to do the deallocations and reacquires it to
    21902265            # remove the experiment state when the termination is complete.
     2266
     2267            # First make sure that the experiment creation is complete.
     2268            if fed_exp.has_key('experimentStatus'):
     2269                if fed_exp['experimentStatus'] == 'started':
     2270                    self.state_lock.release()
     2271                    raise service_error(service_error.partial,
     2272                            'Experiment still being created')
     2273            else:
     2274                # No status??? trouble
     2275                self.state_lock.release()
     2276                raise service_error(service_error.internal,
     2277                        "Experiment has no status!?")
     2278
    21912279            ids = []
    21922280            #  experimentID is a list of dicts that are self-describing
     
    21992287            # Construct enough of the tbparams to make the stop_segment calls
    22002288            # work
    2201             for fed in fed_exp['federant']:
     2289            for fed in fed_exp.get('federant', []):
    22022290                try:
    22032291                    for e in fed['name']:
     
    22332321                # Create and start a thread to stop the segment
    22342322                thread_pool.wait_for_slot()
    2235                 t  = self.pooled_thread(target=self.stop_segment,
     2323                t  = self.pooled_thread(\
     2324                        target=self.stop_segment(log=self.log,
     2325                            keyfile=self.ssh_privkey_file, debug=self.debug),
    22362326                        args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    22372327                        pdata=thread_pool, trace_file=self.trace_file)
  • fedd/wsdl/fedd_types.xsd

    r728001e rbd3e314  
    5252      <xsd:enumeration value="max"/>
    5353      <xsd:enumeration value="average"/>
     54    </xsd:restriction>
     55  </xsd:simpleType>
     56
     57  <xsd:simpleType name="statusType">
     58    <xsd:annotation>
     59      <xsd:documentation>
     60        The current state of the experiment.
     61      </xsd:documentation>
     62    </xsd:annotation>
     63    <xsd:restriction base="xsd:string">
     64      <xsd:enumeration value="active"/>
     65      <xsd:enumeration value="starting"/>
     66      <xsd:enumeration value="terminating"/>
     67      <xsd:enumeration value="failed"/>
    5468    </xsd:restriction>
    5569  </xsd:simpleType>
     
    440454    <xsd:annotation>
    441455      <xsd:documentation>
    442         The reply to a successful creation request.  Includes the
    443         information about federants hosting sub-experiments for service
    444         access as well as virtual topology and visualization
    445         information.  All that information is relative to the requester.
    446         ExperimentAccess includes credentials with which one can access
    447         the experiment.  These may include a public key necessary to
    448         prove possession of the credential and should be treated with
    449         care.
    450       </xsd:documentation>
    451     </xsd:annotation>
    452     <xsd:sequence>
    453       <xsd:element name="federant" type="tns:federatedExperimentType"
    454         minOccurs="1" maxOccurs="unbounded"/>
    455       <xsd:element name="vtopo" type="tns:vtopoType" minOccurs="0"
    456         maxOccurs="1"/>
    457       <xsd:element name="vis" type="tns:visType" minOccurs="0"
    458         maxOccurs="1"/>
     456        Returned to let the caller know that the request is underway and what
     457        credentials will eventauly be able to be used to access them.
     458      </xsd:documentation>
     459    </xsd:annotation>
     460    <xsd:sequence>
    459461      <xsd:element name="experimentID" type="tns:IDType" minOccurs="1"
    460462        maxOccurs="unbounded"/>
     463      <xsd:element name="experimentStatus" type="tns:statusType"/>
    461464      <xsd:element name="experimentAccess" type="tns:accessType" minOccurs="0"
    462465        maxOccurs="1"/>
     
    523526        A combined topology, visualalization, and federant request.
    524527        Different information may be returned based on the user's rights
    525         to see the topology.  </xsd:documentation>
     528        to see the topology.
     529      </xsd:documentation>
    526530    </xsd:annotation>
    527531    <xsd:sequence>
     
    533537    <xsd:annotation>
    534538      <xsd:documentation>
    535         Information on an instantiated experiment.  A createResponse
    536         without the secret information.  Different information may be
    537         returned based on the user's rights to see the topology.
     539        Information on an instantiated experiment.  Different information may
     540        be returned based on the user's rights to see the topology.  Includes
     541        the information about federants hosting sub-experiments for service
     542        access as well as virtual topology and visualization information.  All
     543        that information is relative to the requester.  ExperimentAccess
     544        includes credentials with which one can access the experiment.  These
     545        may include a public key necessary to prove possession of the
     546        credential and should be treated with care.
    538547      </xsd:documentation>
    539548    </xsd:annotation>
    540549    <xsd:sequence>
    541550      <xsd:element name="federant" type="tns:federatedExperimentType"
    542         minOccurs="1" maxOccurs="unbounded"/>
     551        minOccurs="0" maxOccurs="unbounded"/>
    543552      <xsd:element name="vtopo" type="tns:vtopoType" minOccurs="0"
    544553        maxOccurs="1"/>
     
    547556      <xsd:element name="experimentID" type="tns:IDType" minOccurs="1"
    548557        maxOccurs="unbounded"/>
     558      <xsd:element name="allocationLog" type="xsd:string" minOccurs="0"
     559        maxOccurs="1"/>
     560      <xsd:element name="experimentStatus" type="tns:statusType"/>
     561      <xsd:element name="experimentAccess" type="tns:accessType" minOccurs="0"
     562        maxOccurs="1"/>
    549563    </xsd:sequence>
    550564  </xsd:complexType>
Note: See TracChangeset for help on using the changeset viewer.