Changeset 46e4682 for fedd/federation


Ignore:
Timestamp:
Jul 28, 2009 10:57:38 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
d15522f
Parents:
3c20a31
Message:

termination logs, a terminating state, and avoiding a deadlock

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r3c20a31 r46e4682  
    20442044        # Create a logger that logs to the experiment's state object as well as
    20452045        # to the main log file.
    2046 
    20472046        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
    20482047        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     
    20532052        alloc_log.addHandler(h)
    20542053       
    2055 
    2056 
    2057 
    2058 
    20592054        # Start a thread to do the resource allocation
    20602055        t  = Thread(target=self.allocate_resources,
     
    22332228            for f in rv.get('federant', []):
    22342229                if f.has_key('allocID'): del f['allocID']
    2235 
    22362230        return rv
    22372231
     
    23182312        self.check_experiment_access(fid, key)
    23192313
     2314        dealloc_list = [ ]
     2315
     2316
     2317        # Create a logger that logs to the dealloc_list as well as to the main
     2318        # log file.
     2319        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
     2320        h = logging.StreamHandler(self.list_log(dealloc_list))
     2321        # XXX: there should be a global one of these rather than repeating the
     2322        # code.
     2323        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2324                    '%d %b %y %H:%M:%S'))
     2325        dealloc_log.addHandler(h)
     2326
    23202327        self.state_lock.acquire()
    23212328        fed_exp = self.state.get(key, None)
     
    23292336            # First make sure that the experiment creation is complete.
    23302337            status = fed_exp.get('experimentStatus', None)
     2338
    23312339            if status:
    2332                 if status == 'starting':
     2340                if status in ('starting', 'terminating'):
    23332341                    if not force:
    23342342                        self.state_lock.release()
    23352343                        raise service_error(service_error.partial,
    2336                                 'Experiment still being created')
     2344                                'Experiment still being created or destroyed')
    23372345                    else:
    2338                         self.log.warning('Experiment in starting state ' + \
    2339                                 'being terminated by admin.')
     2346                        self.log.warning('Experiment in %s state ' % status + \
     2347                                'being terminated by force.')
    23402348            else:
    23412349                # No status??? trouble
     
    23812389                        'aid': aid,\
    23822390                    }
     2391                print "%s %s" % (tb, tbparams[tb])
     2392            fed_exp['experimentStatus'] = 'terminating'
     2393            if self.state_filename: self.write_state()
    23832394            self.state_lock.release()
    23842395
    2385             # Stop everyone.
    2386             thread_pool = self.thread_pool(self.nthreads)
    2387             for tb in tbparams.keys():
    2388                 # Create and start a thread to stop the segment
    2389                 thread_pool.wait_for_slot()
    2390                 t  = self.pooled_thread(\
    2391                         target=self.stop_segment(log=self.log,
    2392                             keyfile=self.ssh_privkey_file, debug=self.debug),
    2393                         args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    2394                         pdata=thread_pool, trace_file=self.trace_file)
    2395                 t.start()
    2396             # Wait for completions
    2397             thread_pool.wait_for_all_done()
     2396            # Stop everyone.  NB, wait_for_all waits until a thread starts and
     2397            # then completes, so we can't wait if nothing starts.  So, no
     2398            # tbparams, no start.
     2399            if len(tbparams) > 0:
     2400                thread_pool = self.thread_pool(self.nthreads)
     2401                for tb in tbparams.keys():
     2402                    # Create and start a thread to stop the segment
     2403                    thread_pool.wait_for_slot()
     2404                    t  = self.pooled_thread(\
     2405                            target=self.stop_segment(log=dealloc_log,
     2406                                keyfile=self.ssh_privkey_file, debug=self.debug),
     2407                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
     2408                            pdata=thread_pool, trace_file=self.trace_file)
     2409                    t.start()
     2410                # Wait for completions
     2411                thread_pool.wait_for_all_done()
    23982412
    23992413            # release the allocations (failed experiments have done this
     
    24152429            self.state_lock.release()
    24162430
    2417             return { 'experiment': exp }
     2431            return {
     2432                    'experiment': exp ,
     2433                    'deallocationLog': "".join(dealloc_list),
     2434                    }
    24182435        else:
    24192436            # Don't forget to release the lock
Note: See TracChangeset for help on using the changeset viewer.