Changeset a97394b
- Timestamp:
- Oct 7, 2008 11:03:50 AM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 8ecfbad
- Parents:
- eee2b2e
- Location:
- fedd
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/fedd.py
reee2b2e ra97394b 17 17 from fedd_deter_impl import new_feddservice 18 18 from service_error import * 19 20 from threading import * 19 21 20 22 # The SSL server here is based on the implementation described at … … 166 168 version="0.1") 167 169 168 self.set_defaults(host="localhost", port=23235, 169 transport="soap",debug=0)170 self.set_defaults(host="localhost", port=23235, transport="soap", 171 debug=0) 170 172 171 173 self.add_option("-d", "--debug", action="count", dest="debug", … … 177 179 self.add_option("-p", "--port", action="store", type="int", 178 180 dest="port", help="Port to listen on (default %default)") 181 self.add_option("-s", "--service", action="append", type="string", 182 dest="services", 183 help="Service description: host:port:transport") 179 184 self.add_option("-x","--transport", action="store", type="choice", 180 185 choices=("xmlrpc", "soap"), … … 182 187 self.add_option("--trace", action="store_const", dest="tracefile", 183 188 const=sys.stderr, help="Print SOAP exchange to stderr") 189 190 def run_server(s): 191 if s: s.serve_forever() 192 193 services = [ ] 194 servers = [ ] 184 195 185 196 opts, args = fedd_opts().parse_args() … … 209 220 raise 210 221 211 if opts.transport == "soap": 212 s = fedd_server(SOAP_port, fedd_soap_handler, ctx, impl) 213 elif opts.transport == "xmlrpc": 214 s = fedd_server(SOAP_port, fedd_xmlrpc_handler, ctx, impl) 222 if opts.services: 223 for s in opts.services: 224 h, p, t = s.split(':') 225 226 if not h: h = opts.host 227 if not p: p = opts.port 228 if not t: h = opts.transport 229 230 p = int(p) 231 232 services.append((t, (h, p))) 215 233 else: 216 s = None 217 218 s.serve_forever() 234 services.append((opts.transport, (opts.host, opts.port))) 235 236 for s in services: 237 if s[0] == "soap": 238 servers.append(fedd_server(s[1], fedd_soap_handler, ctx, impl)) 239 elif s[0] == "xmlrpc": 240 servers.append(fedd_server(s[1], fedd_xmlrpc_handler, ctx, impl)) 241 else: print >>sys.stderr, "Unknown transport: %s" % s[0] 242 243 for s in servers: 244 t = Thread(target=run_server, args=(s,)) 245 t.start() -
fedd/fedd_experiment_control.py
reee2b2e ra97394b 147 147 self.state = { } 148 148 self.state_filename = config.experiment_state_file 149 self.state_lock = Lock() 149 150 self.tclsh = "/usr/local/bin/otclsh" 150 151 self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl" … … 252 253 d.close() 253 254 255 # Call while holding self.state_lock 254 256 def write_state(self): 255 257 if os.access(self.state_filename, os.W_OK): … … 265 267 print >>sys.stderr, "Pickling problem: %s" % e 266 268 269 # Call while holding self.state_lock 267 270 def read_state(self): 268 271 try: … … 1216 1219 req['experimentID'].has_key('localname'): 1217 1220 eid = req['experimentID']['localname'] 1221 self.state_lock.acquire() 1218 1222 while (self.state.has_key(eid)): 1219 1223 eid += random.choice(string.ascii_letters) 1224 self.state[eid] = "placeholder" 1225 self.state_lock.release() 1220 1226 else: 1221 1227 eid = self.exp_stem 1222 1228 for i in range(0,5): 1223 1229 eid += random.choice(string.ascii_letters) 1230 self.state_lock.acquire() 1224 1231 while (self.state.has_key(eid)): 1225 1232 eid = self.exp_stem 1226 1233 for i in range(0,5): 1227 1234 eid += random.choice(string.ascii_letters) 1235 self.state[eid] = "placeholder" 1236 self.state_lock.release() 1228 1237 1229 1238 try: … … 1351 1360 failed.append(master) 1352 1361 1353 # If one failed clean up 1354 if len(failed) > 0: 1355 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1356 if fail_soft: 1357 raise service_error(service_error.partial, \ 1358 "Partial swap in on %s" % ",".join(succeeded)) 1359 else: 1362 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1363 # If one failed clean up, unless fail_soft is set 1364 if failed: 1365 if not fail_soft: 1360 1366 for tb in succeeded: 1361 1367 self.stop_segment(tb, eid, tbparams) 1368 # Remove the placeholder 1369 self.state_lock.acquire() 1370 del self.state[eid] 1371 self.state_lock.release() 1372 1362 1373 raise service_error(service_error.federant, 1363 1374 "Swap in failed on %s" % ",".join(failed)) … … 1394 1405 'experimentAccess': { 'X509' : expcert },\ 1395 1406 } 1396 1407 1408 self.state_lock.acquire() 1397 1409 self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \ 1398 1410 for tb in tbparams.keys() \ … … 1406 1418 self.state[eid] = self.state[expid] 1407 1419 if self.state_filename: self.write_state() 1408 return resp 1420 self.state_lock.release() 1421 1422 if not failed: 1423 return resp 1424 else: 1425 raise service_error(service_error.partial, \ 1426 "Partial swap in on %s" % ",".join(succeeded)) 1427 1409 1428 1410 1429 def get_vtopo(self, req, fid): 1430 rv = None 1411 1431 1412 1432 req = req.get('VtopoRequestBody', None) … … 1427 1447 raise service_error(service_error.req, "No request?") 1428 1448 1449 self.state_lock.acquire() 1429 1450 if self.state.has_key(key): 1430 r eturn{ 'experiment' : {keytype: key },\1451 rv = { 'experiment' : {keytype: key },\ 1431 1452 'vtopo': self.state[key]['vtopo'],\ 1432 } 1433 else: 1434 raise service_error(service_error.req, "No such experiment") 1453 } 1454 self.state_lock.release() 1455 1456 if rv: return rv 1457 else: raise service_error(service_error.req, "No such experiment") 1435 1458 1436 1459 def get_vis(self, req, fid): 1460 rv = None 1437 1461 1438 1462 req = req.get('VisRequestBody', None) … … 1453 1477 raise service_error(service_error.req, "No request?") 1454 1478 1479 self.state_lock.acquire() 1455 1480 if self.state.has_key(key): 1456 r eturn{ 'experiment' : {keytype: key },\1481 rv = { 'experiment' : {keytype: key },\ 1457 1482 'vis': self.state[key]['vis'],\ 1458 1483 } 1459 else: 1460 raise service_error(service_error.req, "No such experiment") 1484 self.state_lock.release() 1485 1486 if rv: return rv 1487 else: raise service_error(service_error.req, "No such experiment") 1461 1488 1462 1489 def get_info(self, req, fid): 1490 rv = None 1463 1491 1464 1492 req = req.get('InfoRequestBody', None) … … 1482 1510 # get_info (e.g., encoded for XMLRPC transport) so send a copy of the 1483 1511 # state. 1512 self.state_lock.acquire() 1484 1513 if self.state.has_key(key): 1485 return copy.deepcopy(self.state[key]) 1486 else: 1487 raise service_error(service_error.req, "No such experiment") 1488 1514 rv = copy.deepcopy(self.state[key]) 1515 self.state_lock.release() 1516 1517 if rv: return rv 1518 else: raise service_error(service_error.req, "No such experiment") 1489 1519 1490 1520 … … 1508 1538 raise service_error(service_error.req, "No request?") 1509 1539 1540 self.state_lock.acquire() 1510 1541 fed_exp = self.state.get(key, None) 1511 1542 1512 1543 if fed_exp: 1544 # This branch of the conditional holds the lock to generate a 1545 # consistent temporary tbparams variable to deallocate experiments. 1546 # It releases the lock to do the deallocations and reacquires it to 1547 # remove the experiment state when the termination is complete. 1513 1548 ids = [] 1514 1549 # experimentID is a list of dicts that are self-describing … … 1546 1581 'eid': eid,\ 1547 1582 } 1583 self.state_lock.release() 1584 1548 1585 # Stop everyone. 1549 1586 for tb in tbparams.keys(): 1550 1587 self.stop_segment(tb, tbparams[tb]['eid'], tbparams) 1551 1588 1589 # Remove teh terminated experiment 1590 self.state_lock.acquire() 1552 1591 for id in ids: 1553 1592 if self.state.has_key(id): del self.state[id] 1554 1593 1555 1594 if self.state_filename: self.write_state() 1595 self.state_lock.release() 1596 1556 1597 return { 'experiment': exp } 1557 1598 else: 1599 # Don't forget to release the lock 1600 self.state_lock.release() 1558 1601 raise service_error(service_error.req, "No saved state") 1559 1602
Note: See TracChangeset
for help on using the changeset viewer.