Changeset 46e4682


Ignore:
Timestamp:
Jul 28, 2009 10:57:38 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
d15522f
Parents:
3c20a31
Message:

termination logs, a terminating state, and avoiding a deadlock

Location:
fedd
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd_client.py

    r3c20a31 r46e4682  
    163163                action="store_true", default=False,
    164164                help="Force termination if experiment is in strange state")
     165        self.add_option("--logfile", dest="logfile", default=None,
     166                help="File to write log to")
     167        self.add_option("--print_log", dest="print_log", default=False,
     168                action="store_true",
     169                help="Print deallocation log to standard output")
    165170
    166171class fedd_multi_exp_data_opts(fedd_client_opts):
     
    10541059        if opts.exp_name and opts.exp_certfile:
    10551060            sys.exit("Only one of --experiment_cert and " +\
    1056                     "--experiment_name permitted");
     1061                    "--experiment_name permitted")
     1062
     1063        if opts.print_log and opts.logfile:
     1064            sys.exit("Only one of --logfile and --print_log is permitted")
     1065        elif opts.print_log:
     1066            out = sys.stdout
     1067        elif opts.logfile:
     1068            try:
     1069                out = open(opts.logfile, "w")
     1070            except IOError,e:
     1071                sys.exit("Cannot open logfile: %s" %e)
     1072        else:
     1073            out = None
    10571074
    10581075        exp_id = None
     
    10821099            sys.exit("Error processing RPC: %s" % e)
    10831100
    1084         eid = resp_dict.get('experimentID', None)
    1085         if eid:
    1086             for id in eid:
    1087                 for k in id.keys():
    1088                     if k == 'fedid': print "%s: %s" % (k,fedid(bits=id[k]))
    1089                     else: print "%s: %s" % (k, id[k])
     1101        if out:
     1102            log = resp_dict.get('deallocationLog', None)
     1103            if log:
     1104                print >>out, log
     1105                out.close()
     1106            else:
     1107                out.close()
     1108                sys.exit("No log returned")
    10901109
    10911110class create(fedd_rpc):
  • fedd/federation/experiment_control.py

    r3c20a31 r46e4682  
    20442044        # Create a logger that logs to the experiment's state object as well as
    20452045        # to the main log file.
    2046 
    20472046        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
    20482047        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     
    20532052        alloc_log.addHandler(h)
    20542053       
    2055 
    2056 
    2057 
    2058 
    20592054        # Start a thread to do the resource allocation
    20602055        t  = Thread(target=self.allocate_resources,
     
    22332228            for f in rv.get('federant', []):
    22342229                if f.has_key('allocID'): del f['allocID']
    2235 
    22362230        return rv
    22372231
     
    23182312        self.check_experiment_access(fid, key)
    23192313
     2314        dealloc_list = [ ]
     2315
     2316
     2317        # Create a logger that logs to the dealloc_list as well as to the main
     2318        # log file.
     2319        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
     2320        h = logging.StreamHandler(self.list_log(dealloc_list))
     2321        # XXX: there should be a global one of these rather than repeating the
     2322        # code.
     2323        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2324                    '%d %b %y %H:%M:%S'))
     2325        dealloc_log.addHandler(h)
     2326
    23202327        self.state_lock.acquire()
    23212328        fed_exp = self.state.get(key, None)
     
    23292336            # First make sure that the experiment creation is complete.
    23302337            status = fed_exp.get('experimentStatus', None)
     2338
    23312339            if status:
    2332                 if status == 'starting':
     2340                if status in ('starting', 'terminating'):
    23332341                    if not force:
    23342342                        self.state_lock.release()
    23352343                        raise service_error(service_error.partial,
    2336                                 'Experiment still being created')
     2344                                'Experiment still being created or destroyed')
    23372345                    else:
    2338                         self.log.warning('Experiment in starting state ' + \
    2339                                 'being terminated by admin.')
     2346                        self.log.warning('Experiment in %s state ' % status + \
     2347                                'being terminated by force.')
    23402348            else:
    23412349                # No status??? trouble
     
    23812389                        'aid': aid,\
    23822390                    }
     2391                print "%s %s" % (tb, tbparams[tb])
     2392            fed_exp['experimentStatus'] = 'terminating'
     2393            if self.state_filename: self.write_state()
    23832394            self.state_lock.release()
    23842395
    2385             # Stop everyone.
    2386             thread_pool = self.thread_pool(self.nthreads)
    2387             for tb in tbparams.keys():
    2388                 # Create and start a thread to stop the segment
    2389                 thread_pool.wait_for_slot()
    2390                 t  = self.pooled_thread(\
    2391                         target=self.stop_segment(log=self.log,
    2392                             keyfile=self.ssh_privkey_file, debug=self.debug),
    2393                         args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    2394                         pdata=thread_pool, trace_file=self.trace_file)
    2395                 t.start()
    2396             # Wait for completions
    2397             thread_pool.wait_for_all_done()
     2396            # Stop everyone.  NB, wait_for_all waits until a thread starts and
     2397            # then completes, so we can't wait if nothing starts.  So, no
     2398            # tbparams, no start.
     2399            if len(tbparams) > 0:
     2400                thread_pool = self.thread_pool(self.nthreads)
     2401                for tb in tbparams.keys():
     2402                    # Create and start a thread to stop the segment
     2403                    thread_pool.wait_for_slot()
     2404                    t  = self.pooled_thread(\
     2405                            target=self.stop_segment(log=dealloc_log,
     2406                                keyfile=self.ssh_privkey_file, debug=self.debug),
     2407                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
     2408                            pdata=thread_pool, trace_file=self.trace_file)
     2409                    t.start()
     2410                # Wait for completions
     2411                thread_pool.wait_for_all_done()
    23982412
    23992413            # release the allocations (failed experiments have done this
     
    24152429            self.state_lock.release()
    24162430
    2417             return { 'experiment': exp }
     2431            return {
     2432                    'experiment': exp ,
     2433                    'deallocationLog': "".join(dealloc_list),
     2434                    }
    24182435        else:
    24192436            # Don't forget to release the lock
  • fedd/wsdl/fedd_types.xsd

    r3c20a31 r46e4682  
    607607    <xsd:sequence>
    608608      <xsd:element name="experiment" type="tns:IDType"/>
     609      <xsd:element name="deallocationLog" type="xsd:string" minOccurs="0"
     610        maxOccurs="1"/>
    609611    </xsd:sequence>
    610612  </xsd:complexType>
Note: See TracChangeset for help on using the changeset viewer.