Changeset 32e7d93


Ignore:
Timestamp:
Sep 9, 2009 1:40:11 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
e2a7a413
Parents:
3c6dbec
Message:

Incremental logging and correct failures.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r3c6dbec r32e7d93  
    120120            self.release()
    121121
    122         def wait_for_all_done(self):
    123             """
    124             Wait until all active threads finish (and at least one has started)
    125             """
     122        def wait_for_all_done(self, timeout=None):
     123            """
     124            Wait until all active threads finish (and at least one has
     125            started).  If a timeout is given, return after waiting that long
     126            for termination.  If all threads are done (and one has started in
     127            the since the last clear()) return True, otherwise False.
     128            """
     129            if timeout:
     130                deadline = time.time() + timeout
    126131            self.acquire()
    127132            while self.started == 0 or self.started > self.terminated:
    128                 self.wait()
     133                self.wait(timeout)
     134                if timeout:
     135                    if time.time() > deadline:
     136                        break
     137                    timeout = deadline - time.time()
    129138            self.release()
     139            return not (self.started == 0 or self.started > self.terminated)
    130140
    131141    class pooled_thread(Thread):
     
    9941004            t.start()
    9951005
    996         # Wait until all finish
    997         thread_pool.wait_for_all_done()
     1006        # Wait until all finish (keep pinging the log, though)
     1007        mins = 0
     1008        while not thread_pool.wait_for_all_done(60.0):
     1009            mins += 1
     1010            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
     1011                    % mins)
     1012
     1013        thread_pool.clear()
    9981014
    9991015        # If none failed, start the master
     
    10121028                raise service_error(service_error.internal,
    10131029                    "No alloc id for testbed %s !?" % master)
    1014             starter = self.start_segment(log=log, debug=self.debug,
    1015                     testbed=master, cert_file=self.cert_file,
    1016                     cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
    1017                     caller=self.call_StartSegment,
    1018                     log_collector=log_collector)
    1019             if not starter(uri, aid, topo[master], True, attrs):
    1020                 failed.append(master)
     1030            t = self.pooled_thread(
     1031                    target=self.start_segment(log=log, debug=self.debug,
     1032                        testbed=master, cert_file=self.cert_file,
     1033                        cert_pwd=self.cert_pwd,
     1034                        trusted_certs=self.trusted_certs,
     1035                        caller=self.call_StartSegment,
     1036                        log_collector=log_collector),
     1037                    args =(uri, aid, topo[master], True, attrs),
     1038                    name=master, pdata=thread_pool, trace_file=self.trace_file)
     1039            threads.append(t)
     1040            t.start()
     1041            # Wait until the master finishes (keep pinging the log, though)
     1042            mins = 0
     1043            while not thread_pool.wait_for_all_done(60.0):
     1044                mins += 1
     1045                alloc_log.info("Waiting for master (it has been %d mins)" \
     1046                        % mins)
     1047            # update failed to include the master, if it failed
     1048            failed = [ t.getName() for t in threads if not t.rv ]
    10211049
    10221050        succeeded = [tb for tb in allocated.keys() if tb not in failed]
    10231051        # If one failed clean up, unless fail_soft is set
    1024         if failed and False:
     1052        if failed:
    10251053            if not fail_soft:
    10261054                thread_pool.clear()
     
    10281056                    # Create and start a thread to stop the segment
    10291057                    thread_pool.wait_for_slot()
     1058                    uri = self.tbmap.get(tb, None)
    10301059                    t  = self.pooled_thread(\
    1031                             target=self.stop_segment(log=log,
     1060                            target=self.terminate_segment(log=log,
    10321061                                testbed=tb,
    1033                                 keyfile=self.ssh_privkey_file,
    1034                                 debug=self.debug),
    1035                             args=(tb, eid, tbparams), name=tb,
     1062                                cert_file=self.cert_file,
     1063                                cert_pwd=self.cert_pwd,
     1064                                trusted_certs=self.trusted_certs,
     1065                                caller=self.call_TerminateSegment),
     1066                            args=(uri, tbparams[tb]['federant']['allocID']),
     1067                            name=tb,
    10361068                            pdata=thread_pool, trace_file=self.trace_file)
    10371069                    t.start()
Note: See TracChangeset for help on using the changeset viewer.