- Timestamp:
- Sep 8, 2009 5:57:00 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
- Children:
- 40dd8c1
- Parents:
- 430e98d
- Location:
- fedd/federation
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
r430e98d rf07fa49 22 22 23 23 import topdl 24 import list_log 24 25 import httplib 25 26 import tempfile … … 782 783 msg = { 'ReleaseProjectRequestBody' : msg} 783 784 self.allocate_project.release_project(msg) 785 # And remove the access cert 786 cf = "%s/%s.pem" % (self.certdir, aid) 787 self.log.debug("Removing %s" % cf) 788 os.remove(cf) 784 789 return { 'allocID': req['allocID'] } 785 790 else: … … 1340 1345 user = self.allocation[aid].get('user', None) 1341 1346 self.allocation[aid]['experiment'] = ename 1347 self.allocation[aid]['log'] = [ ] 1348 # Create a logger that logs to the experiment's state object as 1349 # well as to the main log file. 1350 alloc_log = logging.getLogger('fedd.access.%s' % ename) 1351 h = logging.StreamHandler( 1352 list_log.list_log(self.allocation[aid]['log'])) 1353 # XXX: there should be a global one of these rather than 1354 # repeating the code. 1355 h.setFormatter(logging.Formatter( 1356 "%(asctime)s %(name)s %(message)s", 1357 '%d %b %y %H:%M:%S')) 1358 alloc_log.addHandler(h) 1342 1359 self.write_state() 1343 1360 self.state_lock.release() … … 1357 1374 self.generate_ns2(topo, expfile, 1358 1375 "/proj/%s/software/%s/" % (proj, ename), master) 1376 1359 1377 starter = self.start_segment(keyfile=self.ssh_privkey_file, 1360 debug=self.create_debug )1378 debug=self.create_debug, log=alloc_log) 1361 1379 rv = starter(self, ename, proj, user, expfile, tmpdir) 1362 1380 except service_error, e: … … 1373 1391 1374 1392 if rv: 1375 return { 'allocID': req['allocID'] } 1393 # Grab the log (this is some anal locking, but better safe than 1394 # sorry) 1395 self.state_lock.acquire() 1396 logv = "".join(self.allocation[aid]['log']) 1397 self.state_lock.release() 1398 1399 return { 'allocID': req['allocID'], 'allocationLog': logv } 1376 1400 elif err: 1377 1401 raise service_error(service_error.federant, -
fedd/federation/experiment_control.py
r430e98d rf07fa49 30 30 31 31 import topdl 32 import list_log 32 33 from ip_allocator import ip_allocator 33 34 from ip_addr import ip_addr … … 49 50 50 51 class ssh_cmd_timeout(RuntimeError): pass 51 52 class list_log:53 """54 Provide an interface that lets logger.StreamHandler s write to a list55 of strings.56 """57 def __init__(self, l=[]):58 """59 Link to an existing list or just create a log60 """61 self.ll = l62 self.lock = Lock()63 def write(self, str):64 """65 Add the string to the log. Lock for consistency.66 """67 self.lock.acquire()68 self.ll.append(str)69 self.lock.release()70 71 def flush(self):72 """73 No-op that StreamHandlers expect74 """75 pass76 77 52 78 53 class thread_pool: … … 221 196 self.thread_with_rv = experiment_control_local.pooled_thread 222 197 self.thread_pool = experiment_control_local.thread_pool 223 self.list_log = experiment_control_local.list_log198 self.list_log = list_log.list_log 224 199 225 200 self.cert_file = config.get("experiment_control", "cert_file") … … 924 899 class start_segment: 925 900 def __init__(self, debug=False, log=None, testbed="", cert_file=None, 926 cert_pwd=None, trusted_certs=None, caller=None): 901 cert_pwd=None, trusted_certs=None, caller=None, 902 log_collector=None): 927 903 self.log = log 928 904 self.debug = debug … … 932 908 self.caller = caller 933 909 self.testbed = testbed 910 self.log_collector = log_collector 934 911 935 912 def __call__(self, uri, aid, topo, master, attrs=None): … … 947 924 r = self.caller(uri, req, self.cert_file, self.cert_pwd, 948 925 self.trusted_certs) 926 if r.has_key('StartSegmentResponseBody'): 927 lval = r['StartSegmentResponseBody'].get('allocationLog', 928 None) 929 if lval and self.log_collector: 930 for line in lval.splitlines(True): 931 self.log_collector.write(line) 932 else: 933 raise service_error(service_error.internal, 934 "Bad response!?: %s" %r) 949 935 return True 950 936 except service_error, e: … … 981 967 982 968 def allocate_resources(self, allocated, master, eid, expid, expcert, 983 tbparams, topo, tmpdir, alloc_log=None, attrs=None): 969 tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 970 attrs=None): 984 971 started = { } # Testbeds where a sub-experiment started 985 972 # successfully … … 1014 1001 cert_pwd=self.cert_pwd, 1015 1002 trusted_certs=self.trusted_certs, 1016 caller=self.call_StartSegment), 1003 caller=self.call_StartSegment, 1004 log_collector=log_collector), 1017 1005 args=(uri, aid, topo[tb], False, attrs), name=tb, 1018 1006 pdata=thread_pool, trace_file=self.trace_file) … … 1041 1029 testbed=master, cert_file=self.cert_file, 1042 1030 cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs, 1043 caller=self.call_StartSegment) 1031 caller=self.call_StartSegment, 1032 log_collector=log_collector) 1044 1033 if not starter(uri, aid, topo[master], True, attrs): 1045 1034 failed.append(master) … … 1678 1667 # to the main log file. 1679 1668 alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid) 1680 h = logging.StreamHandler(self.list_log(self.state[eid]['log'])) 1669 alloc_collector = self.list_log(self.state[eid]['log']) 1670 h = logging.StreamHandler(alloc_collector) 1681 1671 # XXX: there should be a global one of these rather than repeating the 1682 1672 # code. … … 1712 1702 t = Thread(target=self.allocate_resources, 1713 1703 args=(allocated, master, eid, expid, expcert, tbparams, 1714 topo, tmpdir, alloc_log, a ttrs),1704 topo, tmpdir, alloc_log, alloc_collector, attrs), 1715 1705 name=eid) 1716 1706 t.start()
Note: See TracChangeset
for help on using the changeset viewer.