Changeset 22a1a77 for fedd/federation


Ignore:
Timestamp:
Nov 29, 2011 6:19:24 PM (13 years ago)
Author:
Ted Faber <faber@…>
Branches:
compt_changes, info-ops, master
Children:
b709861
Parents:
57facae
Message:

Checkpoint: untested operations stuff

Location:
fedd/federation
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r57facae r22a1a77  
    3737from thread_pool import thread_pool, pooled_thread
    3838from experiment_info import experiment_info, allocation_info, federated_service
     39from operation_status import operation_status
    3940
    4041import topdl
     
    6566    call_TerminateSegment = service_caller('TerminateSegment')
    6667    call_InfoSegment = service_caller('InfoSegment')
     68    call_OperationSegment = service_caller('OperationSegment')
    6769    call_Ns2Topdl = service_caller('Ns2Topdl')
    6870
     
    223225                'Info': soap_handler('Info', self.get_info),
    224226                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
     227                'Operation': soap_handler('Operation', self.do_operation),
    225228                'Terminate': soap_handler('Terminate',
    226229                    self.terminate_experiment),
     
    238241                'Terminate': xmlrpc_handler('Terminate',
    239242                    self.terminate_experiment),
     243                'Operation': xmlrpc_handler('Operation', self.do_operation),
    240244                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
    241245                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
     
    933937            except service_error, e:
    934938                self.log.error("Info segment failed on %s: %s" % \
     939                        (self.testbed, e))
     940                return False
     941
     942    class operation_segment:
     943        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
     944                cert_pwd=None, trusted_certs=None, caller=None,
     945                log_collector=None):
     946            self.log = log
     947            self.debug = debug
     948            self.cert_file = cert_file
     949            self.cert_pwd = cert_pwd
     950            self.trusted_certs = None
     951            self.caller = caller
     952            self.testbed = testbed
     953            self.status = None
     954
     955        def __call__(self, uri, aid, op, targets, params):
     956            req = {
     957                    'allocID': { 'fedid' : aid },
     958                    'operation': op,
     959                    'target': targets,
     960                    }
     961            if params: req['parameter'] = params
     962
     963
     964            try:
     965                self.log.debug("Calling OperationSegment at %s " % uri)
     966                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
     967                        self.trusted_certs)
     968                if 'OperationSegmentResponseBody' in r:
     969                    r = r['OperationSegmentResponseBody']
     970                    if 'status' in r:
     971                        self.status = r['status']
     972                else:
     973                    raise service_error(service_error.internal,
     974                            "Bad response!?: %s" %r)
     975                return True
     976            except service_error, e:
     977                self.log.error("Operation segment failed on %s: %s" % \
    935978                        (self.testbed, e))
    936979                return False
     
    23142357            raise service_error(service_error.req, "No such experiment")
    23152358
     2359    def operate_on_segments(self, op_params, cert, op, testbeds, params,
     2360            results):
     2361        """
     2362        Call OperateSegment on multiple testbeds and gather the results.
     2363        op_params contains the parameters needed to contact that testbed, cert
     2364        is a certificate containing the fedid to use, op is the operation,
     2365        testbeds is a dict mapping testbed name to targets in that testbed,
     2366        params are the parameters to include a,d results is a growing list of
     2367        the results of the calls.
     2368        """
     2369        try:
     2370            tmpdir = tempfile.mkdtemp(prefix="info-")
     2371        except EnvironmentError:
     2372            raise service_error(service_error.internal,
     2373                    "Cannot create tmp dir")
     2374        cert_file = self.make_temp_certfile(cert, tmpdir)
     2375
     2376        try:
     2377            for tb, targets in testbeds.items():
     2378                if tb in op_params:
     2379                    uri, aid = op_params[tb]
     2380                    operate=self.operation_segment(log=self.log, testbed=uri,
     2381                                cert_file=cert_file, cert_pwd=None,
     2382                                trusted_certs=self.trusted_certs,
     2383                                caller=self.call_OperationSegment)
     2384                    if operate(uri, aid, op, targets, params):
     2385                        if operate.status is not None:
     2386                            results.extend(operate.status)
     2387                            continue
     2388                # Something went wrong in a weird way.  Add statuses
     2389                # that reflect that to results
     2390                for t in targets:
     2391                    results.append(operation_status(t,
     2392                        operation_status.federant,
     2393                        'Unexpected error ion %s' % tb))
     2394        # Clean up the tmpdir no matter what
     2395        finally:
     2396            if tmpdir: self.remove_dirs(tmpdir)
     2397
     2398    def do_operation(self, req, fid):
     2399        """
     2400        Find the testbeds holding each target and ask them to carry out the
     2401        operation.  Return the statuses.
     2402        """
     2403        # Map an element to the testbed containing it
     2404        def element_to_tb(e):
     2405            if isinstance(e, topdl.Computer): return e.get_attribute("testbed")
     2406            elif isinstance(e, topdl.Testbed): return e.name
     2407            else: return None
     2408        # If d is an operation_status object, make it a dict
     2409        def make_dict(d):
     2410            if isinstance(d, dict): return d
     2411            elif isinstance(d, operation_status): return d.to_dict()
     2412            else: return { }
     2413
     2414        req = req.get('OperationRequestBody', None)
     2415        if not req:
     2416            raise service_error(service_error.req,
     2417                    "Bad request format (no OperationRequestBody)")
     2418        exp = req.get('experiment', None)
     2419        op = req.get('operation', None)
     2420        target = set(req.get('target', []))
     2421        params = req.get('parameter', None)
     2422
     2423        if exp:
     2424            if 'fedid' in exp:
     2425                key = exp['fedid']
     2426                keytype = "fedid"
     2427            elif 'localname' in exp:
     2428                key = exp['localname']
     2429                keytype = "localname"
     2430            else:
     2431                raise service_error(service_error.req, "Unknown lookup type")
     2432        else:
     2433            raise service_error(service_error.req, "No request?")
     2434
     2435        if op is None or not target
     2436            raise service_error(service_error.req, "No request?")
     2437
     2438        proof = self.check_experiment_access(fid, key)
     2439        self.state_lock.acquire()
     2440        if key in self.state:
     2441            d1, op_params, cert, d2 = \
     2442                    self.get_segment_info(self.state[key], need_lock=False)
     2443            top = self.state[key].top
     2444            if top is not None:
     2445                top = top.clone()
     2446        self.state_lock.release()
     2447
     2448        if top is None:
     2449            raise service_error(service_error.partial, "No topology yet",
     2450                    proof=proof)
     2451
     2452        testbeds = { }
     2453        results = []
     2454        for e in top.elements:
     2455            if e.name in targets:
     2456                tb = element_to_tb(e)
     2457                if tb is not None:
     2458                    if tb in testbeds: testbeds[tb].append(e.name)
     2459                    else: testbeds[tb] = [ e.name ]
     2460                else:
     2461                    results.append(operation_status(e.name,
     2462                        code=operation_status.no_target,
     2463                        description='Cannot map target to testbed'))
     2464
     2465        self.operate_on_segments(op_params, cert, op, testbeds, params,
     2466                results)
     2467
     2468        return {
     2469                'experiment': exp,
     2470                'status': [make_dict(r) for r in results]
     2471                'proof': proof.to_dict()
     2472                }
     2473
     2474
    23162475    def get_multi_info(self, req, fid):
    23172476        """
Note: See TracChangeset for help on using the changeset viewer.