Changeset 6e33086 for fedd/federation


Ignore:
Timestamp:
Nov 28, 2011 7:34:11 PM (12 years ago)
Author:
Ted Faber <faber@…>
Branches:
compt_changes, info-ops, master
Children:
ec3aa4d
Parents:
45e880d
Message:

InfoSegment? to emulab access controllers

Location:
fedd/federation
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/emulab_access.py

    r45e880d r6e33086  
    101101            self.start_segment = proxy_emulab_segment.start_segment
    102102            self.stop_segment = proxy_emulab_segment.stop_segment
     103            self.info_segment = proxy_emulab_segment.info_segment
    103104        elif self.access_type == 'local_emulab':
    104105            self.start_segment = local_emulab_segment.start_segment
    105106            self.stop_segment = local_emulab_segment.stop_segment
     107            self.info_segment = local_emulab_segment.info_segment
    106108        else:
    107109            self.start_segment = None
    108110            self.stop_segment = None
     111            self.info_segment = None
    109112
    110113        self.restricted = [ ]
     
    181184            'TerminateSegment': soap_handler("TerminateSegment",
    182185                self.TerminateSegment),
     186            'InfoSegment': soap_handler("InfoSegment", self.InfoSegment),
    183187            }
    184188        self.xmlrpc_services =  {\
     
    190194            'TerminateSegment': xmlrpc_handler('TerminateSegment',
    191195                self.TerminateSegment),
     196            'InfoSegment': xmlrpc_handler("InfoSegment", self.InfoSegment),
    192197            }
    193198
     
    10111016        return (ename, proj, user, pubkey_base, secretkey_base, alloc_log)
    10121017
    1013     def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
    1014         """
    1015         Store key bits of experiment state in the global repository, including
    1016         the response that may need to be replayed, and return the response.
    1017         """
    1018         i = 0
    1019         t = topo.clone()
     1018    def decorate_topology(self, info, t):
     1019        """
     1020        Copy the physical mapping and status onto the topology.  Used by
     1021        StartSegment and InfoSegment
     1022        """
     1023        def add_new(ann, attr):
     1024            for a in ann:
     1025                if a not in attr: attr.append(a)
     1026
    10201027        # Copy the assigned names into the return topology
    10211028        for e in t.elements:
    10221029            if isinstance(e, topdl.Computer):
    10231030                if not self.create_debug:
    1024                     if e.name in starter.node:
    1025                         e.localname.append("%s%s" % \
    1026                                 (starter.node[e.name][0], self.domain))
    1027                         e.status = starter.node[e.name][1]
     1031                    if e.name in info.node:
     1032                        add_new(("%s%s" % (info.node[e.name][0], self.domain),),
     1033                                e.localname)
     1034                        e.status = info.node[e.name][1]
    10281035                else:
    10291036                    # Simple debugging assignment
    1030                     e.localname.append("node%d%s" % (i, self.domain))
     1037                    add_new(("node%d%s" % (i, self.domain),), e.localname)
    10311038                    e.status = 'active'
    1032                     e.operation.extend(('testop1', 'testop2'))
     1039                    add_new(('testop1', 'testop2'), e.operation)
    10331040                    i += 1
    10341041
     1042
     1043    def finalize_experiment(self, starter, topo, aid, alloc_id, proof):
     1044        """
     1045        Store key bits of experiment state in the global repository, including
     1046        the response that may need to be replayed, and return the response.
     1047        """
     1048        i = 0
     1049        t = topo.clone()
     1050        self.decorate_topology(starter, t)
    10351051        # Grab the log (this is some anal locking, but better safe than
    10361052        # sorry)
     
    12461262        info = self.info_segment(keyfile=self.ssh_privkey_file,
    12471263                debug=self.create_debug, boss=self.boss, cert=self.xmlrpc_cert)
    1248         info(self, user, proj, ename, gid, nonce)
     1264        info(self, user, proj, ename)
     1265        self.decorate_topology(info, topo)
    12491266        return {
    12501267                'allocID': req['allocID'],
    1251                 'experimentdescription' { 'topdldescription' : topo.to_dict() },
     1268                'segmentdescription':
     1269                    { 'topdldescription' : topo.to_dict() },
    12521270                'proof': proof.to_dict(),
    12531271                }
    1254 
  • fedd/federation/emulab_segment.py

    r45e880d r6e33086  
    3131        self.log =  getattr(self, 'log', None)
    3232        self.debug = getattr(self, 'debug', False)
     33        self.node = { }
    3334
    3435    def emulab_call(self, method, params):
     
    210211        """
    211212
     213        ev_ok = ('ISUP', 'ALWAYSUP' )
     214
    212215        if self.debug:
    213216            if self.log:
     
    226229            if code ==0:
    227230                for k, v in nodes.items():
    228                     print v
    229231                    if v.get('erole', False) and 'pnode' in v:
    230232                        st = v.get('status', 'up')
    231                         if st == 'up': st = 'active'
     233                        ev = v.get('eventstatus', 'ISUP')
     234
     235                        if st == 'up' and ev in ev_ok: st = 'active'
    232236                        else: st = 'failed'
    233237                        self.node[k] = (v['pnode'], st)
  • fedd/federation/experiment_control.py

    r45e880d r6e33086  
    9090    call_StartSegment = service_caller('StartSegment')
    9191    call_TerminateSegment = service_caller('TerminateSegment')
     92    call_InfoSegment = service_caller('InfoSegment')
    9293    call_Ns2Topdl = service_caller('Ns2Topdl')
    9394
     
    158159                or 'legacy'
    159160        self.auth_dir = config.get('experiment_control', 'auth_dir')
     161        # XXX: document this!
     162        self.info_cache_limit = \
     163                config.getint('experiment_control', 'info_cache', 600)
    160164        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
    161165        else: self.direct_transit = [ ]
     
    930934                return False
    931935
     936    class info_segment(start_segment):
     937        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
     938                cert_pwd=None, trusted_certs=None, caller=None,
     939                log_collector=None):
     940            experiment_control_local.start_segment.__init__(self, debug,
     941                    log, testbed, cert_file, cert_pwd, trusted_certs,
     942                    caller, log_collector)
     943
     944        def __call__(self, uri, aid):
     945            req = { 'allocID': { 'fedid' : aid } }
     946
     947            try:
     948                self.log.debug("Calling InfoSegment at %s " % uri)
     949                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
     950                        self.trusted_certs)
     951                if r.has_key('InfoSegmentResponseBody'):
     952                    self.make_map(r['InfoSegmentResponseBody'])
     953                    if 'proof' in r: self.proof = r['proof']
     954                    self.response = r
     955                else:
     956                    raise service_error(service_error.internal,
     957                            "Bad response!?: %s" %r)
     958                return True
     959            except service_error, e:
     960                self.log.error("Info segment failed on %s: %s" % \
     961                        (self.testbed, e))
     962                return False
     963
     964    def annotate_topology(self, top, data):
     965        def add_new(ann, attr):
     966            for a in ann:
     967                if a not in attr: attr.append(a)
     968
     969        # Annotate the topology with embedding info
     970        for e in top.elements:
     971            if isinstance(e, topdl.Computer):
     972                for s in data:
     973                    ann = s.node.get(e.name, None)
     974                    if ann is not None:
     975                        add_new(ann[0], e.localname)
     976                        e.status = ann[1]
     977                        add_new(ann[2], e.service)
     978                        add_new(ann[3], e.operation)
     979                        break
     980
     981
     982
    932983    def allocate_resources(self, allocated, masters, eid, expid,
    933984            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None,
     
    10461097                self.state_lock.acquire()
    10471098                self.state[eid].status = 'failed'
     1099                self.state[eid].updated()
    10481100                if self.state_filename: self.write_state()
    10491101                self.state_lock.release()
     
    10651117                if s.proof:
    10661118                    proofs[s.testbed] = s.proof
    1067 
    1068             # Annotate the topology with embedding info
    1069             for e in top.elements:
    1070                 if isinstance(e, topdl.Computer):
    1071                     for s in starters:
    1072                         ann = s.node.get(e.name, None)
    1073                         if ann is not None:
    1074                             e.localname.extend(ann[0])
    1075                             e.status = ann[1]
    1076                             e.service.extend(ann[2])
    1077                             e.operation.extend(ann[3])
    1078                             break
    1079 
     1119            self.annotate_topology(top, starters)
    10801120            log.info("[start_segment]: Experiment %s active" % eid)
    10811121
     
    10921132        self.state[eid] = self.state[expid]
    10931133        self.state[eid].top = top
     1134        self.state[eid].updated()
    10941135        # Append startup proofs
    10951136        for f in self.state[eid].get_all_allocations():
     
    16451686            exp = self.state[key]
    16461687            exp.status = "starting"
     1688            exp.updated()
    16471689            expid = exp.fedid
    16481690            eid = exp.localname
     
    19802022        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
    19812023                    for p in tbparams[k].proof]
     2024        exp.updated()
    19822025        if self.state_filename:
    19832026            self.write_state()
     
    21922235            return (None, None)
    21932236
     2237    def update_info(self, key, force=False):
     2238        top = None
     2239        self.state_lock.acquire()
     2240        if key in self.state:
     2241            if force or self.state[key].older_than(self.info_cache_limit):
     2242                top = self.state[key].top
     2243                if top is not None: top = top.clone()
     2244                d1, info_params, cert, d2 = \
     2245                        self.get_segment_info(self.state[key], need_lock=False)
     2246        self.state_lock.release()
     2247
     2248        if top is None: return
     2249
     2250        try:
     2251            tmpdir = tempfile.mkdtemp(prefix="info-")
     2252        except EnvironmentError:
     2253            raise service_error(service_error.internal,
     2254                    "Cannot create tmp dir")
     2255        cert_file = self.make_temp_certfile(cert, tmpdir)
     2256
     2257        data = []
     2258        try:
     2259            for k, (uri, aid) in info_params.items():
     2260                info=self.info_segment(log=self.log, testbed=uri,
     2261                            cert_file=cert_file, cert_pwd=None,
     2262                            trusted_certs=self.trusted_certs,
     2263                            caller=self.call_InfoSegment)
     2264                info(uri, aid)
     2265                data.append(info)
     2266        # Clean up the tmpdir no matter what
     2267        finally:
     2268            if tmpdir: self.remove_dirs(tmpdir)
     2269
     2270        self.annotate_topology(top, data)
     2271        self.state_lock.acquire()
     2272        if key in self.state:
     2273            self.state[key].top = top
     2274            self.state[key].updated()
     2275            if self.state_filename: self.write_state()
     2276        self.state_lock.release()
     2277
    21942278   
    21952279    def get_info(self, req, fid):
     
    22052289        exp = req.get('experiment', None)
    22062290        legacy = req.get('legacy', False)
     2291        fresh = req.get('fresh', False)
    22072292        if exp:
    22082293            if exp.has_key('fedid'):
     
    22192304        proof = self.check_experiment_access(fid, key)
    22202305
     2306        self.update_info(key, fresh)
     2307
    22212308        self.state_lock.acquire()
    22222309        if self.state.has_key(key):
    22232310            rv = self.state[key].get_info()
    2224             # Copy the topo if we need to generate legacy representations
     2311            # Copy the topo if we need legacy annotations
    22252312            if legacy:
    22262313                top = self.state[key].top
     
    22292316
    22302317        # If the legacy visualization and topology representations are
    2231         # requested, calculate them and add them to teh return.
     2318        # requested, calculate them and add them to the return.
    22322319        if legacy and rv is not None:
    22332320            if top is not None:
     
    23002387                    "Experiment has no status!?")
    23012388
    2302 
    2303     def get_termination_info(self, fed_exp):
     2389    def get_segment_info(self, fed_exp, need_lock=True):
    23042390        ids = []
    23052391        term_params = { }
    2306         self.state_lock.acquire()
     2392        if need_lock: self.state_lock.acquire()
    23072393        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
    23082394        expcert = fed_exp.identity
     
    23152401            aid = fed.allocID
    23162402            term_params[aid] = (uri, aid)
     2403        if need_lock: self.state_lock.release()
     2404        return ids, term_params, expcert, repo
     2405
     2406
     2407    def get_termination_info(self, fed_exp):
     2408        self.state_lock.acquire()
     2409        ids, term_params, expcert, repo = self.get_segment_info(fed_exp, False)
    23172410        # Change the experiment state
    23182411        fed_exp.status = 'terminating'
     2412        fed_exp.updated()
    23192413        if self.state_filename: self.write_state()
    23202414        self.state_lock.release()
     
    24142508        if fed_exp:
    24152509            status = self.check_termination_status(fed_exp, force)
     2510            # get_termination_info updates the experiment state
    24162511            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
    24172512            self.deallocate_resources(term_params, expcert, status, force,
  • fedd/federation/experiment_info.py

    r45e880d r6e33086  
    22
    33import copy
     4from datetime import datetime, timedelta
     5from numbers import Number
    46
    57class allocation_info:
     
    3840        self.log = []
    3941        self.alloc = { }
     42        self.last_update = datetime.now()
    4043
    4144    def add_allocation(self, a):
     
    4750    def get_all_allocations(self):
    4851        return self.alloc.values()
     52
     53    def updated(self):
     54        self.last_update = datetime.now()
     55
     56    def older_than(self, secs=None, dt=None):
     57        """
     58        If the last update of this info was more than secs seconds ago, or
     59        before dt (a datetime), return True.  If both secs and dt or neither is
     60        given return False.  If the last update time is completelt unknown
     61        (which should never happen) return True.
     62        """
     63        if self.last_update is None:
     64            return True
     65        elif dt is None and isinstance(secs, Number):
     66            return self.last_update + timedelta(seconds=secs) < datetime.now()
     67        elif secs is None and isinstance(dt, datetime):
     68            return self.last_update < dt
     69        else:
     70            return False
     71
    4972
    5073    def get_info(self):
  • fedd/federation/local_emulab_segment.py

    r45e880d r6e33086  
    2121        local_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
    2222        emulab_segment.__init__(self, boss=boss, cert=cert)
    23         self.node = { }
    2423
    2524    def set_up_experiment_filespace(self, pid, eid, tmpdir):
     
    123122        return rv
    124123
     124class info_segment(local_segment, emulab_segment):
     125    def __init__(self, log=None, keyfile=None, debug=False, boss=None,
     126            cert=None):
     127        local_segment.__init__(self, log=log, keyfile=keyfile, debug=debug)
     128        emulab_segment.__init__(self, boss=boss, cert=cert)
     129
     130    def __call__(self, parent, user, pid, eid):
     131        self.log.info("[info_segment]: Getting info from %s" % eid)
     132        self.get_mapping(pid,eid)
     133        return True
Note: See TracChangeset for help on using the changeset viewer.