Changeset f07fa49


Ignore:
Timestamp:
Sep 8, 2009 5:57:00 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
40dd8c1
Parents:
430e98d
Message:

better logging and cleanup

Location:
fedd/federation
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    r430e98d rf07fa49  
    2222
    2323import topdl
     24import list_log
    2425import httplib
    2526import tempfile
     
    782783                        msg = { 'ReleaseProjectRequestBody' : msg}
    783784                        self.allocate_project.release_project(msg)
     785                # And remove the access cert
     786                cf = "%s/%s.pem" % (self.certdir, aid)
     787                self.log.debug("Removing %s" % cf)
     788                os.remove(cf)
    784789                return { 'allocID': req['allocID'] }
    785790            else:
     
    13401345                user = self.allocation[aid].get('user', None)
    13411346                self.allocation[aid]['experiment'] = ename
     1347                self.allocation[aid]['log'] = [ ]
     1348                # Create a logger that logs to the experiment's state object as
     1349                # well as to the main log file.
     1350                alloc_log = logging.getLogger('fedd.access.%s' % ename)
     1351                h = logging.StreamHandler(
     1352                        list_log.list_log(self.allocation[aid]['log']))
     1353                # XXX: there should be a global one of these rather than
     1354                # repeating the code.
     1355                h.setFormatter(logging.Formatter(
     1356                    "%(asctime)s %(name)s %(message)s",
     1357                            '%d %b %y %H:%M:%S'))
     1358                alloc_log.addHandler(h)
    13421359                self.write_state()
    13431360            self.state_lock.release()
     
    13571374            self.generate_ns2(topo, expfile,
    13581375                    "/proj/%s/software/%s/" % (proj, ename), master)
     1376
    13591377            starter = self.start_segment(keyfile=self.ssh_privkey_file,
    1360                     debug=self.create_debug)
     1378                    debug=self.create_debug, log=alloc_log)
    13611379            rv = starter(self, ename, proj, user, expfile, tmpdir)
    13621380        except service_error, e:
     
    13731391
    13741392        if rv:
    1375             return { 'allocID': req['allocID'] }
     1393            # Grab the log (this is some anal locking, but better safe than
     1394            # sorry)
     1395            self.state_lock.acquire()
     1396            logv = "".join(self.allocation[aid]['log'])
     1397            self.state_lock.release()
     1398
     1399            return { 'allocID': req['allocID'], 'allocationLog': logv }
    13761400        elif err:
    13771401            raise service_error(service_error.federant,
  • fedd/federation/experiment_control.py

    r430e98d rf07fa49  
    3030
    3131import topdl
     32import list_log
    3233from ip_allocator import ip_allocator
    3334from ip_addr import ip_addr
     
    4950
    5051    class ssh_cmd_timeout(RuntimeError): pass
    51 
    52     class list_log:
    53         """
    54         Provide an interface that lets logger.StreamHandler s write to a list
    55         of strings.
    56         """
    57         def __init__(self, l=[]):
    58             """
    59             Link to an existing list or just create a log
    60             """
    61             self.ll = l
    62             self.lock = Lock()
    63         def write(self, str):
    64             """
    65             Add the string to the log.  Lock for consistency.
    66             """
    67             self.lock.acquire()
    68             self.ll.append(str)
    69             self.lock.release()
    70 
    71         def flush(self):
    72             """
    73             No-op that StreamHandlers expect
    74             """
    75             pass
    76 
    7752   
    7853    class thread_pool:
     
    221196        self.thread_with_rv = experiment_control_local.pooled_thread
    222197        self.thread_pool = experiment_control_local.thread_pool
    223         self.list_log = experiment_control_local.list_log
     198        self.list_log = list_log.list_log
    224199
    225200        self.cert_file = config.get("experiment_control", "cert_file")
     
    924899    class start_segment:
    925900        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
    926                 cert_pwd=None, trusted_certs=None, caller=None):
     901                cert_pwd=None, trusted_certs=None, caller=None,
     902                log_collector=None):
    927903            self.log = log
    928904            self.debug = debug
     
    932908            self.caller = caller
    933909            self.testbed = testbed
     910            self.log_collector = log_collector
    934911
    935912        def __call__(self, uri, aid, topo, master, attrs=None):
     
    947924                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
    948925                        self.trusted_certs)
     926                if r.has_key('StartSegmentResponseBody'):
     927                    lval = r['StartSegmentResponseBody'].get('allocationLog',
     928                            None)
     929                    if lval and self.log_collector:
     930                        for line in  lval.splitlines(True):
     931                            self.log_collector.write(line)
     932                else:
     933                    raise service_error(service_error.internal,
     934                            "Bad response!?: %s" %r)
    949935                return True
    950936            except service_error, e:
     
    981967
    982968    def allocate_resources(self, allocated, master, eid, expid, expcert,
    983             tbparams, topo, tmpdir, alloc_log=None, attrs=None):
     969            tbparams, topo, tmpdir, alloc_log=None, log_collector=None,
     970            attrs=None):
    984971        started = { }           # Testbeds where a sub-experiment started
    985972                                # successfully
     
    10141001                        cert_pwd=self.cert_pwd,
    10151002                        trusted_certs=self.trusted_certs,
    1016                         caller=self.call_StartSegment),
     1003                        caller=self.call_StartSegment,
     1004                        log_collector=log_collector),
    10171005                    args=(uri, aid, topo[tb], False, attrs), name=tb,
    10181006                    pdata=thread_pool, trace_file=self.trace_file)
     
    10411029                    testbed=master, cert_file=self.cert_file,
    10421030                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
    1043                     caller=self.call_StartSegment)
     1031                    caller=self.call_StartSegment,
     1032                    log_collector=log_collector)
    10441033            if not starter(uri, aid, topo[master], True, attrs):
    10451034                failed.append(master)
     
    16781667        # to the main log file.
    16791668        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
    1680         h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     1669        alloc_collector = self.list_log(self.state[eid]['log'])
     1670        h = logging.StreamHandler(alloc_collector)
    16811671        # XXX: there should be a global one of these rather than repeating the
    16821672        # code.
     
    17121702        t  = Thread(target=self.allocate_resources,
    17131703                args=(allocated, master, eid, expid, expcert, tbparams,
    1714                     topo, tmpdir, alloc_log, attrs),
     1704                    topo, tmpdir, alloc_log, alloc_collector, attrs),
    17151705                name=eid)
    17161706        t.start()
Note: See TracChangeset for help on using the changeset viewer.