source: fedd/federation/experiment_partition.py @ 0ea75d2

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 0ea75d2 was 0ea75d2, checked in by Ted Faber <faber@…>, 15 years ago

Plain bug. Don't know how this survived so long.

  • Property mode set to 100644
File size: 19.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58
59
60    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
61            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
62            expid=None):
63        """
64        Return a new internet portal node and a dict with the connectionInfo to
65        be attached.
66        """
67        seer_master = None
68        for m in masters.values():
69            for s in m:
70                if s.name == 'SEER':
71                    seer_master = m
72                    break
73            if seer_master: break
74
75        if seer_master:
76            mdomain = tbparams[seer_master].get('domain', '.example.com')
77            mproject = tbparams[seer_master].get('project', 'project')
78            muser = tbparams[seer_master].get('user', 'root')
79            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
80        else:
81            mdomain = '.example.com'
82            mproject = 'project'
83            muser = 'root'
84            smbshare = 'USERS'
85
86        dproject = tbparams[dt].get('project', 'project')
87        ddomain = tbparams[dt].get('domain', '.example.com')
88
89        if (st in masters and dt not in masters) or \
90                ( st not in masters and dt in masters ):
91            active = ("%s" % (st in masters))
92        else:
93            active = ("%s" % (st > dt))
94
95        ifaces = [ ]
96        for sub, attrs in iface_desc:
97            inf = topdl.Interface(
98                    name="inf%03d" % len(ifaces),
99                    substrate=sub,
100                    attribute=[
101                        topdl.Attribute(
102                            attribute=n,
103                            value = v)
104                        for n, v in attrs
105                        ]
106                    )
107            ifaces.append(inf)
108        if conn_type == "ssh":
109            try:
110                aid = tbparams[st]['allocID']['fedid']
111            except:
112                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
113                        % st)
114                aid = None
115            info = {
116                    "type" : conn_type, 
117                    "portal": myname,
118                    'fedAttr': [ 
119                            { 'attribute': 'masterdomain', 'value': mdomain},
120                            { 'attribute': 'masterexperiment', 'value': 
121                                "%s/%s" % (mproject, eid)},
122                            { 'attribute': 'active', 'value': active},
123                            # Move to SMB service description
124                            { 'attribute': 'masteruser', 'value': muser},
125                            { 'attribute': 'smbshare', 'value': smbshare},
126                        ],
127                    'parameter': [
128                        {
129                            'name': 'peer',
130                            'key': 'fedid:%s/%s' % (expid, myname),
131                            'store': self.store_url,
132                            'type': 'output',
133                        },
134                        {
135                            'name': 'ssh_port',
136                            'key': 'fedid:%s/%s-port' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'peer',
142                            'key': 'fedid:%s/%s' % (expid, desthost),
143                            'store': self.store_url,
144                            'type': 'input',
145                        },
146                        {
147                            'name': 'ssh_port',
148                            'key': 'fedid:%s/%s-port' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        ]
153                    }
154            # Give this allocation the rights to access the key of the
155            # peers
156            if aid:
157                for h in (myname, desthost):
158                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
159                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
160                            (expid, h))
161            else:
162                self.log.error("No aid for %s in new_portal_node" % st)
163        else:
164            info = None
165
166        return (topdl.Computer(
167                name=myname,
168                attribute=[ 
169                    topdl.Attribute(attribute=n,value=v)
170                        for n, v in (\
171                            ('portal', 'true'),
172                            ('portal_type', portal_type), 
173                            ('destination_testbed', dt),
174                        )
175                    ],
176                interface=ifaces,
177                ), info)
178
179    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
180        ddomain = tbparams[dt].get('domain', ".example.com")
181        dproject = tbparams[dt].get('project', 'project')
182        tsubstrate = \
183                topdl.Substrate(name='%s-%s' % (st, dt),
184                        attribute= [
185                            topdl.Attribute(
186                                attribute='portal',
187                                value='true')
188                            ]
189                        )
190        segment_element = topdl.Segment(
191                id= tbparams[dt]['allocID'],
192                type='emulab',
193                uri = self.tbmap.get(testbed_base(dt), None),
194                interface=[ 
195                    topdl.Interface(
196                        substrate=tsubstrate.name),
197                    ],
198                attribute = [
199                    topdl.Attribute(attribute=n, value=v)
200                        for n, v in (\
201                            ('domain', ddomain),
202                            ('experiment', "%s/%s" % \
203                                    (dproject, eid)),)
204                    ],
205                )
206
207        return (tsubstrate, segment_element)
208
209    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
210            tb_name):
211        if sub.capacity is None:
212            raise service_error(service_error.internal,
213                    "Cannot DRAGON split substrate w/o capacity")
214        segs = [ ]
215        name = join_testbed(tb_name, "%d" % idx)
216        substr = topdl.Substrate(name=name, 
217                capacity=sub.capacity.clone(),
218                attribute=[ topdl.Attribute(attribute=n, value=v)
219                    for n, v, in (\
220                            ('vlan', 'unassigned%d' % idx),)])
221        store_key = 'fedid:%s/vlan%d' % (expid, idx)
222        for tb in tbs.keys():
223            seg = topdl.Segment(
224                    id = tbparams[tb]['allocID'],
225                    type='emulab',
226                    uri = self.tbmap.get(testbed_base(tb), None),
227                    interface=[ 
228                        topdl.Interface(
229                            substrate=substr.name),
230                        ],
231                    attribute=[ topdl.Attribute(
232                        attribute='%s_endpoint' % tb_name, 
233                        value=tbparams[tb][tb_name]),
234                        ]
235                    )
236            vlan_key = "%s_vlans" % tb_name
237            if vlan_key in tbparams[tb]:
238                seg.set_attribute(vlan_key, tbparams[tb][vlan_key])
239            segs.append(seg)
240
241            # Give this allocation the rights to access the key of the
242            # vlan_id
243            try:
244                aid = tbparams[tb]['allocID']['fedid']
245                self.auth.set_attribute(aid, store_key)
246            except:
247                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
248                        % tb)
249
250        connInfo[name] = [ { 
251            'type': 'transit',
252            'parameter': [ { 
253                'name': 'vlan_id',
254                'key': store_key,
255                'store': self.store_url,
256                'type': 'output'
257                } ]
258            } ]
259
260        topo[name] = \
261                topdl.Topology(substrates=[substr], elements=segs,
262                        attribute=[
263                            topdl.Attribute(attribute="transit", value='true'),
264                            topdl.Attribute(attribute="dynamic", value='true'),
265                            topdl.Attribute(attribute="testbed", value=tb_name),
266                            topdl.Attribute(attribute="store_keys", 
267                                value=store_key),
268                            ]
269                        )
270
271    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
272            connInfo, expid=None, tb_name=None):
273        """
274        Add attribiutes to the various elements indicating that they are to be
275        dragon connected and create a dragon segment in topo to be
276        instantiated.
277        """
278
279        def get_substrate_from_topo(name, t):
280            for s in t.substrates:
281                if s.name == name: return s
282            else: return None
283
284
285        seer_master = None
286        for m in masters.values():
287            for s in m:
288                if s.name == 'SEER':
289                    seer_master = m
290                    break
291            if seer_master: break
292
293        if seer_master:
294            mdomain = tbparams[seer_master].get('domain', '.example.com')
295            mproject = tbparams[seer_master].get('project', 'project')
296        else:
297            mdomain = '.example.com'
298            mproject = 'project'
299
300        # dn is the number of previously created direct nets on this direct
301        # testbed.  This routine creates a net numbered by dn
302        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
303        # Count the number of interfaces on this substrate in each testbed from
304        # the global topology
305        count = { }
306        node = { }
307        for e in [ i.element for i in sub.interfaces ]:
308            tb = e.get_attribute('testbed')
309            count[tb] = count.get(tb, 0) + 1
310            node[tb] = i.get_attribute('ip4_address')
311
312
313        # Set the attributes in the copies that will allow setup of direct
314        # connections.
315        for tb in tbs.keys():
316            s = get_substrate_from_topo(sub.name, topo[tb])
317            if s:
318                if not connInfo.has_key(tb):
319                    connInfo[tb] = [ ]
320
321                try:
322                    aid = tbparams[tb]['allocID']['fedid']
323                except:
324                    self.log.debug("[create_direct_substrate] " + 
325                            "Can't get alloc id for %s?" %tb)
326                    aid = None
327
328                # This may need another look, but only a service gateway will
329                # look at the active parameter, and these are only inserted to
330                # connect to a master.
331                active = "%s" % ( tb in masters)
332                info = {
333                        'type': 'transit',
334                        'member': [ {
335                            'element': i.element.name, 
336                            'interface': i.name
337                            } for i in s.interfaces \
338                                    if isinstance(i.element, topdl.Computer) ],
339                        'fedAttr': [ 
340                            { 'attribute': 'masterdomain', 'value': mdomain},
341                            { 'attribute': 'masterexperiment', 'value': 
342                                "%s/%s" % (mproject, eid)},
343                            { 'attribute': 'active', 'value': active},
344                            ],
345                        'parameter': [ {
346                            'name': 'vlan_id',
347                            'key': 'fedid:%s/vlan%d' % (expid, dn),
348                            'store': self.store_url,
349                            'type': 'input',
350                            } ]
351                        }
352                if tbs.has_key(tb):
353                    info['peer'] = tbs[tb]
354                connInfo[tb].append(info)
355
356                # Give this allocation the rights to access the key of the
357                # vlan_id
358                if aid:
359                    self.auth.set_attribute(aid, 
360                            'fedid:%s/vlan%d' % (expid, dn))
361            else:
362                raise service_error(service_error.internal,
363                        "No substrate %s in testbed %s" % (sub.name, tb))
364
365        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
366                tb_name)
367
368    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
369            segment_substrate, portals, connInfo, expid):
370        # More than one testbed is on this substrate.  Insert
371        # some portals into the subtopologies.  st == source testbed,
372        # dt == destination testbed.
373        for st in tbs.keys():
374            if not segment_substrate.has_key(st):
375                segment_substrate[st] = { }
376            if not portals.has_key(st): 
377                portals[st] = { }
378            if not connInfo.has_key(st):
379                connInfo[st] = [ ]
380            for dt in [ t for t in tbs.keys() if t != st]:
381                sproject = tbparams[st].get('project', 'project')
382                dproject = tbparams[dt].get('project', 'project')
383                sdomain = tbparams[st].get('domain', ".example.com")
384                ddomain = tbparams[dt].get('domain', ".example.com")
385                aid = tbparams[dt]['allocID']['fedid']
386
387                seer_master = None
388
389                for m in masters.values():
390                    for s in m:
391                        if s.name == 'SEER':
392                            seer_master = m
393                            break
394                    if seer_master: break
395
396                if seer_master:
397                    mdomain = tbparams[seer_master].get('domain', '.example.com')
398                    mproject = tbparams[seer_master].get('project', 'project')
399                    muser = tbparams[seer_master].get('user', 'root')
400                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
401                else:
402                    mdomain = '.example.com'
403                    mproject = 'project'
404                    muser = 'root'
405                    smbshare = 'USERS'
406
407                if (st in masters  and dt not in masters) or \
408                        (st not in masters and dt in masters):
409                    active = ("%s" % (st in masters))
410                else:
411                    active = ("%s" %(st > dt))
412
413                if not segment_substrate[st].has_key(dt):
414                    # Put a substrate and a segment for the connected
415                    # testbed in there.
416                    tsubstrate, segment_element = \
417                            self.new_portal_substrate(st, dt, eid, tbparams,
418                                    expid)
419                    segment_substrate[st][dt] = tsubstrate
420                    topo[st].substrates.append(tsubstrate)
421                    topo[st].elements.append(segment_element)
422
423                new_portal = False
424                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
425                else: dname = dt
426
427                if testbed_suffix(st): sname = "-".join(split_testbed(st))
428                else: sname = st
429
430                if portals[st].has_key(dt):
431                    # There's a portal set up to go to this destination.
432                    # See if there's room to multiplex this connection on
433                    # it.  If so, add an interface to the portal; if not,
434                    # set up to add a portal below.
435                    # [This little festival of braces is just a pop of the
436                    # last element in the list of portals between st and
437                    # dt.]
438                    portal = portals[st][dt][-1]
439                    mux = len([ i for i in portal.interface \
440                            if not i.get_attribute('portal')])
441                    if mux == self.muxmax:
442                        new_portal = True
443                        portal_type = "experiment"
444                        myname = "%stunnel%d" % (dname.lower(),
445                                len(portals[st][dt]))
446                        desthost = "%stunnel%d" % (sname.lower(), 
447                                len(portals[st][dt]))
448                    else:
449                        new_i = topdl.Interface(
450                                substrate=sub.name,
451                                attribute=[ 
452                                    topdl.Attribute(
453                                        attribute='ip4_address', 
454                                        value=tbs[dt]
455                                    )
456                                ])
457                        portal.interface.append(new_i)
458                else:
459                    # First connection to this testbed, make an empty list
460                    # and set up to add the new portal below
461                    new_portal = True
462                    portals[st][dt] = [ ]
463                    myname = "%stunnel%d" % (dname.lower(),
464                            len(portals[st][dt]))
465                    desthost = "%stunnel%d" % (sname.lower(),
466                            len(portals[st][dt]))
467
468                    if dt in masters or st in masters: portal_type = "both"
469                    else: portal_type = "experiment"
470
471                if new_portal:
472                    infs = (
473                            (segment_substrate[st][dt].name, 
474                                (('portal', 'true'),)),
475                            (sub.name, 
476                                (('ip4_address', tbs[dt]),))
477                        )
478                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
479                            masters, eid, myname, desthost, portal_type,
480                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
481
482                    topo[st].elements.append(portal)
483                    portals[st][dt].append(portal)
484                    connInfo[st].append(info)
485
486    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
487            connInfo, expid):
488        # Add to the master testbed
489        tsubstrate, segment_element = \
490                self.new_portal_substrate(st, dt, eid, tbparams, expid)
491        myname = "%stunnel" % dt
492        desthost = "%stunnel" % st
493
494        portal, info = self.new_portal_node(st, dt, tbparams, masters,
495                eid, myname, desthost, "control", 
496                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
497                conn_attrs=[], expid=expid)
498
499        topo[st].substrates.append(tsubstrate)
500        topo[st].elements.append(segment_element)
501        topo[st].elements.append(portal)
502        if not connInfo.has_key(st):
503            connInfo[st] = [ ]
504        connInfo[st].append(info)
505
506    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
507            substrate, tbparams, expid, tb_name):
508        # Add to the master testbed
509        myname = "%stunnel" % dt
510        desthost = "%s" % ip_addr(dip)
511
512        portal, info = self.new_portal_node(st, dt, tbparams, masters,
513                eid, myname, desthost, "control", 
514                ((substrate.name,(
515                    ('portal','true'),
516                    ('ip4_address', "%s" % ip_addr(myip)),)),),
517                conn_type="transit", conn_attrs=[], expid=expid)
518
519        return portal
520
521    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
522            connInfo, expid):
523        """
524        For each substrate in the main topology, find those that
525        have nodes on more than one testbed.  Insert portal nodes
526        into the copies of those substrates on the sub topologies.
527        """
528        segment_substrate = { }
529        portals = { }
530        for s in top.substrates:
531            # tbs will contain an ip address on this subsrate that is in
532            # each testbed.
533            tbs = { }
534            for i in s.interfaces:
535                e = i.element
536                tb = e.get_attribute('testbed')
537                if tb and not tbs.has_key(tb):
538                    for i in e.interface:
539                        if s in i.subs:
540                            tbs[tb]= i.get_attribute('ip4_address')
541            if len(tbs) < 2:
542                continue
543
544            base_tbs = set([testbed_base(t) for t in tbs])
545
546            # DRAGON will not create multi-site vlans yet, so we don't do multi
547            # connection direct transits yet.
548            if len(tbs) == 2 :
549                # NB: the else if on the for loop - if none of the direct
550                # transits is applicable, use the internet.
551                for d in self.direct_transit:
552                    if all([tbparams[x].has_key(d) for x in tbs]):
553                        self.create_direct_substrate(s, topo, tbs, tbparams, 
554                                masters, eid, connInfo, expid, d)
555                        break
556                else:
557                    self.insert_internet_portals(s, topo, tbs, tbparams,
558                            masters, eid, segment_substrate, portals,
559                            connInfo, expid)
560            else:
561                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
562                        eid, segment_substrate, portals, connInfo, expid)
563
564
565        # Make sure that all the service importers have a control portal back
566        # to the master for each service.
567        for mtb in [ t for t in tbparams if t in masters ]:
568            importers = set([])
569            for m in masters[mtb]:
570                importers |= set(m.importers)
571            for tb in importers:
572                if len([e for e in topo[tb].elements \
573                        if isinstance(e, topdl.Computer) and \
574                        e.get_attribute('destination_testbed') == mtb and \
575                        e.get_attribute('portal') and \
576                        e.get_attribute('portal_type') == 'both']) == 0:
577
578                    for tb_name in self.direct_transit:
579
580                        if tbparams[mtb].has_key(tb_name) \
581                                and tbparams[tb].has_key(tb_name):
582
583                            idx = len([x for x in topo.keys() \
584                                    if x.startswith(tb_name)])
585                            dip, leng = ip_allocator.allocate(4)
586                            dip += 1
587                            mip = dip+1
588                            csub = topdl.Substrate(
589                                    name="%s-control-%s" % (tb_name, tb),
590                                    capacity=topdl.Capacity(100000.0, 'max'),
591                                    attribute=[
592                                        topdl.Attribute(
593                                            attribute='portal',
594                                            value='true'
595                                            )
596                                        ]
597                                    )
598                            seg = topdl.Segment(
599                                    id= tbparams[mtb]['allocID'],
600                                    type='emulab',
601                                    uri = self.tbmap.get(testbed_base(mtb),
602                                        None),
603                                    interface=[ 
604                                        topdl.Interface(
605                                            substrate=csub.name),
606                                        ],
607                                    attribute = [
608                                        topdl.Attribute(attribute=n, value=v)
609                                            for n, v in (\
610                                                ('domain', 
611                                                    tbparams[mtb].get('domain',
612                                                        ".example.com")),
613                                                ('experiment', "%s/%s" % \
614                                                        (tbparams[mtb].get(
615                                                            'project', 
616                                                            'project'), 
617                                                            eid)),)
618                                        ],
619                                    )
620                            portal = self.new_direct_portal(tb, mtb,
621                                    masters, eid, dip, mip, idx, csub,
622                                    tbparams, expid, tb_name)
623                            topo[tb].substrates.append(csub)
624                            topo[tb].elements.append(portal)
625                            topo[tb].elements.append(seg)
626
627                            mcsub = csub.clone()
628                            seg = topdl.Segment(
629                                    id= tbparams[tb]['allocID'],
630                                    type='emulab',
631                                    uri = self.tbmap.get(testbed_base(tb),
632                                        None),
633                                    interface=[ 
634                                        topdl.Interface(
635                                            substrate=csub.name),
636                                        ],
637                                    attribute = [
638                                        topdl.Attribute(attribute=n, value=v)
639                                            for n, v in (\
640                                                ('domain', 
641                                                    tbparams[tb].get('domain',
642                                                        ".example.com")),
643                                                ('experiment', "%s/%s" % \
644                                                        (tbparams[tb].get('project', 
645                                                            'project'), 
646                                                            eid)),)
647                                        ],
648                                    )
649                            portal = self.new_direct_portal(mtb, tb, masters,
650                                    eid, mip, dip, idx, mcsub, tbparams, expid,
651                                    tb_name)
652                            topo[mtb].substrates.append(mcsub)
653                            topo[mtb].elements.append(portal)
654                            topo[mtb].elements.append(seg)
655                            for t in (mtb, tb):
656                                topo[t].incorporate_elements()
657
658                            self.create_direct_substrate(csub, topo, 
659                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
660                                    tbparams, masters, eid, connInfo,
661                                    expid, tb_name)
662                            break
663                    # This matches with the for tb_name in self.direct_transit
664                    else:
665                        self.add_control_portal(mtb, tb, masters, eid, topo, 
666                                tbparams, connInfo, expid)
667                        self.add_control_portal(tb, mtb, masters, eid, topo, 
668                                tbparams, connInfo, expid)
669
670        # Connect the portal nodes into the topologies and clear out
671        # substrates that are not in the topologies
672        for tb in tbparams:
673            topo[tb].incorporate_elements()
674            topo[tb].substrates = \
675                    [s for s in topo[tb].substrates \
676                        if len(s.interfaces) >0]
677
Note: See TracBrowser for help on using the repository browser.