Ignore:
Timestamp:
Nov 28, 2011 7:34:11 PM (12 years ago)
Author:
Ted Faber <faber@…>
Branches:
compt_changes, info-ops, master
Children:
ec3aa4d
Parents:
45e880d
Message:

InfoSegment? to emulab access controllers

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r45e880d r6e33086  
    9090    call_StartSegment = service_caller('StartSegment')
    9191    call_TerminateSegment = service_caller('TerminateSegment')
     92    call_InfoSegment = service_caller('InfoSegment')
    9293    call_Ns2Topdl = service_caller('Ns2Topdl')
    9394
     
    158159                or 'legacy'
    159160        self.auth_dir = config.get('experiment_control', 'auth_dir')
     161        # XXX: document this!
     162        self.info_cache_limit = \
     163                config.getint('experiment_control', 'info_cache', 600)
    160164        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
    161165        else: self.direct_transit = [ ]
     
    930934                return False
    931935
     936    class info_segment(start_segment):
     937        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
     938                cert_pwd=None, trusted_certs=None, caller=None,
     939                log_collector=None):
     940            experiment_control_local.start_segment.__init__(self, debug,
     941                    log, testbed, cert_file, cert_pwd, trusted_certs,
     942                    caller, log_collector)
     943
     944        def __call__(self, uri, aid):
     945            req = { 'allocID': { 'fedid' : aid } }
     946
     947            try:
     948                self.log.debug("Calling InfoSegment at %s " % uri)
     949                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
     950                        self.trusted_certs)
     951                if r.has_key('InfoSegmentResponseBody'):
     952                    self.make_map(r['InfoSegmentResponseBody'])
     953                    if 'proof' in r: self.proof = r['proof']
     954                    self.response = r
     955                else:
     956                    raise service_error(service_error.internal,
     957                            "Bad response!?: %s" %r)
     958                return True
     959            except service_error, e:
     960                self.log.error("Info segment failed on %s: %s" % \
     961                        (self.testbed, e))
     962                return False
     963
     964    def annotate_topology(self, top, data):
     965        def add_new(ann, attr):
     966            for a in ann:
     967                if a not in attr: attr.append(a)
     968
     969        # Annotate the topology with embedding info
     970        for e in top.elements:
     971            if isinstance(e, topdl.Computer):
     972                for s in data:
     973                    ann = s.node.get(e.name, None)
     974                    if ann is not None:
     975                        add_new(ann[0], e.localname)
     976                        e.status = ann[1]
     977                        add_new(ann[2], e.service)
     978                        add_new(ann[3], e.operation)
     979                        break
     980
     981
     982
    932983    def allocate_resources(self, allocated, masters, eid, expid,
    933984            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None,
     
    10461097                self.state_lock.acquire()
    10471098                self.state[eid].status = 'failed'
     1099                self.state[eid].updated()
    10481100                if self.state_filename: self.write_state()
    10491101                self.state_lock.release()
     
    10651117                if s.proof:
    10661118                    proofs[s.testbed] = s.proof
    1067 
    1068             # Annotate the topology with embedding info
    1069             for e in top.elements:
    1070                 if isinstance(e, topdl.Computer):
    1071                     for s in starters:
    1072                         ann = s.node.get(e.name, None)
    1073                         if ann is not None:
    1074                             e.localname.extend(ann[0])
    1075                             e.status = ann[1]
    1076                             e.service.extend(ann[2])
    1077                             e.operation.extend(ann[3])
    1078                             break
    1079 
     1119            self.annotate_topology(top, starters)
    10801120            log.info("[start_segment]: Experiment %s active" % eid)
    10811121
     
    10921132        self.state[eid] = self.state[expid]
    10931133        self.state[eid].top = top
     1134        self.state[eid].updated()
    10941135        # Append startup proofs
    10951136        for f in self.state[eid].get_all_allocations():
     
    16451686            exp = self.state[key]
    16461687            exp.status = "starting"
     1688            exp.updated()
    16471689            expid = exp.fedid
    16481690            eid = exp.localname
     
    19802022        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
    19812023                    for p in tbparams[k].proof]
     2024        exp.updated()
    19822025        if self.state_filename:
    19832026            self.write_state()
     
    21922235            return (None, None)
    21932236
     2237    def update_info(self, key, force=False):
     2238        top = None
     2239        self.state_lock.acquire()
     2240        if key in self.state:
     2241            if force or self.state[key].older_than(self.info_cache_limit):
     2242                top = self.state[key].top
     2243                if top is not None: top = top.clone()
     2244                d1, info_params, cert, d2 = \
     2245                        self.get_segment_info(self.state[key], need_lock=False)
     2246        self.state_lock.release()
     2247
     2248        if top is None: return
     2249
     2250        try:
     2251            tmpdir = tempfile.mkdtemp(prefix="info-")
     2252        except EnvironmentError:
     2253            raise service_error(service_error.internal,
     2254                    "Cannot create tmp dir")
     2255        cert_file = self.make_temp_certfile(cert, tmpdir)
     2256
     2257        data = []
     2258        try:
     2259            for k, (uri, aid) in info_params.items():
     2260                info=self.info_segment(log=self.log, testbed=uri,
     2261                            cert_file=cert_file, cert_pwd=None,
     2262                            trusted_certs=self.trusted_certs,
     2263                            caller=self.call_InfoSegment)
     2264                info(uri, aid)
     2265                data.append(info)
     2266        # Clean up the tmpdir no matter what
     2267        finally:
     2268            if tmpdir: self.remove_dirs(tmpdir)
     2269
     2270        self.annotate_topology(top, data)
     2271        self.state_lock.acquire()
     2272        if key in self.state:
     2273            self.state[key].top = top
     2274            self.state[key].updated()
     2275            if self.state_filename: self.write_state()
     2276        self.state_lock.release()
     2277
    21942278   
    21952279    def get_info(self, req, fid):
     
    22052289        exp = req.get('experiment', None)
    22062290        legacy = req.get('legacy', False)
     2291        fresh = req.get('fresh', False)
    22072292        if exp:
    22082293            if exp.has_key('fedid'):
     
    22192304        proof = self.check_experiment_access(fid, key)
    22202305
     2306        self.update_info(key, fresh)
     2307
    22212308        self.state_lock.acquire()
    22222309        if self.state.has_key(key):
    22232310            rv = self.state[key].get_info()
    2224             # Copy the topo if we need to generate legacy representations
     2311            # Copy the topo if we need legacy annotations
    22252312            if legacy:
    22262313                top = self.state[key].top
     
    22292316
    22302317        # If the legacy visualization and topology representations are
    2231         # requested, calculate them and add them to teh return.
     2318        # requested, calculate them and add them to the return.
    22322319        if legacy and rv is not None:
    22332320            if top is not None:
     
    23002387                    "Experiment has no status!?")
    23012388
    2302 
    2303     def get_termination_info(self, fed_exp):
     2389    def get_segment_info(self, fed_exp, need_lock=True):
    23042390        ids = []
    23052391        term_params = { }
    2306         self.state_lock.acquire()
     2392        if need_lock: self.state_lock.acquire()
    23072393        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
    23082394        expcert = fed_exp.identity
     
    23152401            aid = fed.allocID
    23162402            term_params[aid] = (uri, aid)
     2403        if need_lock: self.state_lock.release()
     2404        return ids, term_params, expcert, repo
     2405
     2406
     2407    def get_termination_info(self, fed_exp):
     2408        self.state_lock.acquire()
     2409        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
    23172410        # Change the experiment state
    23182411        fed_exp.status = 'terminating'
     2412        fed_exp.updated()
    23192413        if self.state_filename: self.write_state()
    23202414        self.state_lock.release()
     
    24142508        if fed_exp:
    24152509            status = self.check_termination_status(fed_exp, force)
     2510            # get_termination_info updates the experiment state
    24162511            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
    24172512            self.deallocate_resources(term_params, expcert, status, force,
Note: See TracChangeset for help on using the changeset viewer.