Ignore:
Timestamp:
Oct 7, 2008 11:03:50 AM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
8ecfbad
Parents:
eee2b2e
Message:

beginnings of a real multithreaded server

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd_experiment_control.py

    reee2b2e ra97394b  
    147147        self.state = { }
    148148        self.state_filename = config.experiment_state_file
     149        self.state_lock = Lock()
    149150        self.tclsh = "/usr/local/bin/otclsh"
    150151        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
     
    252253        d.close()
    253254
     255    # Call while holding self.state_lock
    254256    def write_state(self):
    255257        if os.access(self.state_filename, os.W_OK):
     
    265267            print >>sys.stderr, "Pickling problem: %s" % e
    266268
     269    # Call while holding self.state_lock
    267270    def read_state(self):
    268271        try:
     
    12161219                req['experimentID'].has_key('localname'):
    12171220            eid = req['experimentID']['localname']
     1221            self.state_lock.acquire()
    12181222            while (self.state.has_key(eid)):
    12191223                eid += random.choice(string.ascii_letters)
     1224            self.state[eid] = "placeholder"
     1225            self.state_lock.release()
    12201226        else:
    12211227            eid = self.exp_stem
    12221228            for i in range(0,5):
    12231229                eid += random.choice(string.ascii_letters)
     1230            self.state_lock.acquire()
    12241231            while (self.state.has_key(eid)):
    12251232                eid = self.exp_stem
    12261233                for i in range(0,5):
    12271234                    eid += random.choice(string.ascii_letters)
     1235            self.state[eid] = "placeholder"
     1236            self.state_lock.release()
    12281237
    12291238        try:
     
    13511360                failed.append(master)
    13521361
    1353         # If one failed clean up
    1354         if len(failed) > 0:
    1355             succeeded = [tb for tb in allocated.keys() if tb not in failed]
    1356             if fail_soft:
    1357                 raise service_error(service_error.partial, \
    1358                         "Partial swap in on %s" % ",".join(succeeded))
    1359             else:
     1362        succeeded = [tb for tb in allocated.keys() if tb not in failed]
     1363        # If one failed clean up, unless fail_soft is set
     1364        if failed:
     1365            if not fail_soft:
    13601366                for tb in succeeded:
    13611367                    self.stop_segment(tb, eid, tbparams)
     1368                # Remove the placeholder
     1369                self.state_lock.acquire()
     1370                del self.state[eid]
     1371                self.state_lock.release()
     1372
    13621373                raise service_error(service_error.federant,
    13631374                    "Swap in failed on %s" % ",".join(failed))
     
    13941405                    'experimentAccess': { 'X509' : expcert },\
    13951406                }
    1396        
     1407
     1408        self.state_lock.acquire()
    13971409        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
    13981410                for tb in tbparams.keys() \
     
    14061418        self.state[eid] = self.state[expid]
    14071419        if self.state_filename: self.write_state()
    1408         return resp
     1420        self.state_lock.release()
     1421
     1422        if not failed:
     1423            return resp
     1424        else:
     1425            raise service_error(service_error.partial, \
     1426                    "Partial swap in on %s" % ",".join(succeeded))
     1427
    14091428
    14101429    def get_vtopo(self, req, fid):
     1430        rv = None
    14111431
    14121432        req = req.get('VtopoRequestBody', None)
     
    14271447            raise service_error(service_error.req, "No request?")
    14281448
     1449        self.state_lock.acquire()
    14291450        if self.state.has_key(key):
    1430             return { 'experiment' : {keytype: key },\
     1451            rv = { 'experiment' : {keytype: key },\
    14311452                    'vtopo': self.state[key]['vtopo'],\
    1432                     }
    1433         else:
    1434             raise service_error(service_error.req, "No such experiment")
     1453                }
     1454        self.state_lock.release()
     1455
     1456        if rv: return rv
     1457        else: raise service_error(service_error.req, "No such experiment")
    14351458
    14361459    def get_vis(self, req, fid):
     1460        rv = None
    14371461
    14381462        req = req.get('VisRequestBody', None)
     
    14531477            raise service_error(service_error.req, "No request?")
    14541478
     1479        self.state_lock.acquire()
    14551480        if self.state.has_key(key):
    1456             return { 'experiment' : {keytype: key },\
     1481            rv = { 'experiment' : {keytype: key },\
    14571482                    'vis': self.state[key]['vis'],\
    14581483                    }
    1459         else:
    1460             raise service_error(service_error.req, "No such experiment")
     1484        self.state_lock.release()
     1485
     1486        if rv: return rv
     1487        else: raise service_error(service_error.req, "No such experiment")
    14611488
    14621489    def get_info(self, req, fid):
     1490        rv = None
    14631491
    14641492        req = req.get('InfoRequestBody', None)
     
    14821510        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
    14831511        # state.
     1512        self.state_lock.acquire()
    14841513        if self.state.has_key(key):
    1485             return copy.deepcopy(self.state[key])
    1486         else:
    1487             raise service_error(service_error.req, "No such experiment")
    1488 
     1514            rv = copy.deepcopy(self.state[key])
     1515        self.state_lock.release()
     1516
     1517        if rv: return rv
     1518        else: raise service_error(service_error.req, "No such experiment")
    14891519
    14901520
     
    15081538            raise service_error(service_error.req, "No request?")
    15091539
     1540        self.state_lock.acquire()
    15101541        fed_exp = self.state.get(key, None)
    15111542
    15121543        if fed_exp:
     1544            # This branch of the conditional holds the lock to generate a
     1545            # consistent temporary tbparams variable to deallocate experiments.
     1546            # It releases the lock to do the deallocations and reacquires it to
     1547            # remove the experiment state when the termination is complete.
    15131548            ids = []
    15141549            #  experimentID is a list of dicts that are self-describing
     
    15461581                        'eid': eid,\
    15471582                    }
     1583            self.state_lock.release()
     1584
    15481585            # Stop everyone.
    15491586            for tb in tbparams.keys():
    15501587                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
    15511588
     1589            # Remove teh terminated experiment
     1590            self.state_lock.acquire()
    15521591            for id in ids:
    15531592                if self.state.has_key(id): del self.state[id]
    15541593
    15551594            if self.state_filename: self.write_state()
     1595            self.state_lock.release()
     1596
    15561597            return { 'experiment': exp }
    15571598        else:
     1599            # Don't forget to release the lock
     1600            self.state_lock.release()
    15581601            raise service_error(service_error.req, "No saved state")
    15591602
Note: See TracChangeset for help on using the changeset viewer.