Ignore:
Timestamp:
Apr 21, 2010 5:31:03 AM (14 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
f7a54c6
Parents:
f54e8e4
Message:

Get topology information into the info operation, as annotations of a topology description. This required adding such information to the start segment replies as well

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    rf54e8e4 rb4b19c7  
    10071007            self.log_collector = log_collector
    10081008            self.response = None
     1009            self.node = { }
     1010
     1011        def make_map(self, resp):
     1012            if 'segmentdescription' in resp and \
     1013                    'topdldescription' in resp['segmentdescription']:
     1014                top = topdl.Topology(\
     1015                        **resp['segmentdescription']['topdldescription'])
     1016                for e in [e for e in top.elements \
     1017                        if isinstance(e, topdl.Computer)]:
     1018                    hn = e.get_attribute('hostname')
     1019                    if hn:
     1020                        for n in e.name:
     1021                            self.node[n] = hn
    10091022
    10101023        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
     
    10501063                        for line in  lval.splitlines(True):
    10511064                            self.log_collector.write(line)
     1065                    self.make_map(r['StartSegmentResponseBody'])
    10521066                    self.response = r
    10531067                else:
     
    10881102
    10891103    def allocate_resources(self, allocated, masters, eid, expid,
    1090             tbparams, topo, tmpdir, alloc_log=None, log_collector=None,
     1104            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None,
    10911105            attrs=None, connInfo={}):
    10921106
     
    11011115        thread_pool = self.thread_pool(self.nthreads)
    11021116        threads = [ ]
     1117        starters = [ ]
    11031118
    11041119        for tb in allocated.keys():
     
    11181133                        "No alloc id for testbed %s !?" % tb)
    11191134
     1135            s = self.start_segment(log=log, debug=self.debug,
     1136                    testbed=tb, cert_file=self.cert_file,
     1137                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
     1138                    caller=self.call_StartSegment,
     1139                    log_collector=log_collector)
     1140            starters.append(s)
    11201141            t  = self.pooled_thread(\
    1121                     target=self.start_segment(log=log, debug=self.debug,
    1122                         testbed=tb, cert_file=self.cert_file,
    1123                         cert_pwd=self.cert_pwd,
    1124                         trusted_certs=self.trusted_certs,
    1125                         caller=self.call_StartSegment,
    1126                         log_collector=log_collector),
     1142                    target=s, name=tb,
    11271143                    args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]),
    1128                     name=tb,
    11291144                    pdata=thread_pool, trace_file=self.trace_file)
    11301145            threads.append(t)
     
    11901205                return
    11911206        else:
     1207            # Walk through the successes and gather the virtual to physical
     1208            # mapping.
     1209            node = { }
     1210            for s in starters:
     1211                node.update(s.node)
     1212            # Assigng the mapping as a hostname attribute
     1213            for e in [ e for e in top.elements \
     1214                    if isinstance(e, topdl.Computer)]:
     1215                for n in e.name:
     1216                    if n in node:
     1217                        e.set_attribute('hostname', node[n])
    11921218            log.info("[start_segment]: Experiment %s active" % eid)
    11931219
     
    12051231            log.debug("[start_experiment]: not removing %s" % tmpdir)
    12061232
    1207         # Insert the experiment into our state and update the disk copy
     1233        # Insert the experiment into our state and update the disk copy.
    12081234        self.state_lock.acquire()
    12091235        self.state[expid]['experimentStatus'] = 'active'
    12101236        self.state[eid] = self.state[expid]
     1237        self.state[eid]['experimentdescription']['topdldescription'] = \
     1238                top.to_dict()
    12111239        if self.state_filename: self.write_state()
    12121240        self.state_lock.release()
     
    12281256
    12291257
    1230     def create_experiment_state(self, fid, req, expid, expcert, 
     1258    def create_experiment_state(self, fid, req, expid, expcert,
    12311259            state='starting'):
    12321260        """
     
    16621690                # Add the service to masters
    16631691                for tb in s.get('export', []):
    1664                     if s.get('name', None) and s.get('import', None):
     1692                    if s.get('name', None):
    16651693                        if tb not in masters:
    16661694                            masters[tb] = [ ]
     
    16761704                                params=params))
    16771705                    else:
    1678                         log.error('Testbed service does not have name " + \
     1706                        self.log.error('Testbed service does not have name " + \
    16791707                                "and importers')
    16801708
     
    17611789            self.state[eid]['vtopo'] = vtopo
    17621790            self.state[eid]['vis'] = vis
     1791            self.state[eid]['experimentdescription'] = \
     1792                    { 'topdldescription': top.clone() }
    17631793            self.state[expid]['federant'] = \
    17641794                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
     
    18331863        t  = Thread(target=self.allocate_resources,
    18341864                args=(allocated, masters, eid, expid, tbparams,
    1835                     topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo),
     1865                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
     1866                    connInfo),
    18361867                name=eid)
    18371868        t.start()
     
    20112042                if f.has_key('allocID'): del f['allocID']
    20122043                if f.has_key('uri'): del f['uri']
     2044
    20132045        return rv
    20142046
Note: See TracChangeset for help on using the changeset viewer.