source: fedd/federation/experiment_partition.py @ f9c2f63

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f9c2f63 was 1e7f268, checked in by Ted Faber <faber@…>, 15 years ago

Make computers single named

  • Property mode set to 100644
File size: 19.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58
59
60    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
61            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
62            expid=None):
63        """
64        Return a new internet portal node and a dict with the connectionInfo to
65        be attached.
66        """
67        seer_master = None
68        for m in masters.values():
69            for s in m:
70                if s.name == 'SEER':
71                    seer_master = m
72                    break
73            if seer_master: break
74
75        if seer_master:
76            mdomain = tbparams[seer_master].get('domain', '.example.com')
77            mproject = tbparams[seer_master].get('project', 'project')
78            muser = tbparams[seer_master].get('user', 'root')
79            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
80        else:
81            mdomain = '.example.com'
82            mproject = 'project'
83            muser = 'root'
84            smbshare = 'USERS'
85
86        dproject = tbparams[dt].get('project', 'project')
87        ddomain = tbparams[dt].get('domain', '.example.com')
88
89        if (st in masters and dt not in masters) or \
90                ( st not in masters and dt in masters ):
91            active = ("%s" % (st in masters))
92        else:
93            active = ("%s" % (st > dt))
94
95        ifaces = [ ]
96        for sub, attrs in iface_desc:
97            inf = topdl.Interface(
98                    name="inf%03d" % len(ifaces),
99                    substrate=sub,
100                    attribute=[
101                        topdl.Attribute(
102                            attribute=n,
103                            value = v)
104                        for n, v in attrs
105                        ]
106                    )
107            ifaces.append(inf)
108        if conn_type == "ssh":
109            try:
110                aid = tbparams[st]['allocID']['fedid']
111            except:
112                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
113                        % st)
114                aid = None
115            info = {
116                    "type" : conn_type, 
117                    "portal": myname,
118                    'fedAttr': [ 
119                            { 'attribute': 'masterdomain', 'value': mdomain},
120                            { 'attribute': 'masterexperiment', 'value': 
121                                "%s/%s" % (mproject, eid)},
122                            { 'attribute': 'active', 'value': active},
123                            # Move to SMB service description
124                            { 'attribute': 'masteruser', 'value': muser},
125                            { 'attribute': 'smbshare', 'value': smbshare},
126                        ],
127                    'parameter': [
128                        {
129                            'name': 'peer',
130                            'key': 'fedid:%s/%s' % (expid, myname),
131                            'store': self.store_url,
132                            'type': 'output',
133                        },
134                        {
135                            'name': 'ssh_port',
136                            'key': 'fedid:%s/%s-port' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'peer',
142                            'key': 'fedid:%s/%s' % (expid, desthost),
143                            'store': self.store_url,
144                            'type': 'input',
145                        },
146                        {
147                            'name': 'ssh_port',
148                            'key': 'fedid:%s/%s-port' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        ]
153                    }
154            # Give this allocation the rights to access the key of the
155            # peers
156            if aid:
157                for h in (myname, desthost):
158                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
159                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
160                            (expid, h))
161            else:
162                self.log.error("No aid for %s in new_portal_node" % st)
163        else:
164            info = None
165
166        return (topdl.Computer(
167                name=myname,
168                attribute=[ 
169                    topdl.Attribute(attribute=n,value=v)
170                        for n, v in (\
171                            ('portal', 'true'),
172                            ('portal_type', portal_type), 
173                            ('destination_testbed', dt),
174                        )
175                    ],
176                interface=ifaces,
177                ), info)
178
179    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
180        ddomain = tbparams[dt].get('domain', ".example.com")
181        dproject = tbparams[dt].get('project', 'project')
182        tsubstrate = \
183                topdl.Substrate(name='%s-%s' % (st, dt),
184                        attribute= [
185                            topdl.Attribute(
186                                attribute='portal',
187                                value='true')
188                            ]
189                        )
190        segment_element = topdl.Segment(
191                id= tbparams[dt]['allocID'],
192                type='emulab',
193                uri = self.tbmap.get(testbed_base(dt), None),
194                interface=[ 
195                    topdl.Interface(
196                        substrate=tsubstrate.name),
197                    ],
198                attribute = [
199                    topdl.Attribute(attribute=n, value=v)
200                        for n, v in (\
201                            ('domain', ddomain),
202                            ('experiment', "%s/%s" % \
203                                    (dproject, eid)),)
204                    ],
205                )
206
207        return (tsubstrate, segment_element)
208
209    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
210            tb_name):
211        if sub.capacity is None:
212            raise service_error(service_error.internal,
213                    "Cannot DRAGON split substrate w/o capacity")
214        segs = [ ]
215        name = join_testbed(tb_name, "%d" % idx)
216        substr = topdl.Substrate(name=name, 
217                capacity=sub.capacity.clone(),
218                attribute=[ topdl.Attribute(attribute=n, value=v)
219                    for n, v, in (\
220                            ('vlan', 'unassigned%d' % idx),)])
221        store_key = 'fedid:%s/vlan%d' % (expid, idx)
222        for tb in tbs.keys():
223            seg = topdl.Segment(
224                    id = tbparams[tb]['allocID'],
225                    type='emulab',
226                    uri = self.tbmap.get(testbed_base(tb), None),
227                    interface=[ 
228                        topdl.Interface(
229                            substrate=substr.name),
230                        ],
231                    attribute=[ topdl.Attribute(
232                        attribute='%s_endpoint' % tb_name, 
233                        value=tbparams[tb][tb_name]),
234                        ]
235                    )
236            vlan_key = "%s_vlans" % tb_name
237            if vlan_key in tbparams[tb]:
238                seg.set_attribute(vlan_key, tbparams[tb][vlan_key])
239            segs.append(seg)
240
241            # Give this allocation the rights to access the key of the
242            # vlan_id
243            try:
244                aid = tbparams[tb]['allocID']['fedid']
245                self.auth.set_attribute(aid, store_key)
246            except:
247                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
248                        % tb)
249
250        connInfo[name] = [ { 
251            'type': 'transit',
252            'parameter': [ { 
253                'name': 'vlan_id',
254                'key': store_key,
255                'store': self.store_url,
256                'type': 'output'
257                } ]
258            } ]
259
260        topo[name] = \
261                topdl.Topology(substrates=[substr], elements=segs,
262                        attribute=[
263                            topdl.Attribute(attribute="transit", value='true'),
264                            topdl.Attribute(attribute="dynamic", value='true'),
265                            topdl.Attribute(attribute="testbed", value=tb_name),
266                            topdl.Attribute(attribute="store_keys", 
267                                value=store_key),
268                            ]
269                        )
270
271    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
272            connInfo, expid=None, tb_name=None):
273        """
274        Add attribiutes to the various elements indicating that they are to be
275        dragon connected and create a dragon segment in topo to be
276        instantiated.
277        """
278
279        def get_substrate_from_topo(name, t):
280            for s in t.substrates:
281                if s.name == name: return s
282            else: return None
283
284
285        seer_master = None
286        for m in masters.values():
287            for s in m:
288                if s.name == 'SEER':
289                    seer_master = m
290                    break
291            if seer_master: break
292
293        if seer_master:
294            mdomain = tbparams[seer_master].get('domain', '.example.com')
295            mproject = tbparams[seer_master].get('project', 'project')
296        else:
297            mdomain = '.example.com'
298            mproject = 'project'
299
300        # dn is the number of previously created direct nets on this direct
301        # testbed.  This routine creates a net numbered by dn
302        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
303        # Count the number of interfaces on this substrate in each testbed from
304        # the global topology
305        count = { }
306        node = { }
307        for e in [ i.element for i in sub.interfaces ]:
308            tb = e.get_attribute('testbed')
309            count[tb] = count.get(tb, 0) + 1
310            node[tb] = i.get_attribute('ip4_address')
311
312
313        # Set the attributes in the copies that will allow setup of direct
314        # connections.
315        for tb in tbs.keys():
316            s = get_substrate_from_topo(sub.name, topo[tb])
317            if s:
318                if not connInfo.has_key(tb):
319                    connInfo[tb] = [ ]
320
321                try:
322                    aid = tbparams[tb]['allocID']['fedid']
323                except:
324                    self.log.debug("[create_direct_substrate] " + 
325                            "Can't get alloc id for %s?" %tb)
326                    aid = None
327
328                # This may need another look, but only a service gateway will
329                # look at the active parameter, and these are only inserted to
330                # connect to a master.
331                active = "%s" % ( tb in masters)
332                info = {
333                        'type': 'transit',
334                        'member': [ {
335                            'element': i.element.name, 
336                            'interface': i.name
337                            } for i in s.interfaces \
338                                    if isinstance(i.element, topdl.Computer) ],
339                        'fedAttr': [ 
340                            { 'attribute': 'masterdomain', 'value': mdomain},
341                            { 'attribute': 'masterexperiment', 'value': 
342                                "%s/%s" % (mproject, eid)},
343                            { 'attribute': 'active', 'value': active},
344                            ],
345                        'parameter': [ {
346                            'name': 'vlan_id',
347                            'key': 'fedid:%s/vlan%d' % (expid, dn),
348                            'store': self.store_url,
349                            'type': 'input',
350                            } ]
351                        }
352                if tbs.has_key(tb):
353                    info['peer'] = tbs[tb]
354                connInfo[tb].append(info)
355
356                # Give this allocation the rights to access the key of the
357                # vlan_id
358                if aid:
359                    self.auth.set_attribute(aid, 
360                            'fedid:%s/vlan%d' % (expid, dn))
361            else:
362                raise service_error(service_error.internal,
363                        "No substrate %s in testbed %s" % (sub.name, tb))
364
365        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
366                tb_name)
367
368    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
369            segment_substrate, portals, connInfo, expid):
370        # More than one testbed is on this substrate.  Insert
371        # some portals into the subtopologies.  st == source testbed,
372        # dt == destination testbed.
373        for st in tbs.keys():
374            if not segment_substrate.has_key(st):
375                segment_substrate[st] = { }
376            if not portals.has_key(st): 
377                portals[st] = { }
378            if not connInfo.has_key(st):
379                connInfo[st] = [ ]
380            for dt in [ t for t in tbs.keys() if t != st]:
381                sproject = tbparams[st].get('project', 'project')
382                dproject = tbparams[dt].get('project', 'project')
383                sdomain = tbparams[st].get('domain', ".example.com")
384                ddomain = tbparams[dt].get('domain', ".example.com")
385                aid = tbparams[dt]['allocID']['fedid']
386
387                seer_master = None
388
389                for m in masters.values():
390                    for s in m:
391                        if s.name == 'SEER':
392                            seer_master = m
393                            break
394                    if seer_master: break
395
396                if seer_master:
397                    mdomain = tbparams[seer_master].get('domain', '.example.com')
398                    mproject = tbparams[seer_master].get('project', 'project')
399                    muser = tbparams[seer_master].get('user', 'root')
400                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
401                else:
402                    mdomain = '.example.com'
403                    mproject = 'project'
404                    muser = 'root'
405                    smbshare = 'USERS'
406
407                if (st in masters  and dt not in masters) or \
408                        (st not in masters and dt in masters):
409                    active = ("%s" % (st in masters))
410                else:
411                    active = ("%s" %(st > dt))
412
413                if not segment_substrate[st].has_key(dt):
414                    # Put a substrate and a segment for the connected
415                    # testbed in there.
416                    tsubstrate, segment_element = \
417                            self.new_portal_substrate(st, dt, eid, tbparams,
418                                    expid)
419                    segment_substrate[st][dt] = tsubstrate
420                    topo[st].substrates.append(tsubstrate)
421                    topo[st].elements.append(segment_element)
422
423                new_portal = False
424                dname = "-".join(split_testbed(dt))
425                sname = "-".join(split_testbed(st))
426                if portals[st].has_key(dt):
427                    # There's a portal set up to go to this destination.
428                    # See if there's room to multiplex this connection on
429                    # it.  If so, add an interface to the portal; if not,
430                    # set up to add a portal below.
431                    # [This little festival of braces is just a pop of the
432                    # last element in the list of portals between st and
433                    # dt.]
434                    portal = portals[st][dt][-1]
435                    mux = len([ i for i in portal.interface \
436                            if not i.get_attribute('portal')])
437                    if mux == self.muxmax:
438                        new_portal = True
439                        portal_type = "experiment"
440                        myname = "%stunnel%d" % (dname.lower(),
441                                len(portals[st][dt]))
442                        desthost = "%stunnel%d" % (sname.lower(), 
443                                len(portals[st][dt]))
444                    else:
445                        new_i = topdl.Interface(
446                                substrate=sub.name,
447                                attribute=[ 
448                                    topdl.Attribute(
449                                        attribute='ip4_address', 
450                                        value=tbs[dt]
451                                    )
452                                ])
453                        portal.interface.append(new_i)
454                else:
455                    # First connection to this testbed, make an empty list
456                    # and set up to add the new portal below
457                    new_portal = True
458                    portals[st][dt] = [ ]
459                    myname = "%stunnel%d" % (dname.lower(),
460                            len(portals[st][dt]))
461                    desthost = "%stunnel%d" % (sname.lower(),
462                            len(portals[st][dt]))
463
464                    if dt in masters or st in masters: portal_type = "both"
465                    else: portal_type = "experiment"
466
467                if new_portal:
468                    infs = (
469                            (segment_substrate[st][dt].name, 
470                                (('portal', 'true'),)),
471                            (sub.name, 
472                                (('ip4_address', tbs[dt]),))
473                        )
474                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
475                            masters, eid, myname, desthost, portal_type,
476                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
477
478                    topo[st].elements.append(portal)
479                    portals[st][dt].append(portal)
480                    connInfo[st].append(info)
481
482    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
483            connInfo, expid):
484        # Add to the master testbed
485        tsubstrate, segment_element = \
486                self.new_portal_substrate(st, dt, eid, tbparams, expid)
487        myname = "%stunnel" % dt
488        desthost = "%stunnel" % st
489
490        portal, info = self.new_portal_node(st, dt, tbparams, masters,
491                eid, myname, desthost, "control", 
492                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
493                conn_attrs=[], expid=expid)
494
495        topo[st].substrates.append(tsubstrate)
496        topo[st].elements.append(segment_element)
497        topo[st].elements.append(portal)
498        if not connInfo.has_key(st):
499            connInfo[st] = [ ]
500        connInfo[st].append(info)
501
502    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
503            substrate, tbparams, expid, tb_name):
504        # Add to the master testbed
505        myname = "%stunnel" % dt
506        desthost = "%s" % ip_addr(dip)
507
508        portal, info = self.new_portal_node(st, dt, tbparams, masters,
509                eid, myname, desthost, "control", 
510                ((substrate.name,(
511                    ('portal','true'),
512                    ('ip4_address', "%s" % ip_addr(myip)),)),),
513                conn_type="transit", conn_attrs=[], expid=expid)
514
515        return portal
516
517    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
518            connInfo, expid):
519        """
520        For each substrate in the main topology, find those that
521        have nodes on more than one testbed.  Insert portal nodes
522        into the copies of those substrates on the sub topologies.
523        """
524        segment_substrate = { }
525        portals = { }
526        for s in top.substrates:
527            # tbs will contain an ip address on this subsrate that is in
528            # each testbed.
529            tbs = { }
530            for i in s.interfaces:
531                e = i.element
532                tb = e.get_attribute('testbed')
533                if tb and not tbs.has_key(tb):
534                    for i in e.interface:
535                        if s in i.subs:
536                            tbs[tb]= i.get_attribute('ip4_address')
537            if len(tbs) < 2:
538                continue
539
540            base_tbs = set([testbed_base(t) for t in tbs])
541
542            # DRAGON will not create multi-site vlans yet, so we don't do multi
543            # connection direct transits yet.
544            if len(tbs) == 2 :
545                # NB: the else if on the for loop - if none of the direct
546                # transits is applicable, use the internet.
547                for d in self.direct_transit:
548                    if all([tbparams[x].has_key(d) for x in tbs]):
549                        self.create_direct_substrate(s, topo, tbs, tbparams, 
550                                masters, eid, connInfo, expid, d)
551                        break
552                else:
553                    self.insert_internet_portals(s, topo, tbs, tbparams,
554                            masters, eid, segment_substrate, portals,
555                            connInfo, expid)
556            else:
557                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
558                        eid, segment_substrate, portals, connInfo, expid)
559
560
561        # Make sure that all the service importers have a control portal back
562        # to the master for each service.
563        for mtb in [ t for t in tbparams if t in masters ]:
564            importers = set([])
565            for m in masters[mtb]:
566                importers |= set(m.importers)
567            for tb in importers:
568                if len([e for e in topo[tb].elements \
569                        if isinstance(e, topdl.Computer) and \
570                        e.get_attribute('destination_testbed') == mtb and \
571                        e.get_attribute('portal') and \
572                        e.get_attribute('portal_type') == 'both']) == 0:
573
574                    for tb_name in self.direct_transit:
575
576                        if tbparams[mtb].has_key(tb_name) \
577                                and tbparams[tb].has_key(tb_name):
578
579                            idx = len([x for x in topo.keys() \
580                                    if x.startswith(tb_name)])
581                            dip, leng = ip_allocator.allocate(4)
582                            dip += 1
583                            mip = dip+1
584                            csub = topdl.Substrate(
585                                    name="%s-control-%s" % (tb_name, tb),
586                                    capacity=topdl.Capacity(100000.0, 'max'),
587                                    attribute=[
588                                        topdl.Attribute(
589                                            attribute='portal',
590                                            value='true'
591                                            )
592                                        ]
593                                    )
594                            seg = topdl.Segment(
595                                    id= tbparams[mtb]['allocID'],
596                                    type='emulab',
597                                    uri = self.tbmap.get(testbed_base(mtb),
598                                        None),
599                                    interface=[ 
600                                        topdl.Interface(
601                                            substrate=csub.name),
602                                        ],
603                                    attribute = [
604                                        topdl.Attribute(attribute=n, value=v)
605                                            for n, v in (\
606                                                ('domain', 
607                                                    tbparams[mtb].get('domain',
608                                                        ".example.com")),
609                                                ('experiment', "%s/%s" % \
610                                                        (tbparams[mtb].get(
611                                                            'project', 
612                                                            'project'), 
613                                                            eid)),)
614                                        ],
615                                    )
616                            portal = self.new_direct_portal(tb, mtb,
617                                    masters, eid, dip, mip, idx, csub,
618                                    tbparams, expid, tb_name)
619                            topo[tb].substrates.append(csub)
620                            topo[tb].elements.append(portal)
621                            topo[tb].elements.append(seg)
622
623                            mcsub = csub.clone()
624                            seg = topdl.Segment(
625                                    id= tbparams[tb]['allocID'],
626                                    type='emulab',
627                                    uri = self.tbmap.get(testbed_base(tb),
628                                        None),
629                                    interface=[ 
630                                        topdl.Interface(
631                                            substrate=csub.name),
632                                        ],
633                                    attribute = [
634                                        topdl.Attribute(attribute=n, value=v)
635                                            for n, v in (\
636                                                ('domain', 
637                                                    tbparams[tb].get('domain',
638                                                        ".example.com")),
639                                                ('experiment', "%s/%s" % \
640                                                        (tbparams[tb].get('project', 
641                                                            'project'), 
642                                                            eid)),)
643                                        ],
644                                    )
645                            portal = self.new_direct_portal(mtb, tb, masters,
646                                    eid, mip, dip, idx, mcsub, tbparams, expid,
647                                    tb_name)
648                            topo[mtb].substrates.append(mcsub)
649                            topo[mtb].elements.append(portal)
650                            topo[mtb].elements.append(seg)
651                            for t in (mtb, tb):
652                                topo[t].incorporate_elements()
653
654                            self.create_direct_substrate(csub, topo, 
655                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
656                                    tbparams, masters, eid, connInfo,
657                                    expid, tb_name)
658                            break
659                    # This matches with the for tb_name in self.direct_transit
660                    else:
661                        self.add_control_portal(mtb, tb, masters, eid, topo, 
662                                tbparams, connInfo, expid)
663                        self.add_control_portal(tb, mtb, masters, eid, topo, 
664                                tbparams, connInfo, expid)
665
666        # Connect the portal nodes into the topologies and clear out
667        # substrates that are not in the topologies
668        for tb in tbparams:
669            topo[tb].incorporate_elements()
670            topo[tb].substrates = \
671                    [s for s in topo[tb].substrates \
672                        if len(s.interfaces) >0]
673
Note: See TracBrowser for help on using the repository browser.