Changeset cf0ff4f for fedd


Ignore:
Timestamp:
Dec 6, 2010 4:50:57 PM (14 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master
Children:
35a5879
Parents:
5ecb9a3
Message:

End of a detangling pass.

There are still some functions that are too long, but overall this is a
cleaner version of the experiment controller that's somewhat easier to
read and maintain.

Closes #10

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r5ecb9a3 rcf0ff4f  
    509509            self.log.error("Pickling problem (TypeError): %s" % e)
    510510
     511
     512    def remove_dirs(self, dir):
     513        """
     514        Remove the directory tree and all files rooted at dir.  Log any errors,
     515        but continue.
     516        """
     517        self.log.debug("[removedirs]: removing %s" % dir)
     518        try:
     519            for path, dirs, files in os.walk(dir, topdown=False):
     520                for f in files:
     521                    os.remove(os.path.join(path, f))
     522                for d in dirs:
     523                    os.rmdir(os.path.join(path, d))
     524            os.rmdir(dir)
     525        except EnvironmentError, e:
     526            self.log.error("Error deleting directory tree in %s" % e);
     527
     528    @staticmethod
     529    def make_temp_certfile(expcert, tmpdir):
     530        """
     531        make a protected copy of the access certificate so the experiment
     532        controller can act as the experiment principal.  mkstemp is the most
     533        secure way to do that. The directory should be created by
     534        mkdtemp.  Return the filename.
     535        """
     536        if expcert and tmpdir:
     537            try:
     538                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
     539                f = os.fdopen(certf, 'w')
     540                print >> f, expcert
     541                f.close()
     542            except EnvironmentError, e:
     543                raise service_error(service_error.internal,
     544                        "Cannot create temp cert file?")
     545            return certfn
     546        else:
     547            return None
     548
    511549       
    512550    def generate_ssh_keys(self, dest, type="rsa" ):
     
    847885                        (self.testbed, e))
    848886                return False
    849    
    850887
    851888    def allocate_resources(self, allocated, masters, eid, expid,
     
    14061443                            "new_experiment for fedid %s"  % fid)
    14071444
    1408         pid = "dummy"
    1409         gid = "dummy"
    1410 
    14111445        # Generate an ID for the experiment (slice) and a certificate that the
    14121446        # allocator can use to prove they own it.  We'll ship it back through
     
    14521486        return rv
    14531487
     1488    # create_experiment sub-functions
     1489
    14541490    @staticmethod
    1455     def get_create_key(req):
     1491    def get_experiment_key(req, field='experimentID'):
    14561492        """
    14571493        Parse the experiment identifiers out of the request (the request body
     
    14621498        """
    14631499        # Get the experiment access
    1464         exp = req.get('experimentID', None)
     1500        exp = req.get(field, None)
    14651501        if exp:
    14661502            if exp.has_key('fedid'):
     
    16111647        return masters, pmasters
    16121648
    1613 
    1614     def create_experiment(self, req, fid):
    1615         """
    1616         The external interface to experiment creation called from the
    1617         dispatcher.
    1618 
    1619         Creates a working directory, splits the incoming description using the
    1620         splitter script and parses out the various subsections using the
    1621         classes above.  Once each sub-experiment is created, use pooled threads
    1622         to instantiate them and start it all up.
    1623         """
    1624 
    1625         req = req.get('CreateRequestBody', None)
    1626         if req:
    1627             key = self.get_create_key(req)
    1628         else:
    1629             raise service_error(service_error.req,
    1630                     "Bad request format (no CreateRequestBody)")
    1631 
    1632         # Import information from the requester
    1633         if self.auth.import_credentials(data_list=req.get('credential', [])):
    1634             self.auth.save()
    1635 
    1636         # Make sure that the caller can talk to us
    1637         self.check_experiment_access(fid, key)
    1638 
    1639         # Install the testbed map entries supplied with the request into a copy
    1640         # of the testbed map.
    1641         tbmap = dict(self.tbmap)
    1642         for m in req.get('testbedmap', []):
    1643             if 'testbed' in m and 'uri' in m:
    1644                 tbmap[m['testbed']] = m['uri']
    1645 
    1646         # a place to work
    1647         try:
    1648             tmpdir = tempfile.mkdtemp(prefix="split-")
    1649             os.mkdir(tmpdir+"/keys")
    1650         except EnvironmentError:
    1651             raise service_error(service_error.internal, "Cannot create tmp dir")
    1652 
     1649    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
     1650        """
     1651        Create the ssh keys necessary for interconnecting the portal nodes and
     1652        the global hosts file for letting each segment know about the IP
     1653        addresses in play.  Save these into the repo.  Add attributes to the
     1654        authorizer allowing access controllers to download them and return a set
     1655        of attributes that inform the segments where to find this stuff.  May
     1656        raise service_errors if there are problems.
     1657        """
    16531658        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
    16541659        gw_secretkey_base = "fed.%s" % self.ssh_type
    16551660        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
    16561661        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
    1657         tbparams = { }
    1658 
    1659         eid, expid, expcert_file = \
    1660                 self.get_experiment_ids_and_start(key, tmpdir)
    1661 
    1662         # This catches exceptions to clear the placeholder if necessary
    1663         try:
    1664             if not (eid and expid):
    1665                 raise service_error(service_error.internal,
    1666                         "Cannot find local experiment info!?")
    1667             try:
    1668                 self.generate_ssh_keys(gw_secretkey, self.ssh_type)
    1669             except ValueError:
    1670                 raise service_error(service_error.server_config,
    1671                         "Bad key type (%s)" % self.ssh_type)
    1672 
    1673             top = self.get_topology(req, tmpdir)
    1674 
    1675             # Assign the IPs
    1676             hosts, ip_allocator = self.allocate_ips_to_topo(top)
    1677             # Find the testbeds to look up
    1678             tb_hosts = { }
    1679             testbeds = [ ]
    1680             for e in top.elements:
    1681                 if isinstance(e, topdl.Computer):
    1682                     tb = e.get_attribute('testbed') or 'default'
    1683                     if tb in tb_hosts: tb_hosts[tb].append(e.name)
    1684                     else:
    1685                         tb_hosts[tb] = [ e.name ]
    1686                         testbeds.append(tb)
    1687 
    1688             masters, pmasters = self.get_testbed_services(req)
    1689             allocated = { }         # Testbeds we can access
    1690             topo ={ }               # Sub topologies
    1691             connInfo = { }          # Connection information
    1692 
    1693             self.get_access_to_testbeds(testbeds, fid, allocated,
    1694                     tbparams, masters, tbmap, expid, expcert_file)
    1695 
    1696             self.split_topology(top, topo, testbeds)
    1697 
    1698             # Copy configuration files into the remote file store
    1699             # The config urlpath
    1700             configpath = "/%s/config" % expid
    1701             # The config file system location
    1702             configdir ="%s%s" % ( self.repodir, configpath)
    1703             try:
    1704                 os.makedirs(configdir)
    1705             except EnvironmentError, e:
    1706                 raise service_error(service_error.internal,
    1707                         "Cannot create config directory: %s" % e)
    1708             try:
    1709                 f = open("%s/hosts" % configdir, "w")
    1710                 f.write('\n'.join(hosts))
    1711                 f.close()
    1712             except EnvironmentError, e:
    1713                 raise service_error(service_error.internal,
    1714                         "Cannot write hosts file: %s" % e)
    1715             try:
    1716                 copy_file("%s" % gw_pubkey, "%s/%s" % \
    1717                         (configdir, gw_pubkey_base))
    1718                 copy_file("%s" % gw_secretkey, "%s/%s" % \
    1719                         (configdir, gw_secretkey_base))
    1720             except EnvironmentError, e:
    1721                 raise service_error(service_error.internal,
    1722                         "Cannot copy keyfiles: %s" % e)
    1723 
    1724             # Allow the individual testbeds to access the configuration files.
    1725             for tb in tbparams.keys():
    1726                 asignee = tbparams[tb]['allocID']['fedid']
    1727                 for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
    1728                     self.auth.set_attribute(asignee, "%s/%s" % \
    1729                             (configpath, f))
    1730 
    1731             part = experiment_partition(self.auth, self.store_url, tbmap,
    1732                     self.muxmax, self.direct_transit)
    1733             part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
    1734                     connInfo, expid)
    1735             # Now get access to the dynamic testbeds (those added above)
    1736             for tb in [ t for t in topo if t not in allocated]:
    1737                 #XXX: ABAC
    1738                 if self.auth_type =='legacy':
    1739                     self.get_legacy_access(tb, None, tbparams, access_user,
    1740                             masters, tbmap)
    1741                 elif self.auth_type == 'abac':
    1742                     self.get_access(tb, tbparams, fid, masters, tbmap,
    1743                             expid, expcert_file)
    1744                 else:
    1745                     raise service_error(service_error.internal,
    1746                             "Unknown auth_type %s" % self.auth_type)
    1747                 allocated[tb] = 1
    1748                 store_keys = topo[tb].get_attribute('store_keys')
    1749                 # Give the testbed access to keys it exports or imports
    1750                 if store_keys:
    1751                     for sk in store_keys.split(" "):
    1752                         self.auth.set_attribute(\
    1753                                 tbparams[tb]['allocID']['fedid'], sk)
     1662
     1663        try:
     1664            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
     1665        except ValueError:
     1666            raise service_error(service_error.server_config,
     1667                    "Bad key type (%s)" % self.ssh_type)
     1668
     1669
     1670        # Copy configuration files into the remote file store
     1671        # The config urlpath
     1672        configpath = "/%s/config" % expid
     1673        # The config file system location
     1674        configdir ="%s%s" % ( self.repodir, configpath)
     1675        try:
     1676            os.makedirs(configdir)
     1677        except EnvironmentError, e:
     1678            raise service_error(service_error.internal,
     1679                    "Cannot create config directory: %s" % e)
     1680        try:
     1681            f = open("%s/hosts" % configdir, "w")
     1682            print >> f, string.join(hosts, '\n')
     1683            f.close()
     1684        except EnvironmentError, e:
     1685            raise service_error(service_error.internal,
     1686                    "Cannot write hosts file: %s" % e)
     1687        try:
     1688            copy_file("%s" % gw_pubkey, "%s/%s" % \
     1689                    (configdir, gw_pubkey_base))
     1690            copy_file("%s" % gw_secretkey, "%s/%s" % \
     1691                    (configdir, gw_secretkey_base))
     1692        except EnvironmentError, e:
     1693            raise service_error(service_error.internal,
     1694                    "Cannot copy keyfiles: %s" % e)
     1695
     1696        # Allow the individual testbeds to access the configuration files.
     1697        for tb in tbparams.keys():
     1698            asignee = tbparams[tb]['allocID']['fedid']
     1699            for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
     1700                self.auth.set_attribute(asignee, "%s/%s" % \
     1701                        (configpath, f))
    17541702            self.auth.save()
    17551703
    1756             self.wrangle_software(expid, top, topo, tbparams)
    1757 
    1758             vtopo = topdl.topology_to_vtopo(top)
    1759             vis = self.genviz(vtopo)
    1760 
    1761             # save federant information
    1762             for k in allocated.keys():
    1763                 tbparams[k]['federant'] = {
    1764                         'name': [ { 'localname' : eid} ],
    1765                         'allocID' : tbparams[k]['allocID'],
    1766                         'uri': tbparams[k]['uri'],
    1767                     }
    1768 
    1769             self.state_lock.acquire()
    1770             self.state[eid]['vtopo'] = vtopo
    1771             self.state[eid]['vis'] = vis
    1772             self.state[eid]['experimentdescription'] = \
    1773                     { 'topdldescription': top.to_dict() }
    1774             self.state[eid]['federant'] = \
    1775                     [ tbparams[tb]['federant'] for tb in tbparams.keys() \
    1776                         if tbparams[tb].has_key('federant') ]
    1777             if self.state_filename:
    1778                 self.write_state()
    1779             self.state_lock.release()
    1780         except service_error, e:
    1781             # If something goes wrong in the parse (usually an access error)
    1782             # clear the placeholder state.  From here on out the code delays
    1783             # exceptions.  Failing at this point returns a fault to the remote
    1784             # caller.
    1785 
    1786             self.state_lock.acquire()
    1787             del self.state[eid]
    1788             del self.state[expid]
    1789             if self.state_filename: self.write_state()
    1790             self.state_lock.release()
    1791             if tmpdir and self.cleanup:
    1792                 self.remove_dirs(tmpdir)
    1793             raise e
    1794 
    1795 
    1796         # Start the background swapper and return the starting state.  From
    1797         # here on out, the state will stick around a while.
    1798 
    1799         # Let users touch the state
    1800         self.auth.set_attribute(fid, expid)
    1801         self.auth.set_attribute(expid, expid)
    1802         # Override fedids can manipulate state as well
    1803         for o in self.overrides:
    1804             self.auth.set_attribute(o, expid)
    1805         self.auth.save()
    1806 
    1807         # Create a logger that logs to the experiment's state object as well as
    1808         # to the main log file.
    1809         alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
    1810         alloc_collector = self.list_log(self.state[eid]['log'])
    1811         h = logging.StreamHandler(alloc_collector)
    1812         # XXX: there should be a global one of these rather than repeating the
    1813         # code.
    1814         h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
    1815                     '%d %b %y %H:%M:%S'))
    1816         alloc_log.addHandler(h)
    1817        
    18181704        attrs = [
    18191705                {
     
    18331719                },
    18341720            ]
    1835 
    1836         # transit and disconnected testbeds may not have a connInfo entry.
    1837         # Fill in the blanks.
    1838         for t in allocated.keys():
    1839             if not connInfo.has_key(t):
    1840                 connInfo[t] = { }
     1721        return attrs
     1722
     1723
     1724    def get_vtopo(self, req, fid):
     1725        """
     1726        Return the stored virtual topology for this experiment
     1727        """
     1728        rv = None
     1729        state = None
     1730
     1731        req = req.get('VtopoRequestBody', None)
     1732        if not req:
     1733            raise service_error(service_error.req,
     1734                    "Bad request format (no VtopoRequestBody)")
     1735        exp = req.get('experiment', None)
     1736        if exp:
     1737            if exp.has_key('fedid'):
     1738                key = exp['fedid']
     1739                keytype = "fedid"
     1740            elif exp.has_key('localname'):
     1741                key = exp['localname']
     1742                keytype = "localname"
     1743            else:
     1744                raise service_error(service_error.req, "Unknown lookup type")
     1745        else:
     1746            raise service_error(service_error.req, "No request?")
     1747
     1748        self.check_experiment_access(fid, key)
     1749
     1750        self.state_lock.acquire()
     1751        if self.state.has_key(key):
     1752            if self.state[key].has_key('vtopo'):
     1753                rv = { 'experiment' : {keytype: key },\
     1754                        'vtopo': self.state[key]['vtopo'],\
     1755                    }
     1756            else:
     1757                state = self.state[key]['experimentStatus']
     1758        self.state_lock.release()
     1759
     1760        if rv: return rv
     1761        else:
     1762            if state:
     1763                raise service_error(service_error.partial,
     1764                        "Not ready: %s" % state)
     1765            else:
     1766                raise service_error(service_error.req, "No such experiment")
     1767
     1768    def get_vis(self, req, fid):
     1769        """
     1770        Return the stored visualization for this experiment
     1771        """
     1772        rv = None
     1773        state = None
     1774
     1775        req = req.get('VisRequestBody', None)
     1776        if not req:
     1777            raise service_error(service_error.req,
     1778                    "Bad request format (no VisRequestBody)")
     1779        exp = req.get('experiment', None)
     1780        if exp:
     1781            if exp.has_key('fedid'):
     1782                key = exp['fedid']
     1783                keytype = "fedid"
     1784            elif exp.has_key('localname'):
     1785                key = exp['localname']
     1786                keytype = "localname"
     1787            else:
     1788                raise service_error(service_error.req, "Unknown lookup type")
     1789        else:
     1790            raise service_error(service_error.req, "No request?")
     1791
     1792        self.check_experiment_access(fid, key)
     1793
     1794        self.state_lock.acquire()
     1795        if self.state.has_key(key):
     1796            if self.state[key].has_key('vis'):
     1797                rv =  { 'experiment' : {keytype: key },\
     1798                        'vis': self.state[key]['vis'],\
     1799                        }
     1800            else:
     1801                state = self.state[key]['experimentStatus']
     1802        self.state_lock.release()
     1803
     1804        if rv: return rv
     1805        else:
     1806            if state:
     1807                raise service_error(service_error.partial,
     1808                        "Not ready: %s" % state)
     1809            else:
     1810                raise service_error(service_error.req, "No such experiment")
     1811
     1812   
     1813    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
     1814            top):
     1815        """
     1816        Store the various data that have changed in the experiment state
     1817        between when it was started and the beginning of resource allocation.
     1818        This is basically the information about each local allocation.  This
     1819        fills in the values of the placeholder allocation in the state.
     1820        """
     1821        # save federant information
     1822        for k in allocated.keys():
     1823            tbparams[k]['federant'] = {
     1824                    'name': [ { 'localname' : eid} ],
     1825                    'allocID' : tbparams[k]['allocID'],
     1826                    'uri': tbparams[k]['uri'],
     1827                }
     1828
     1829        self.state_lock.acquire()
     1830        self.state[eid]['vtopo'] = vtopo
     1831        self.state[eid]['vis'] = vis
     1832        self.state[eid]['experimentdescription'] = \
     1833                { 'topdldescription': top.to_dict() }
     1834        self.state[eid]['federant'] = \
     1835                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
     1836                    if tbparams[tb].has_key('federant') ]
     1837        if self.state_filename:
     1838            self.write_state()
     1839        self.state_lock.release()
     1840
     1841    def clear_placeholder(self, eid, expid, tmpdir):
     1842        """
     1843        Clear the placeholder and remove any allocated temporary dir.
     1844        """
     1845
     1846        self.state_lock.acquire()
     1847        del self.state[eid]
     1848        del self.state[expid]
     1849        if self.state_filename: self.write_state()
     1850        self.state_lock.release()
     1851        if tmpdir and self.cleanup:
     1852            self.remove_dirs(tmpdir)
     1853
     1854    # end of create_experiment sub-functions
     1855
     1856    def create_experiment(self, req, fid):
     1857        """
     1858        The external interface to experiment creation called from the
     1859        dispatcher.
     1860
     1861        Creates a working directory, splits the incoming description using the
     1862        splitter script and parses out the various subsections using the
     1863        classes above.  Once each sub-experiment is created, use pooled threads
     1864        to instantiate them and start it all up.
     1865        """
     1866
     1867        req = req.get('CreateRequestBody', None)
     1868        if req:
     1869            key = self.get_experiment_key(req)
     1870        else:
     1871            raise service_error(service_error.req,
     1872                    "Bad request format (no CreateRequestBody)")
     1873
     1874        # Import information from the requester
     1875        if self.auth.import_credentials(data_list=req.get('credential', [])):
     1876            self.auth.save()
     1877
     1878        # Make sure that the caller can talk to us
     1879        self.check_experiment_access(fid, key)
     1880
     1881        # Install the testbed map entries supplied with the request into a copy
     1882        # of the testbed map.
     1883        tbmap = dict(self.tbmap)
     1884        for m in req.get('testbedmap', []):
     1885            if 'testbed' in m and 'uri' in m:
     1886                tbmap[m['testbed']] = m['uri']
     1887
     1888        # a place to work
     1889        try:
     1890            tmpdir = tempfile.mkdtemp(prefix="split-")
     1891            os.mkdir(tmpdir+"/keys")
     1892        except EnvironmentError:
     1893            raise service_error(service_error.internal, "Cannot create tmp dir")
     1894
     1895        tbparams = { }
     1896
     1897        eid, expid, expcert_file = \
     1898                self.get_experiment_ids_and_start(key, tmpdir)
     1899
     1900        # This catches exceptions to clear the placeholder if necessary
     1901        try:
     1902            if not (eid and expid):
     1903                raise service_error(service_error.internal,
     1904                        "Cannot find local experiment info!?")
     1905
     1906            top = self.get_topology(req, tmpdir)
     1907            # Assign the IPs
     1908            hosts, ip_allocator = self.allocate_ips_to_topo(top)
     1909            # Find the testbeds to look up
     1910            tb_hosts = { }
     1911            testbeds = [ ]
     1912            for e in top.elements:
     1913                if isinstance(e, topdl.Computer):
     1914                    tb = e.get_attribute('testbed') or 'default'
     1915                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
     1916                    else:
     1917                        tb_hosts[tb] = [ e.name ]
     1918                        testbeds.append(tb)
     1919
     1920            masters, pmasters = self.get_testbed_services(req)
     1921            allocated = { }         # Testbeds we can access
     1922            topo ={ }               # Sub topologies
     1923            connInfo = { }          # Connection information
     1924
     1925            self.get_access_to_testbeds(testbeds, fid, allocated,
     1926                    tbparams, masters, tbmap, expid, expcert_file)
     1927
     1928            self.split_topology(top, topo, testbeds)
     1929
     1930            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
     1931
     1932            part = experiment_partition(self.auth, self.store_url, tbmap,
     1933                    self.muxmax, self.direct_transit)
     1934            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
     1935                    connInfo, expid)
     1936            # Now get access to the dynamic testbeds (those added above)
     1937            for tb in [ t for t in topo if t not in allocated]:
     1938                self.get_access(tb, tbparams, fid, masters, tbmap,
     1939                        expid, expcert_file)
     1940                allocated[tb] = 1
     1941                store_keys = topo[tb].get_attribute('store_keys')
     1942                # Give the testbed access to keys it exports or imports
     1943                if store_keys:
     1944                    for sk in store_keys.split(" "):
     1945                        self.auth.set_attribute(\
     1946                                tbparams[tb]['allocID']['fedid'], sk)
     1947            self.auth.save()
     1948
     1949            # transit and disconnected testbeds may not have a connInfo entry.
     1950            # Fill in the blanks.
     1951            for t in allocated.keys():
     1952                if not connInfo.has_key(t):
     1953                    connInfo[t] = { }
     1954
     1955            self.wrangle_software(expid, top, topo, tbparams)
     1956
     1957            vtopo = topdl.topology_to_vtopo(top)
     1958            vis = self.genviz(vtopo)
     1959            self.save_federant_information(allocated, tbparams, eid, vtopo,
     1960                    vis, top)
     1961        except service_error, e:
     1962            # If something goes wrong in the parse (usually an access error)
     1963            # clear the placeholder state.  From here on out the code delays
     1964            # exceptions.  Failing at this point returns a fault to the remote
     1965            # caller.
     1966            self.clear_placeholder(eid, expid, tmpdir)
     1967            raise e
     1968
     1969        # Start the background swapper and return the starting state.  From
     1970        # here on out, the state will stick around a while.
     1971
     1972        # Let users touch the state
     1973        self.auth.set_attribute(fid, expid)
     1974        self.auth.set_attribute(expid, expid)
     1975        # Override fedids can manipulate state as well
     1976        for o in self.overrides:
     1977            self.auth.set_attribute(o, expid)
     1978        self.auth.save()
     1979
     1980        # Create a logger that logs to the experiment's state object as well as
     1981        # to the main log file.
     1982        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
     1983        alloc_collector = self.list_log(self.state[eid]['log'])
     1984        h = logging.StreamHandler(alloc_collector)
     1985        # XXX: there should be a global one of these rather than repeating the
     1986        # code.
     1987        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     1988                    '%d %b %y %H:%M:%S'))
     1989        alloc_log.addHandler(h)
    18411990
    18421991        # Start a thread to do the resource allocation
     
    19052054
    19062055    def get_handler(self, path, fid):
     2056        """
     2057        Perhaps surprisingly named, this function handles HTTP GET requests to
     2058        this server (SOAP requests are POSTs).
     2059        """
    19072060        self.log.info("Get handler %s %s" % (path, fid))
    19082061        if self.auth.check_attribute(fid, path):
     
    19102063        else:
    19112064            return (None, None)
    1912 
    def get_vtopo(self, req, fid):
        """
        Return the stored virtual topology for this experiment.

        req must carry a 'VtopoRequestBody' whose 'experiment' entry names
        the experiment by 'fedid' or 'localname' ('fedid' is used if both are
        present).  self.check_experiment_access(fid, key) is called before
        any state is read.

        Returns a dict holding 'experiment' (the identifier as it was given)
        and 'vtopo'.  Raises service_error.req for a malformed request or an
        unknown experiment, and service_error.partial (carrying the current
        experimentStatus) when the experiment exists but has no vtopo stored.
        """
        rv = None
        state = None

        req = req.get('VtopoRequestBody', None)
        if not req:
            raise service_error(service_error.req,
                    "Bad request format (no VtopoRequestBody)")
        exp = req.get('experiment', None)
        if not exp:
            raise service_error(service_error.req, "No request?")

        if 'fedid' in exp:
            keytype = "fedid"
        elif 'localname' in exp:
            keytype = "localname"
        else:
            raise service_error(service_error.req, "Unknown lookup type")
        key = exp[keytype]

        self.check_experiment_access(fid, key)

        # try/finally so that an unexpected error while reading the state
        # (e.g. an entry without 'experimentStatus') cannot leave the lock
        # held and wedge every other request.
        self.state_lock.acquire()
        try:
            if key in self.state:
                entry = self.state[key]
                if 'vtopo' in entry:
                    rv = {
                            'experiment': {keytype: key},
                            'vtopo': entry['vtopo'],
                        }
                else:
                    state = entry['experimentStatus']
        finally:
            self.state_lock.release()

        if rv:
            return rv
        if state:
            raise service_error(service_error.partial,
                    "Not ready: %s" % state)
        raise service_error(service_error.req, "No such experiment")
    1956 
    def get_vis(self, req, fid):
        """
        Return the stored visualization for this experiment.

        req must carry a 'VisRequestBody' whose 'experiment' entry names the
        experiment by 'fedid' or 'localname' ('fedid' is used if both are
        present).  self.check_experiment_access(fid, key) is called before
        any state is read.

        Returns a dict holding 'experiment' (the identifier as it was given)
        and 'vis'.  Raises service_error.req for a malformed request or an
        unknown experiment, and service_error.partial (carrying the current
        experimentStatus) when the experiment exists but has no vis stored.
        """
        rv = None
        state = None

        req = req.get('VisRequestBody', None)
        if not req:
            raise service_error(service_error.req,
                    "Bad request format (no VisRequestBody)")
        exp = req.get('experiment', None)
        if not exp:
            raise service_error(service_error.req, "No request?")

        if 'fedid' in exp:
            keytype = "fedid"
        elif 'localname' in exp:
            keytype = "localname"
        else:
            raise service_error(service_error.req, "Unknown lookup type")
        key = exp[keytype]

        self.check_experiment_access(fid, key)

        # try/finally so that an unexpected error while reading the state
        # (e.g. an entry without 'experimentStatus') cannot leave the lock
        # held and wedge every other request.
        self.state_lock.acquire()
        try:
            if key in self.state:
                entry = self.state[key]
                if 'vis' in entry:
                    rv = {
                            'experiment': {keytype: key},
                            'vis': entry['vis'],
                        }
                else:
                    state = entry['experimentStatus']
        finally:
            self.state_lock.release()

        if rv:
            return rv
        if state:
            raise service_error(service_error.partial,
                    "Not ready: %s" % state)
        raise service_error(service_error.req, "No such experiment")
    20002065
    20012066    def clean_info_response(self, rv):
     
    20872152        return rv
    20882153
    2089     def remove_dirs(self, dir):
    2090         """
    2091         Remove the directory tree and all files rooted at dir.  Log any errors,
    2092         but continue.
    2093         """
    2094         self.log.debug("[removedirs]: removing %s" % dir)
    def check_termination_status(self, fed_exp, force):
        """
        Confirm that the experiment is in a valid state to stop (or force it)
        and return that state.

        fed_exp is the experiment's state dict; its 'experimentStatus' is
        read while holding self.state_lock.  If the status is 'starting' or
        'terminating' and force is false, service_error.partial is raised;
        with force set a warning is logged and the status is returned.  A
        missing or empty status raises service_error.internal.
        """
        # A single try/finally replaces the per-exit release calls so that
        # no path - including an unexpected exception from the logger or
        # from fed_exp - can leave the state lock held.
        self.state_lock.acquire()
        try:
            status = fed_exp.get('experimentStatus', None)

            if not status:
                # No status??? trouble
                raise service_error(service_error.internal,
                        "Experiment has no status!?")

            if status in ('starting', 'terminating'):
                if not force:
                    raise service_error(service_error.partial,
                            'Experiment still being created or destroyed')
                self.log.warning('Experiment in %s state ' % status + \
                        'being terminated by force.')
            return status
        finally:
            self.state_lock.release()
     2179
     2180
    def get_termination_info(self, fed_exp):
        """
        Gather what is needed to tear down the experiment described by
        fed_exp and mark it as 'terminating'.

        Returns a tuple (ids, term_params, expcert, repo):
          ids         - every fedid and localname found in
                        fed_exp['experimentID']; these are the keys of
                        self.state to delete once everything is swapped out.
          term_params - dict mapping each federant's allocation fedid (or its
                        index in the federant list when the allocID has no
                        fedid) to a (uri, allocID) tuple.
          expcert     - fed_exp['experimentAccess']['X509'] if present,
                        otherwise None.
          repo        - string form of the experiment's fedid, or None when
                        no experimentID entry carries a fedid.

        Runs with self.state_lock held; the state is written out through
        self.write_state() when self.state_filename is set.
        """
        ids = []
        term_params = { }
        # repo stays None when no experimentID entry carries a fedid, rather
        # than being unbound at the return below.
        repo = None

        self.state_lock.acquire()
        # try/finally: an error while scanning fed_exp or writing the state
        # must not leave the lock held.
        try:
            #  experimentID is a list of dicts that are self-describing
            #  identifiers.  This finds all the fedids and localnames - the
            #  keys of self.state - and puts them into ids, which is used to
            #  delete the state after everything is swapped out.
            for exp_id in fed_exp.get('experimentID', []):
                if 'fedid' in exp_id:
                    ids.append(exp_id['fedid'])
                    repo = "%s" % exp_id['fedid']
                if 'localname' in exp_id:
                    ids.append(exp_id['localname'])

            # Get the experimentAccess - the principal for this experiment.
            # It is this principal to which credentials have been delegated,
            # and as which the experiment controller must act.
            if 'experimentAccess' in fed_exp and \
                    'X509' in fed_exp['experimentAccess']:
                expcert = fed_exp['experimentAccess']['X509']
            else:
                expcert = None

            # Collect the allocation/segment ids into a dict keyed by the
            # fedid of the allocation (or a monotonically increasing integer)
            # that contains a tuple of uri, aid (which is a dict...)
            for i, fed in enumerate(fed_exp.get('federant', [])):
                try:
                    uri = fed['uri']
                    aid = fed['allocID']
                except KeyError:
                    # Federants without a uri or an allocID are skipped
                    continue
                term_params[aid.get('fedid', i)] = (uri, aid)

            # Change the experiment state
            fed_exp['experimentStatus'] = 'terminating'
            if self.state_filename: self.write_state()
        finally:
            self.state_lock.release()

        return ids, term_params, expcert, repo
     2221
     2222
     2223    def deallocate_resources(self, term_params, expcert, status, force,
     2224            dealloc_log):
     2225        tmpdir = None
     2226        # This try block makes sure the tempdir is cleared
    20952227        try:
    2096             for path, dirs, files in os.walk(dir, topdown=False):
    2097                 for f in files:
    2098                     os.remove(os.path.join(path, f))
    2099                 for d in dirs:
    2100                     os.rmdir(os.path.join(path, d))
    2101             os.rmdir(dir)
    2102         except EnvironmentError, e:
    2103             self.log.error("Error deleting directory tree in %s" % e);
    2104 
    2105     @staticmethod
    2106     def make_temp_certfile(expcert, tmpdir):
    2107         """
    2108         make a protected copy of the access certificate so the experiment
    2109         controller can act as the experiment principal.  mkstemp is the most
    2110         secure way to do that. The directory should be created by
    2111         mkdtemp.  Return the filename.
    2112         """
    2113         if expcert and tmpdir:
    2114             try:
    2115                 certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
    2116                 f = os.fdopen(certf, 'w')
    2117                 print >> f, expcert
    2118                 f.close()
    2119             except EnvironmentError, e:
    2120                 raise service_error(service_error.internal,
    2121                         "Cannot create temp cert file?")
    2122             return certfn
    2123         else:
    2124             return None
     2228            # If no expcert, try the deallocation as the experiment
     2229            # controller instance.
     2230            if expcert and self.auth_type != 'legacy':
     2231                try:
     2232                    tmpdir = tempfile.mkdtemp(prefix="term-")
     2233                except EnvironmentError:
     2234                    raise service_error(service_error.internal,
     2235                            "Cannot create tmp dir")
     2236                cert_file = self.make_temp_certfile(expcert, tmpdir)
     2237                pw = None
     2238            else:
     2239                cert_file = self.cert_file
     2240                pw = self.cert_pwd
     2241
     2242            # Stop everyone.  NB, wait_for_all waits until a thread starts
     2243            # and then completes, so we can't wait if nothing starts.  So,
     2244            # no tbparams, no start.
     2245            if len(term_params) > 0:
     2246                tp = thread_pool(self.nthreads)
     2247                for k, (uri, aid) in term_params.items():
     2248                    # Create and start a thread to stop the segment
     2249                    tp.wait_for_slot()
     2250                    t  = pooled_thread(\
     2251                            target=self.terminate_segment(log=dealloc_log,
     2252                                testbed=uri,
     2253                                cert_file=cert_file,
     2254                                cert_pwd=pw,
     2255                                trusted_certs=self.trusted_certs,
     2256                                caller=self.call_TerminateSegment),
     2257                            args=(uri, aid), name=k,
     2258                            pdata=tp, trace_file=self.trace_file)
     2259                    t.start()
     2260                # Wait for completions
     2261                tp.wait_for_all_done()
     2262
     2263            # release the allocations (failed experiments have done this
     2264            # already, and starting experiments may be in odd states, so we
     2265            # ignore errors releasing those allocations
     2266            try:
     2267                for k, (uri, aid)  in term_params.items():
     2268                    self.release_access(None, aid, uri=uri,
     2269                            cert_file=cert_file, cert_pwd=pw)
     2270            except service_error, e:
     2271                if status != 'failed' and not force:
     2272                    raise e
     2273
     2274        # Clean up the tmpdir no matter what
     2275        finally:
     2276            if tmpdir: self.remove_dirs(tmpdir)
     2277
    21252278
    21262279    def terminate_experiment(self, req, fid):
     
    21342287            raise service_error(service_error.req,
    21352288                    "Bad request format (no TerminateRequestBody)")
     2289
     2290        key = self.get_experiment_key(req, 'experiment')
     2291        self.check_experiment_access(fid, key)
     2292        exp = req.get('experiment', False)
    21362293        force = req.get('force', False)
    2137         exp = req.get('experiment', None)
    2138         if exp:
    2139             if exp.has_key('fedid'):
    2140                 key = exp['fedid']
    2141                 keytype = "fedid"
    2142             elif exp.has_key('localname'):
    2143                 key = exp['localname']
    2144                 keytype = "localname"
    2145             else:
    2146                 raise service_error(service_error.req, "Unknown lookup type")
    2147         else:
    2148             raise service_error(service_error.req, "No request?")
    2149 
    2150         self.check_experiment_access(fid, key)
    21512294
    21522295        dealloc_list = [ ]
     
    21652308        self.state_lock.acquire()
    21662309        fed_exp = self.state.get(key, None)
     2310        self.state_lock.release()
    21672311        repo = None
    21682312
    21692313        if fed_exp:
    2170             # This branch of the conditional holds the lock to generate a
    2171             # consistent temporary tbparams variable to deallocate experiments.
    2172             # It releases the lock to do the deallocations and reacquires it to
    2173             # remove the experiment state when the termination is complete.
    2174 
    2175             # First make sure that the experiment creation is complete.
    2176             status = fed_exp.get('experimentStatus', None)
    2177 
    2178             if status:
    2179                 if status in ('starting', 'terminating'):
    2180                     if not force:
    2181                         self.state_lock.release()
    2182                         raise service_error(service_error.partial,
    2183                                 'Experiment still being created or destroyed')
    2184                     else:
    2185                         self.log.warning('Experiment in %s state ' % status + \
    2186                                 'being terminated by force.')
    2187             else:
    2188                 # No status??? trouble
    2189                 self.state_lock.release()
    2190                 raise service_error(service_error.internal,
    2191                         "Experiment has no status!?")
    2192 
    2193             ids = []
    2194             #  experimentID is a list of dicts that are self-describing
    2195             #  identifiers.  This finds all the fedids and localnames - the
    2196             #  keys of self.state - and puts them into ids.
    2197             for id in fed_exp.get('experimentID', []):
    2198                 if id.has_key('fedid'):
    2199                     ids.append(id['fedid'])
    2200                     repo = "%s" % id['fedid']
    2201                 if id.has_key('localname'): ids.append(id['localname'])
    2202 
    2203             # Get the experimentAccess - the principal for this experiment.  It
    2204             # is this principal to which credentials have been delegated, and
    2205             # as which the experiment controller must act.
    2206             if 'experimentAccess' in self.state[key] and \
    2207                     'X509' in self.state[key]['experimentAccess']:
    2208                 expcert = self.state[key]['experimentAccess']['X509']
    2209             else:
    2210                 expcert = None
    2211             # Collect the allocation/segment ids into a dict keyed by the fedid
    2212             # of the allocation (or a monotonically increasing integer) that
    2213             # contains a tuple of uri, aid (which is a dict...)
    2214             for i, fed in enumerate(fed_exp.get('federant', [])):
    2215                 try:
    2216                     uri = fed['uri']
    2217                     aid = fed['allocID']
    2218                     k = fed['allocID'].get('fedid', i)
    2219                 except KeyError, e:
    2220                     continue
    2221                 tbparams[k] = (uri, aid)
    2222             fed_exp['experimentStatus'] = 'terminating'
    2223             if self.state_filename: self.write_state()
    2224             self.state_lock.release()
    2225 
    2226             try:
    2227                 tmpdir = tempfile.mkdtemp(prefix="split-")
    2228             except EnvironmentError:
    2229                 raise service_error(service_error.internal,
    2230                         "Cannot create tmp dir")
    2231             # This try block makes sure the tempdir is cleared
    2232             try:
    2233                 # If no expcert, try the deallocation as the experiment
    2234                 # controller instance.
    2235                 if expcert and self.auth_type != 'legacy':
    2236                     cert_file = self.make_temp_certfile(expcert, tmpdir)
    2237                     pw = None
    2238                 else:
    2239                     cert_file = self.cert_file
    2240                     pw = self.cert_pwd
    2241 
    2242                 # Stop everyone.  NB, wait_for_all waits until a thread starts
    2243                 # and then completes, so we can't wait if nothing starts.  So,
    2244                 # no tbparams, no start.
    2245                 if len(tbparams) > 0:
    2246                     tp = thread_pool(self.nthreads)
    2247                     for k in tbparams.keys():
    2248                         # Create and start a thread to stop the segment
    2249                         tp.wait_for_slot()
    2250                         uri, aid = tbparams[k]
    2251                         t  = pooled_thread(\
    2252                                 target=self.terminate_segment(log=dealloc_log,
    2253                                     testbed=uri,
    2254                                     cert_file=cert_file,
    2255                                     cert_pwd=pw,
    2256                                     trusted_certs=self.trusted_certs,
    2257                                     caller=self.call_TerminateSegment),
    2258                                 args=(uri, aid), name=k,
    2259                                 pdata=tp, trace_file=self.trace_file)
    2260                         t.start()
    2261                     # Wait for completions
    2262                     tp.wait_for_all_done()
    2263 
    2264                 # release the allocations (failed experiments have done this
    2265                 # already, and starting experiments may be in odd states, so we
    2266                 # ignore errors releasing those allocations
    2267                 try:
    2268                     for k in tbparams.keys():
    2269                         # This releases access by uri
    2270                         uri, aid = tbparams[k]
    2271                         self.release_access(None, aid, uri=uri,
    2272                                 cert_file=cert_file, cert_pwd=pw)
    2273                 except service_error, e:
    2274                     if status != 'failed' and not force:
    2275                         raise e
    2276 
    2277             # Clean up the tmpdir no matter what
    2278             finally:
    2279                 self.remove_dirs(tmpdir)
     2314            status = self.check_termination_status(fed_exp, force)
     2315            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
     2316            self.deallocate_resources(term_params, expcert, status, force,
     2317                    dealloc_log)
    22802318
    22812319            # Remove the terminated experiment
    22822320            self.state_lock.acquire()
    22832321            for id in ids:
    2284                 if self.state.has_key(id): del self.state[id]
     2322                if id in self.state: del self.state[id]
    22852323
    22862324            if self.state_filename: self.write_state()
     
    23052343            return {
    23062344                    'experiment': exp ,
    2307                     'deallocationLog': "".join(dealloc_list),
     2345                    'deallocationLog': string.join(dealloc_list, ''),
    23082346                    }
    23092347        else:
    2310             # Don't forget to release the lock
    2311             self.state_lock.release()
    23122348            raise service_error(service_error.req, "No saved state")
    23132349
     
    23222358                    "Bad request format (no GetValueRequestBody)")
    23232359       
    2324         name = req['name']
    2325         wait = req['wait']
     2360        name = req.get('name', None)
     2361        wait = req.get('wait', False)
    23262362        rv = { 'name': name }
    23272363
    2328         if self.auth.check_attribute(fid, name):
     2364        if name and self.auth.check_attribute(fid, name):
    23292365            self.log.debug("[GetValue] asking for %s " % name)
    23302366            try:
     
    23512387                    "Bad request format (no SetValueRequestBody)")
    23522388       
    2353         name = req['name']
    2354         v = req['value']
    2355 
    2356         if self.auth.check_attribute(fid, name):
     2389        name = req.get('name', None)
     2390        v = req.get('value', '')
     2391
     2392        if name and self.auth.check_attribute(fid, name):
    23572393            try:
    23582394                self.synch_store.set_value(name, v)
Note: See TracChangeset for help on using the changeset viewer.