Changeset 5ae3857 for fedd/federation


Ignore:
Timestamp:
Sep 6, 2009 2:15:52 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
2b7d768
Parents:
1da6a23
Message:

terminate works

Location:
fedd/federation
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    r1da6a23 r5ae3857  
    6767        self.certdir = config.get("access","certdir")
    6868        self.ssh_privkey_file = config.get("access","ssh_privkey_file")
     69        self.create_debug = config.getboolean("access", "create_debug")
     70        print self.create_debug
    6971
    7072        self.attrs = { }
     
    116118            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
    117119            'StartSegment': soap_handler("StartSegment", self.StartSegment),
     120            'TerminateSegment': soap_handler("TerminateSegment", self.TerminateSegment),
    118121            }
    119122        self.xmlrpc_services =  {\
     
    122125            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
    123126                self.ReleaseAccess),
    124             'StartSegment': xmlrpc_handler('StartSegment',
    125                 self.StartSegment),
     127            'StartSegment': xmlrpc_handler("StartSegment", self.StartSegment),
     128            'TerminateSegment': xmlrpc_handler('TerminateSegment',
     129                self.TerminateSegment),
    126130            }
    127131
     
    10831087    class stop_segment(proxy_emulab_segment):
    10841088        def __init__(self, log=None, keyfile=None, debug=False):
    1085             experiment_control_local.emulab_segment.__init__(self,
     1089            access.proxy_emulab_segment.__init__(self,
    10861090                    log=log, keyfile=keyfile, debug=debug)
    10871091
    1088         def __call__(self, tb, eid, tbparams):
     1092        def __call__(self, parent, user, pid, eid):
    10891093            """
    10901094            Stop a sub experiment by calling swapexp on the federant
    10911095            """
    1092             user = tbparams[tb]['user']
    1093             host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    1094             pid = tbparams[tb]['project']
    1095 
    1096             self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
     1096            host = "%s%s" % (parent.ops, parent.domain)
     1097            self.log.info("[stop_segment]: Stopping %s" % eid)
    10971098            rv = False
    10981099            try:
    10991100                # Clean out tar files: we've gone over quota in the past
    1100                 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
    1101                 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
    1102                         (pid, eid))
     1101                self.ssh_cmd(user, host, "rm -rf /proj/%s/software/%s" % \
     1102                        (pid, eid))
    11031103                rv = self.ssh_cmd(user, host,
    11041104                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
     
    13351335                proj = self.allocation[aid].get('sproject', None)
    13361336            user = self.allocation[aid].get('user', None)
     1337            self.allocation[aid]['experiment'] = ename
     1338            self.write_state()
    13371339        self.state_lock.release()
    13381340
     
    13511353        self.generate_ns2(topo, expfile,
    13521354                "/proj/%s/software/%s/" % (proj, ename), master)
    1353         starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=False)
     1355        starter = self.start_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug)
    13541356        starter(self, ename, proj, user, expfile, tmpdir)
    13551357
    13561358        return { 'allocID': req['allocID'] }
     1359
     1360    def TerminateSegment(self, req, fid):
     1361        print "In terminate"
     1362        try:
     1363            req = req['TerminateSegmentRequestBody']
     1364        except KeyError:
     1365            raise service_error(server_error.req, "Badly formed request")
     1366
     1367        auth_attr = req['allocID']['fedid']
     1368        aid = "%s" % auth_attr
     1369        attrs = req.get('fedAttr', [])
     1370        print "in terminate %s" % auth_attr
     1371        if not self.auth.check_attribute(fid, auth_attr):
     1372            print "access denied"
     1373            raise service_error(service_error.access, "Access denied")
     1374
     1375        self.state_lock.acquire()
     1376        if self.allocation.has_key(aid):
     1377            proj = self.allocation[aid].get('project', None)
     1378            if not proj:
     1379                proj = self.allocation[aid].get('sproject', None)
     1380            user = self.allocation[aid].get('user', None)
     1381            ename = self.allocation[aid].get('experiment', None)
     1382        self.state_lock.release()
     1383
     1384        if not proj:
     1385            raise service_error(service_error.internal,
     1386                    "Can't find project for %s" % aid)
     1387
     1388        if not user:
     1389            raise service_error(service_error.internal,
     1390                    "Can't find creation user for %s" % aid)
     1391        if not ename:
     1392            raise service_error(service_error.internal,
     1393                    "Can't find experiment name for %s" % aid)
     1394        stopper = self.stop_segment(keyfile=self.ssh_privkey_file, debug=self.create_debug)
     1395        stopper(self, user, proj, ename)
     1396        print { 'allocID': req['allocID'] }
     1397        return { 'allocID': req['allocID'] }
  • fedd/federation/experiment_control.py

    r1da6a23 r5ae3857  
    197197    call_ReleaseAccess = service_caller('ReleaseAccess')
    198198    call_StartSegment = service_caller('StartSegment')
     199    call_TerminateSegment = service_caller('TerminateSegment')
    199200    call_Ns2Split = service_caller('Ns2Split')
    200201
     
    350351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
    351352                'Terminate': soap_handler('Terminate',
    352                     self.terminate_experiment),
     353                    self.new_terminate_experiment),
    353354        }
    354355
     
    360361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
    361362                'Terminate': xmlrpc_handler('Terminate',
    362                     self.terminate_experiment),
     363                    self.new_terminate_experiment),
    363364        }
    364365
     
    21662167                    self.trusted_certs)
    21672168            print r
     2169            return True
     2170
     2171
     2172
     2173    class new_terminate_segment:
     2174        def __init__(self, debug=False, log=None, cert_file=None,
     2175                cert_pwd=None, trusted_certs=None, caller=None):
     2176            self.log = log
     2177            self.debug = debug
     2178            self.cert_file = cert_file
     2179            self.cert_pwd = cert_pwd
     2180            self.trusted_certs = None
     2181            self.caller = caller
     2182
     2183        def __call__(self, uri, aid ):
     2184            print "in terminate_segment: %s" % aid
     2185            req = {
     2186                    'allocID': aid ,
     2187                }
     2188            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
     2189                    self.trusted_certs)
    21682190            return True
    21692191
     
    32363258            self.state_lock.release()
    32373259            raise service_error(service_error.req, "No saved state")
     3260
     3261    def new_terminate_experiment(self, req, fid):
     3262        """
     3263        Swap this experiment out on the federants and delete the shared
     3264        information
     3265        """
     3266        tbparams = { }
     3267        req = req.get('TerminateRequestBody', None)
     3268        if not req:
     3269            raise service_error(service_error.req,
     3270                    "Bad request format (no TerminateRequestBody)")
     3271        force = req.get('force', False)
     3272        exp = req.get('experiment', None)
     3273        if exp:
     3274            if exp.has_key('fedid'):
     3275                key = exp['fedid']
     3276                keytype = "fedid"
     3277            elif exp.has_key('localname'):
     3278                key = exp['localname']
     3279                keytype = "localname"
     3280            else:
     3281                raise service_error(service_error.req, "Unknown lookup type")
     3282        else:
     3283            raise service_error(service_error.req, "No request?")
     3284
     3285        self.check_experiment_access(fid, key)
     3286
     3287        dealloc_list = [ ]
     3288
     3289
     3290        # Create a logger that logs to the dealloc_list as well as to the main
     3291        # log file.
     3292        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
     3293        h = logging.StreamHandler(self.list_log(dealloc_list))
     3294        # XXX: there should be a global one of these rather than repeating the
     3295        # code.
     3296        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     3297                    '%d %b %y %H:%M:%S'))
     3298        dealloc_log.addHandler(h)
     3299
     3300        self.state_lock.acquire()
     3301        fed_exp = self.state.get(key, None)
     3302
     3303        if fed_exp:
     3304            # This branch of the conditional holds the lock to generate a
     3305            # consistent temporary tbparams variable to deallocate experiments.
     3306            # It releases the lock to do the deallocations and reacquires it to
     3307            # remove the experiment state when the termination is complete.
     3308
     3309            # First make sure that the experiment creation is complete.
     3310            status = fed_exp.get('experimentStatus', None)
     3311
     3312            if status:
     3313                if status in ('starting', 'terminating'):
     3314                    if not force:
     3315                        self.state_lock.release()
     3316                        raise service_error(service_error.partial,
     3317                                'Experiment still being created or destroyed')
     3318                    else:
     3319                        self.log.warning('Experiment in %s state ' % status + \
     3320                                'being terminated by force.')
     3321            else:
     3322                # No status??? trouble
     3323                self.state_lock.release()
     3324                raise service_error(service_error.internal,
     3325                        "Experiment has no status!?")
     3326
     3327            ids = []
     3328            #  experimentID is a list of dicts that are self-describing
     3329            #  identifiers.  This finds all the fedids and localnames - the
     3330            #  keys of self.state - and puts them into ids.
     3331            for id in fed_exp.get('experimentID', []):
     3332                if id.has_key('fedid'): ids.append(id['fedid'])
     3333                if id.has_key('localname'): ids.append(id['localname'])
     3334
     3335            # Collect the allocation/segment ids
     3336            for fed in fed_exp.get('federant', []):
     3337                try:
     3338                    print "looking at %s" % fed
     3339                    tb = fed['emulab']['project']['testbed']['localname']
     3340                    aid = fed['allocID']
     3341                except KeyError, e:
     3342                    print "Key error: %s" %e
     3343                    continue
     3344                tbparams[tb] = aid
     3345            fed_exp['experimentStatus'] = 'terminating'
     3346            if self.state_filename: self.write_state()
     3347            self.state_lock.release()
     3348
     3349            # Stop everyone.  NB, wait_for_all waits until a thread starts and
     3350            # then completes, so we can't wait if nothing starts.  So, no
     3351            # tbparams, no start.
     3352            if len(tbparams) > 0:
     3353                thread_pool = self.thread_pool(self.nthreads)
     3354                for tb in tbparams.keys():
     3355                    # Create and start a thread to stop the segment
     3356                    thread_pool.wait_for_slot()
     3357                    uri = self.tbmap.get(tb, None)
     3358                    t  = self.pooled_thread(\
     3359                            target=self.new_terminate_segment(log=dealloc_log,
     3360                                cert_file=self.cert_file,
     3361                                cert_pwd=self.cert_pwd,
     3362                                trusted_certs=self.trusted_certs,
     3363                                caller=self.call_TerminateSegment),
     3364                            args=(uri, tbparams[tb]), name=tb,
     3365                            pdata=thread_pool, trace_file=self.trace_file)
     3366                    t.start()
     3367                # Wait for completions
     3368                thread_pool.wait_for_all_done()
     3369
     3370            # release the allocations (failed experiments have done this
     3371            # already, and starting experiments may be in odd states, so we
     3372            # ignore errors releasing those allocations
     3373            try:
     3374                for tb in tbparams.keys():
     3375                    self.release_access(tb, tbparams[tb])
     3376            except service_error, e:
     3377                if status != 'failed' and not force:
     3378                    raise e
     3379
     3380            # Remove the terminated experiment
     3381            self.state_lock.acquire()
     3382            for id in ids:
     3383                if self.state.has_key(id): del self.state[id]
     3384
     3385            if self.state_filename: self.write_state()
     3386            self.state_lock.release()
     3387
     3388            return {
     3389                    'experiment': exp ,
     3390                    'deallocationLog': "".join(dealloc_list),
     3391                    }
     3392        else:
     3393            # Don't forget to release the lock
     3394            self.state_lock.release()
     3395            raise service_error(service_error.req, "No saved state")
Note: See TracChangeset for help on using the changeset viewer.