Ignore:
Timestamp:
Sep 6, 2009 2:15:52 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
2b7d768
Parents:
1da6a23
Message:

terminate works

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r1da6a23 r5ae3857  
    197197    call_ReleaseAccess = service_caller('ReleaseAccess')
    198198    call_StartSegment = service_caller('StartSegment')
     199    call_TerminateSegment = service_caller('TerminateSegment')
    199200    call_Ns2Split = service_caller('Ns2Split')
    200201
     
    350351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
    351352                'Terminate': soap_handler('Terminate',
    352                     self.terminate_experiment),
     353                    self.new_terminate_experiment),
    353354        }
    354355
     
    360361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
    361362                'Terminate': xmlrpc_handler('Terminate',
    362                     self.terminate_experiment),
     363                    self.new_terminate_experiment),
    363364        }
    364365
     
    21662167                    self.trusted_certs)
    21672168            print r
     2169            return True
     2170
     2171
     2172
     2173    class new_terminate_segment:
     2174        def __init__(self, debug=False, log=None, cert_file=None,
     2175                cert_pwd=None, trusted_certs=None, caller=None):
     2176            self.log = log
     2177            self.debug = debug
     2178            self.cert_file = cert_file
     2179            self.cert_pwd = cert_pwd
     2180            self.trusted_certs = None
     2181            self.caller = caller
     2182
     2183        def __call__(self, uri, aid ):
     2184            print "in terminate_segment: %s" % aid
     2185            req = {
     2186                    'allocID': aid ,
     2187                }
     2188            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
     2189                    self.trusted_certs)
    21682190            return True
    21692191
     
    32363258            self.state_lock.release()
    32373259            raise service_error(service_error.req, "No saved state")
     3260
     3261    def new_terminate_experiment(self, req, fid):
     3262        """
     3263        Swap this experiment out on the federants and delete the shared
     3264        information
     3265        """
     3266        tbparams = { }
     3267        req = req.get('TerminateRequestBody', None)
     3268        if not req:
     3269            raise service_error(service_error.req,
     3270                    "Bad request format (no TerminateRequestBody)")
     3271        force = req.get('force', False)
     3272        exp = req.get('experiment', None)
     3273        if exp:
     3274            if exp.has_key('fedid'):
     3275                key = exp['fedid']
     3276                keytype = "fedid"
     3277            elif exp.has_key('localname'):
     3278                key = exp['localname']
     3279                keytype = "localname"
     3280            else:
     3281                raise service_error(service_error.req, "Unknown lookup type")
     3282        else:
     3283            raise service_error(service_error.req, "No request?")
     3284
     3285        self.check_experiment_access(fid, key)
     3286
     3287        dealloc_list = [ ]
     3288
     3289
     3290        # Create a logger that logs to the dealloc_list as well as to the main
     3291        # log file.
     3292        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
     3293        h = logging.StreamHandler(self.list_log(dealloc_list))
     3294        # XXX: there should be a global one of these rather than repeating the
     3295        # code.
     3296        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     3297                    '%d %b %y %H:%M:%S'))
     3298        dealloc_log.addHandler(h)
     3299
     3300        self.state_lock.acquire()
     3301        fed_exp = self.state.get(key, None)
     3302
     3303        if fed_exp:
     3304            # This branch of the conditional holds the lock to generate a
     3305            # consistent temporary tbparams variable to deallocate experiments.
     3306            # It releases the lock to do the deallocations and reacquires it to
     3307            # remove the experiment state when the termination is complete.
     3308
     3309            # First make sure that the experiment creation is complete.
     3310            status = fed_exp.get('experimentStatus', None)
     3311
     3312            if status:
     3313                if status in ('starting', 'terminating'):
     3314                    if not force:
     3315                        self.state_lock.release()
     3316                        raise service_error(service_error.partial,
     3317                                'Experiment still being created or destroyed')
     3318                    else:
     3319                        self.log.warning('Experiment in %s state ' % status + \
     3320                                'being terminated by force.')
     3321            else:
     3322                # No status??? trouble
     3323                self.state_lock.release()
     3324                raise service_error(service_error.internal,
     3325                        "Experiment has no status!?")
     3326
     3327            ids = []
     3328            #  experimentID is a list of dicts that are self-describing
     3329            #  identifiers.  This finds all the fedids and localnames - the
     3330            #  keys of self.state - and puts them into ids.
     3331            for id in fed_exp.get('experimentID', []):
     3332                if id.has_key('fedid'): ids.append(id['fedid'])
     3333                if id.has_key('localname'): ids.append(id['localname'])
     3334
     3335            # Collect the allocation/segment ids
     3336            for fed in fed_exp.get('federant', []):
     3337                try:
     3338                    print "looking at %s" % fed
     3339                    tb = fed['emulab']['project']['testbed']['localname']
     3340                    aid = fed['allocID']
     3341                except KeyError, e:
     3342                    print "Key error: %s" %e
     3343                    continue
     3344                tbparams[tb] = aid
     3345            fed_exp['experimentStatus'] = 'terminating'
     3346            if self.state_filename: self.write_state()
     3347            self.state_lock.release()
     3348
     3349            # Stop everyone.  NB, wait_for_all waits until a thread starts and
     3350            # then completes, so we can't wait if nothing starts.  So, no
     3351            # tbparams, no start.
     3352            if len(tbparams) > 0:
     3353                thread_pool = self.thread_pool(self.nthreads)
     3354                for tb in tbparams.keys():
     3355                    # Create and start a thread to stop the segment
     3356                    thread_pool.wait_for_slot()
     3357                    uri = self.tbmap.get(tb, None)
     3358                    t  = self.pooled_thread(\
     3359                            target=self.new_terminate_segment(log=dealloc_log,
     3360                                cert_file=self.cert_file,
     3361                                cert_pwd=self.cert_pwd,
     3362                                trusted_certs=self.trusted_certs,
     3363                                caller=self.call_TerminateSegment),
     3364                            args=(uri, tbparams[tb]), name=tb,
     3365                            pdata=thread_pool, trace_file=self.trace_file)
     3366                    t.start()
     3367                # Wait for completions
     3368                thread_pool.wait_for_all_done()
     3369
     3370            # release the allocations (failed experiments have done this
     3371            # already, and starting experiments may be in odd states, so we
     3372            # ignore errors releasing those allocations
     3373            try:
     3374                for tb in tbparams.keys():
     3375                    self.release_access(tb, tbparams[tb])
     3376            except service_error, e:
     3377                if status != 'failed' and not force:
     3378                    raise e
     3379
     3380            # Remove the terminated experiment
     3381            self.state_lock.acquire()
     3382            for id in ids:
     3383                if self.state.has_key(id): del self.state[id]
     3384
     3385            if self.state_filename: self.write_state()
     3386            self.state_lock.release()
     3387
     3388            return {
     3389                    'experiment': exp ,
     3390                    'deallocationLog': "".join(dealloc_list),
     3391                    }
     3392        else:
     3393            # Don't forget to release the lock
     3394            self.state_lock.release()
     3395            raise service_error(service_error.req, "No saved state")
Note: See TracChangeset for help on using the changeset viewer.