Changeset 109a32a for fedd/federation


Ignore:
Timestamp:
Mar 1, 2010 4:38:17 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
Children:
8353ac6
Parents:
2761484
Message:

More dragon cleanup and finally removing the last "special" testbed. We
self-organize now.

Tested w/o full actually allocating.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r2761484 r109a32a  
    10311031            tbparams, topo, tmpdir, alloc_log=None, log_collector=None,
    10321032            attrs=None, connInfo={}, services=[]):
    1033         def get_vlan(r):
    1034             if r.has_key('StartSegmentResponseBody'):
    1035                 srb = r['StartSegmentResponseBody']
    1036                 if srb.has_key('fedAttr'):
    1037                     for k, v in [ (a['attribute'], a['value']) \
    1038                             for a in srb['fedAttr']]:
    1039                         if k == 'vlan': return v
    1040             return None
    10411033
    10421034        started = { }           # Testbeds where a sub-experiment started
     
    10461038        fail_soft = False
    10471039
    1048         non_transit = [ k for k in allocated.keys() \
    1049                 if not topo[k].get_attribute('transit')]
    1050         transit = [ k for k in allocated.keys() \
    1051                 if topo[k].get_attribute('transit')]
    1052 
    10531040        log = alloc_log or self.log
    10541041
     
    10561043        threads = [ ]
    10571044
    1058         for tb in transit:
    1059             uri = tbparams[tb]['uri']
     1045        for tb in allocated.keys():
     1046            # Create and start a thread to start the segment, and save it
     1047            # to get the return value later
     1048            thread_pool.wait_for_slot()
     1049            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
     1050            if not uri:
     1051                raise service_error(service_error.internal,
     1052                        "Unknown testbed %s !?" % tb)
     1053
    10601054            if tbparams[tb].has_key('allocID') and \
    10611055                    tbparams[tb]['allocID'].has_key('fedid'):
     
    10651059                        "No alloc id for testbed %s !?" % tb)
    10661060
    1067             m = re.search('(\d+)', tb)
    1068             if m:
    1069                 to_repl = "unassigned%s" % m.group(1)
    1070             else:
    1071                 raise service_error(service_error.internal,
    1072                         "Bad dynamic allocation name")
    1073                 break
    1074 
    1075             ss = self.start_segment(log=log, debug=self.debug,
    1076                 testbed=tb, cert_file=self.cert_file,
    1077                 cert_pwd=self.cert_pwd,
    1078                 trusted_certs=self.trusted_certs,
    1079                 caller=self.call_StartSegment,
    1080                 log_collector=log_collector)
    1081             t = self.pooled_thread(
    1082                     target=ss,
    1083                     args =(uri, aid, topo[tb], False, attrs, connInfo[tb],
    1084                         services),
    1085                     name=tb, pdata=thread_pool, trace_file=self.trace_file)
     1061            t  = self.pooled_thread(\
     1062                    target=self.start_segment(log=log, debug=self.debug,
     1063                        testbed=tb, cert_file=self.cert_file,
     1064                        cert_pwd=self.cert_pwd,
     1065                        trusted_certs=self.trusted_certs,
     1066                        caller=self.call_StartSegment,
     1067                        log_collector=log_collector),
     1068                    args=(uri, aid, topo[tb], tb == master,
     1069                        attrs, connInfo[tb], services),
     1070                    name=tb,
     1071                    pdata=thread_pool, trace_file=self.trace_file)
    10861072            threads.append(t)
    10871073            t.start()
    1088             # Wait until the this transit node finishes (keep pinging the log,
    1089             # though)
    1090 
    1091             mins = 0
    1092             while not thread_pool.wait_for_all_done(60.0):
    1093                 mins += 1
    1094                 alloc_log.info("Waiting for transit (it has been %d mins)" \
    1095                         % mins)
    1096 
    1097             if t.rv:
    1098                 vlan = get_vlan(ss.response)
    1099                 if vlan is not None:
    1100                     for v in connInfo.values():
    1101                         for i in v:
    1102                             for a in i.get('fedAttr', []):
    1103                                 if a.get('attribute', "") == 'vlan_id' and \
    1104                                         a.get('value', "") == to_repl:
    1105                                     a['value'] = vlan
    1106             else:
    1107                 break
    1108             thread_pool.clear()
    1109 
    1110 
    1111         failed = [ t.getName() for t in threads if not t.rv ]
    1112 
    1113         if len(failed) == 0:
    1114             for tb in non_transit:
    1115                 # Create and start a thread to start the segment, and save it
    1116                 # to get the return value later
    1117                 thread_pool.wait_for_slot()
    1118                 uri = self.tbmap.get(tb, None)
    1119                 if not uri:
    1120                     raise service_error(service_error.internal,
    1121                             "Unknown testbed %s !?" % tb)
    1122 
    1123                 if tbparams[tb].has_key('allocID') and \
    1124                         tbparams[tb]['allocID'].has_key('fedid'):
    1125                     aid = tbparams[tb]['allocID']['fedid']
    1126                 else:
    1127                     raise service_error(service_error.internal,
    1128                             "No alloc id for testbed %s !?" % tb)
    1129 
    1130                 t  = self.pooled_thread(\
    1131                         target=self.start_segment(log=log, debug=self.debug,
    1132                             testbed=tb, cert_file=self.cert_file,
    1133                             cert_pwd=self.cert_pwd,
    1134                             trusted_certs=self.trusted_certs,
    1135                             caller=self.call_StartSegment,
    1136                             log_collector=log_collector),
    1137                         args=(uri, aid, topo[tb], tb == master,
    1138                             attrs, connInfo[tb], services),
    1139                         name=tb,
    1140                         pdata=thread_pool, trace_file=self.trace_file)
    1141                 threads.append(t)
    1142                 t.start()
    1143 
    1144             # Wait until all finish (keep pinging the log, though)
    1145             mins = 0
    1146             while not thread_pool.wait_for_all_done(60.0):
    1147                 mins += 1
    1148                 alloc_log.info("Waiting for sub threads (it has been %d mins)" \
    1149                         % mins)
    1150 
    1151             thread_pool.clear()
     1074
     1075        # Wait until all finish (keep pinging the log, though)
     1076        mins = 0
     1077        while not thread_pool.wait_for_all_done(60.0):
     1078            mins += 1
     1079            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
     1080                    % mins)
     1081
     1082        thread_pool.clear()
    11521083
    11531084        failed = [ t.getName() for t in threads if not t.rv ]
     
    14191350                aid = tbparams[st]['allocID']['fedid']
    14201351            except:
    1421                 self.log.debug("Can't get allocation id?")
     1352                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
     1353                        % st)
    14221354                aid = None
    14231355            info = {
     
    15111443                            ('vlan', 'unassigned%d' % idx),)])
    15121444        name = "dragon%d" % idx
     1445        store_key = 'fedid:%s/vlan%d' % (expid, idx)
    15131446        for tb in tbs.keys():
    15141447            seg = topdl.Segment(
     
    15291462            segs.append(seg)
    15301463
    1531 
    1532         try:
    1533             aid = tbparams[tb]['allocID']['fedid']
    1534         except:
    1535             self.log.debug("Can't get allocation id?")
    1536             aid = None
     1464            # Give this allocation the rights to access the key of the
     1465            # vlan_id
     1466            try:
     1467                aid = tbparams[tb]['allocID']['fedid']
     1468                self.auth.set_attribute(aid, store_key)
     1469            except:
     1470                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
     1471                        % tb)
     1472
    15371473        connInfo[name] = [ {
     1474            'type': 'transit',
    15381475            'parameter': [ {
    15391476                'name': 'vlan_id',
    1540                 'key': "fedid:%s/vlan%d" % (expid, idx),
     1477                'key': store_key,
    15411478                'store': self.store_url,
    15421479                'type': 'output'
    15431480                } ]
    15441481            } ]
    1545 
    1546         # Give this allocation the rights to access the key of the
    1547         # vlan_id
    1548         if aid:
    1549             self.auth.set_attribute(aid, 'fedid:%s/vlan%d' % (expid, idx))
    15501482
    15511483        topo[name] = \
     
    15541486                            topdl.Attribute(attribute="transit", value='true'),
    15551487                            topdl.Attribute(attribute="dynamic", value='true'),
    1556                             topdl.Attribute(attribute="testbed", value='dragon'),
     1488                            topdl.Attribute(attribute="testbed",
     1489                                value='dragon'),
     1490                            topdl.Attribute(attribute="store_keys",
     1491                                value=store_key),
    15571492                            ]
    15581493                        )
    15591494
    15601495    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
    1561             connInfo, peer={ }, expid=None):
     1496            connInfo, expid=None):
    15621497        """
    15631498        Add attribiutes to the various elements indicating that they are to be
     
    15701505                if s.name == name: return s
    15711506            else: return None
     1507
    15721508
    15731509        mdomain = tbparams[master].get('domain', '.example.com')
     
    15971533                    aid = tbparams[tb]['allocID']['fedid']
    15981534                except:
    1599                     self.log.debug("Can't get allocation id?")
     1535                    self.log.debug("[creat_dragon_substrate] " +
     1536                            "Can't get alloc id for %s?" %tb)
    16001537                    aid = None
    16011538
     
    16121549                                    if isinstance(i.element, topdl.Computer) ],
    16131550                        'fedAttr': [
    1614                             { 'attribute': 'vlan_id',
    1615                                 'value': 'unassigned%d' % dn },
    16161551                            { 'attribute': 'masterdomain', 'value': mdomain},
    16171552                            { 'attribute': 'masterexperiment', 'value':
     
    16261561                            } ]
    16271562                        }
    1628                 if peer.has_key(tb):
    1629                     info['peer'] = peer[tb]
     1563                if tbs.has_key(tb):
     1564                    info['peer'] = tbs[tb]
    16301565                connInfo[tb].append(info)
    16311566
     
    16391574                        "No substrate %s in testbed %s" % (sub.name, tb))
    16401575
    1641         self.new_dragon_topo(dn, sub, topo, tbs, tbparams)
     1576        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
    16421577
    16431578    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
     
    18921827
    18931828                    self.create_dragon_substrate(csub, topo,
    1894                             {tb: 1, master:1}, tbparams, master, eid, connInfo,
    1895                             {tb: ip_addr(mip), master: ip_addr(dip)}, expid)
     1829                            {tb: ip_addr(mip), master: ip_addr(dip)},
     1830                            tbparams, master, eid, connInfo,
     1831                            expid)
    18961832                else:
    18971833                    self.add_control_portal(master, tb, master, eid, topo,
     
    22392175                    del tbparams[tb]
    22402176                    allocated[k] = 1
     2177                    store_keys = t.get_attribute('store_keys')
     2178                    # Give the testbed access to keys it exports or imports
     2179                    if store_keys:
     2180                        for sk in store_keys.split(" "):
     2181                            self.auth.set_attribute(\
     2182                                    tbparams[k]['allocID']['fedid'], sk)
    22412183                else:
    22422184                    raise service_error(service_error.internal,
     
    27482690            if v is not None:
    27492691                rv['value'] = v
     2692            self.log.debug("[GetValue] got %s from %s" % (v, name))
    27502693            return rv
    27512694        else:
     
    27692712                self.synch_store.set_value(name, v)
    27702713                self.write_store()
     2714                self.log.debug("[SetValue] set %s to %s" % (name, v))
    27712715            except synch_store.CollisionError:
    27722716                # Translate into a service_error
Note: See TracChangeset for help on using the changeset viewer.