Changeset a97394b for fedd


Ignore:
Timestamp:
Oct 7, 2008 11:03:50 AM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
8ecfbad
Parents:
eee2b2e
Message:

beginnings of a real multithreaded server

Location:
fedd
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd.py

    reee2b2e ra97394b  
    1717from fedd_deter_impl import new_feddservice
    1818from service_error import *
     19
     20from threading import *
    1921
    2022# The SSL server here is based on the implementation described at
     
    166168                version="0.1")
    167169
    168         self.set_defaults(host="localhost", port=23235,
    169                 transport="soap",debug=0)
     170        self.set_defaults(host="localhost", port=23235, transport="soap",
     171                debug=0)
    170172
    171173        self.add_option("-d", "--debug", action="count", dest="debug",
     
    177179        self.add_option("-p", "--port", action="store", type="int",
    178180                dest="port", help="Port to listen on (default %default)")
     181        self.add_option("-s", "--service", action="append", type="string",
     182                dest="services",
     183                help="Service description: host:port:transport")
    179184        self.add_option("-x","--transport", action="store", type="choice",
    180185                choices=("xmlrpc", "soap"),
     
    182187        self.add_option("--trace", action="store_const", dest="tracefile",
    183188                const=sys.stderr, help="Print SOAP exchange to stderr")
     189
     190def run_server(s):
     191    if s: s.serve_forever()
     192
     193services = [ ]
     194servers = [ ]
    184195
    185196opts, args = fedd_opts().parse_args()
     
    209220            raise
    210221
    211 if opts.transport == "soap":
    212     s = fedd_server(SOAP_port, fedd_soap_handler, ctx, impl)
    213 elif opts.transport == "xmlrpc":
    214     s = fedd_server(SOAP_port, fedd_xmlrpc_handler, ctx, impl)
     222if opts.services:
     223    for s in opts.services:
     224        h, p, t  = s.split(':')
     225
     226        if not h: h = opts.host
     227        if not p: p = opts.port
     228        if not t: h = opts.transport
     229
     230        p = int(p)
     231
     232        services.append((t, (h, p)))
    215233else:
    216     s = None
    217 
    218 s.serve_forever()
     234    services.append((opts.transport, (opts.host, opts.port)))
     235
     236for s in services:
     237    if s[0] == "soap":
     238        servers.append(fedd_server(s[1], fedd_soap_handler, ctx, impl))
     239    elif s[0] == "xmlrpc":
     240        servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl))
     241    else: print >>sys.stderr, "Unknown transport: %s" % s[0]
     242
     243for s in servers:
     244    t = Thread(target=run_server, args=(s,))
     245    t.start()
  • fedd/fedd_experiment_control.py

    reee2b2e ra97394b  
    147147        self.state = { }
    148148        self.state_filename = config.experiment_state_file
     149        self.state_lock = Lock()
    149150        self.tclsh = "/usr/local/bin/otclsh"
    150151        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
     
    252253        d.close()
    253254
     255    # Call while holding self.state_lock
    254256    def write_state(self):
    255257        if os.access(self.state_filename, os.W_OK):
     
    265267            print >>sys.stderr, "Pickling problem: %s" % e
    266268
     269    # Call while holding self.state_lock
    267270    def read_state(self):
    268271        try:
     
    12161219                req['experimentID'].has_key('localname'):
    12171220            eid = req['experimentID']['localname']
     1221            self.state_lock.acquire()
    12181222            while (self.state.has_key(eid)):
    12191223                eid += random.choice(string.ascii_letters)
     1224            self.state[eid] = "placeholder"
     1225            self.state_lock.release()
    12201226        else:
    12211227            eid = self.exp_stem
    12221228            for i in range(0,5):
    12231229                eid += random.choice(string.ascii_letters)
     1230            self.state_lock.acquire()
    12241231            while (self.state.has_key(eid)):
    12251232                eid = self.exp_stem
    12261233                for i in range(0,5):
    12271234                    eid += random.choice(string.ascii_letters)
     1235            self.state[eid] = "placeholder"
     1236            self.state_lock.release()
    12281237
    12291238        try:
     
    13511360                failed.append(master)
    13521361
    1353         # If one failed clean up
    1354         if len(failed) > 0:
    1355             succeeded = [tb for tb in allocated.keys() if tb not in failed]
    1356             if fail_soft:
    1357                 raise service_error(service_error.partial, \
    1358                         "Partial swap in on %s" % ",".join(succeeded))
    1359             else:
     1362        succeeded = [tb for tb in allocated.keys() if tb not in failed]
     1363        # If one failed clean up, unless fail_soft is set
     1364        if failed:
     1365            if not fail_soft:
    13601366                for tb in succeeded:
    13611367                    self.stop_segment(tb, eid, tbparams)
     1368                # Remove the placeholder
     1369                self.state_lock.acquire()
     1370                del self.state[eid]
     1371                self.state_lock.release()
     1372
    13621373                raise service_error(service_error.federant,
    13631374                    "Swap in failed on %s" % ",".join(failed))
     
    13941405                    'experimentAccess': { 'X509' : expcert },\
    13951406                }
    1396        
     1407
     1408        self.state_lock.acquire()
    13971409        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
    13981410                for tb in tbparams.keys() \
     
    14061418        self.state[eid] = self.state[expid]
    14071419        if self.state_filename: self.write_state()
    1408         return resp
     1420        self.state_lock.release()
     1421
     1422        if not failed:
     1423            return resp
     1424        else:
     1425            raise service_error(service_error.partial, \
     1426                    "Partial swap in on %s" % ",".join(succeeded))
     1427
    14091428
    14101429    def get_vtopo(self, req, fid):
     1430        rv = None
    14111431
    14121432        req = req.get('VtopoRequestBody', None)
     
    14271447            raise service_error(service_error.req, "No request?")
    14281448
     1449        self.state_lock.acquire()
    14291450        if self.state.has_key(key):
    1430             return { 'experiment' : {keytype: key },\
     1451            rv = { 'experiment' : {keytype: key },\
    14311452                    'vtopo': self.state[key]['vtopo'],\
    1432                     }
    1433         else:
    1434             raise service_error(service_error.req, "No such experiment")
     1453                }
     1454        self.state_lock.release()
     1455
     1456        if rv: return rv
     1457        else: raise service_error(service_error.req, "No such experiment")
    14351458
    14361459    def get_vis(self, req, fid):
     1460        rv = None
    14371461
    14381462        req = req.get('VisRequestBody', None)
     
    14531477            raise service_error(service_error.req, "No request?")
    14541478
     1479        self.state_lock.acquire()
    14551480        if self.state.has_key(key):
    1456             return { 'experiment' : {keytype: key },\
     1481            rv = { 'experiment' : {keytype: key },\
    14571482                    'vis': self.state[key]['vis'],\
    14581483                    }
    1459         else:
    1460             raise service_error(service_error.req, "No such experiment")
     1484        self.state_lock.release()
     1485
     1486        if rv: return rv
     1487        else: raise service_error(service_error.req, "No such experiment")
    14611488
    14621489    def get_info(self, req, fid):
     1490        rv = None
    14631491
    14641492        req = req.get('InfoRequestBody', None)
     
    14821510        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
    14831511        # state.
     1512        self.state_lock.acquire()
    14841513        if self.state.has_key(key):
    1485             return copy.deepcopy(self.state[key])
    1486         else:
    1487             raise service_error(service_error.req, "No such experiment")
    1488 
     1514            rv = copy.deepcopy(self.state[key])
     1515        self.state_lock.release()
     1516
     1517        if rv: return rv
     1518        else: raise service_error(service_error.req, "No such experiment")
    14891519
    14901520
     
    15081538            raise service_error(service_error.req, "No request?")
    15091539
     1540        self.state_lock.acquire()
    15101541        fed_exp = self.state.get(key, None)
    15111542
    15121543        if fed_exp:
     1544            # This branch of the conditional holds the lock to generate a
     1545            # consistent temporary tbparams variable to deallocate experiments.
     1546            # It releases the lock to do the deallocations and reacquires it to
     1547            # remove the experiment state when the termination is complete.
    15131548            ids = []
    15141549            #  experimentID is a list of dicts that are self-describing
     
    15461581                        'eid': eid,\
    15471582                    }
     1583            self.state_lock.release()
     1584
    15481585            # Stop everyone.
    15491586            for tb in tbparams.keys():
    15501587                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
    15511588
     1589            # Remove teh terminated experiment
     1590            self.state_lock.acquire()
    15521591            for id in ids:
    15531592                if self.state.has_key(id): del self.state[id]
    15541593
    15551594            if self.state_filename: self.write_state()
     1595            self.state_lock.release()
     1596
    15561597            return { 'experiment': exp }
    15571598        else:
     1599            # Don't forget to release the lock
     1600            self.state_lock.release()
    15581601            raise service_error(service_error.req, "No saved state")
    15591602
Note: See TracChangeset for help on using the changeset viewer.