source: fedd/federation/experiment_partition.py @ d2e86f6

compt_changesinfo-ops
Last change on this file since d2e86f6 was 9294673, checked in by Ted Faber <faber@…>, 13 years ago

Turn (most) of another free floating dict into a class.

  • Property mode set to 100644
File size: 19.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58
59
60    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
61            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
62            expid=None):
63        """
64        Return a new internet portal node and a dict with the connectionInfo to
65        be attached.
66        """
67        seer_master = None
68        for k, m in masters.items():
69            for s in m:
70                if s.name == 'seer':
71                    seer_master = k
72                    break
73            if seer_master: break
74
75        if seer_master:
76            mdomain = tbparams[seer_master].get_attribute('domain', 
77                    '.example.com')
78            mproject = tbparams[seer_master].get_attribute('project', 
79                    'project')
80            muser = tbparams[seer_master].get_attribute('user', 'root')
81            smbshare = tbparams[seer_master].get_attribute('smbshare', 'USERS')
82        else:
83            mdomain = '.example.com'
84            mproject = 'project'
85            muser = 'root'
86            smbshare = 'USERS'
87
88        dproject = tbparams[dt].get_attribute('project', 'project')
89        ddomain = tbparams[dt].get_attribute('domain', '.example.com')
90
91        if (st in masters and dt not in masters) or \
92                ( st not in masters and dt in masters ):
93            active = ("%s" % (st in masters))
94        else:
95            active = ("%s" % (st > dt))
96
97        ifaces = [ ]
98        for sub, attrs in iface_desc:
99            inf = topdl.Interface(
100                    name="inf%03d" % len(ifaces),
101                    substrate=sub,
102                    attribute=[
103                        topdl.Attribute(
104                            attribute=n,
105                            value = v)
106                        for n, v in attrs
107                        ]
108                    )
109            ifaces.append(inf)
110        if conn_type == "ssh":
111            try:
112                aid = tbparams[st].allocID
113            except:
114                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
115                        % st)
116                aid = None
117            info = {
118                    "type" : conn_type, 
119                    "portal": myname,
120                    'fedAttr': [ 
121                            { 'attribute': 'masterdomain', 'value': mdomain},
122                            { 'attribute': 'masterexperiment', 'value': 
123                                "%s/%s" % (mproject, eid)},
124                            { 'attribute': 'active', 'value': active},
125                            # Move to SMB service description
126                            { 'attribute': 'masteruser', 'value': muser},
127                            { 'attribute': 'smbshare', 'value': smbshare},
128                        ],
129                    'parameter': [
130                        {
131                            'name': 'peer',
132                            'key': 'fedid:%s/%s' % (expid, myname),
133                            'store': self.store_url,
134                            'type': 'output',
135                        },
136                        {
137                            'name': 'ssh_port',
138                            'key': 'fedid:%s/%s-port' % (expid, myname),
139                            'store': self.store_url,
140                            'type': 'output',
141                        },
142                        {
143                            'name': 'peer',
144                            'key': 'fedid:%s/%s' % (expid, desthost),
145                            'store': self.store_url,
146                            'type': 'input',
147                        },
148                        {
149                            'name': 'ssh_port',
150                            'key': 'fedid:%s/%s-port' % (expid, desthost),
151                            'store': self.store_url,
152                            'type': 'input',
153                        },
154                        ]
155                    }
156            # Give this allocation the rights to access the key of the
157            # peers
158            if aid:
159                for h in (myname, desthost):
160                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
161                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
162                            (expid, h))
163            else:
164                self.log.error("No aid for %s in new_portal_node" % st)
165        else:
166            info = None
167
168        return (topdl.Computer(
169                name=myname,
170                attribute=[ 
171                    topdl.Attribute(attribute=n,value=v)
172                        for n, v in (\
173                            ('portal', 'true'),
174                            ('portal_type', portal_type), 
175                            ('destination_testbed', dt),
176                        )
177                    ],
178                interface=ifaces,
179                ), info)
180
181    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
182        ddomain = tbparams[dt].get_attribute('domain', ".example.com")
183        dproject = tbparams[dt].get_attribute('project', 'project')
184        tsubstrate = \
185                topdl.Substrate(name='%s-%s' % (st, dt),
186                        attribute= [
187                            topdl.Attribute(
188                                attribute='portal',
189                                value='true')
190                            ]
191                        )
192        segment_element = topdl.Segment(
193                id= {'fedid': tbparams[dt].allocID },
194                type='emulab',
195                uri = self.tbmap.get(testbed_base(dt), None),
196                interface=[ 
197                    topdl.Interface(
198                        substrate=tsubstrate.name),
199                    ],
200                attribute = [
201                    topdl.Attribute(attribute=n, value=v)
202                        for n, v in (\
203                            ('domain', ddomain),
204                            ('experiment', "%s/%s" % \
205                                    (dproject, eid)),)
206                    ],
207                )
208
209        return (tsubstrate, segment_element)
210
211    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
212            tb_name):
213        if sub.capacity is None:
214            raise service_error(service_error.internal,
215                    "Cannot direct split substrate w/o capacity")
216        segs = [ ]
217        name = join_testbed(tb_name, "%d" % idx)
218        substr = topdl.Substrate(name=name, 
219                capacity=sub.capacity.clone(),
220                attribute=[ topdl.Attribute(attribute=n, value=v)
221                    for n, v, in (\
222                            ('vlan', 'unassigned%d' % idx),)])
223        store_key = 'fedid:%s/vlan%d' % (expid, idx)
224        for tb in tbs.keys():
225            seg = topdl.Segment(
226                    id = { 'fedid':tbparams[tb].allocID },
227                    type='emulab',
228                    uri = self.tbmap.get(testbed_base(tb), None),
229                    interface=[ 
230                        topdl.Interface(
231                            substrate=substr.name),
232                        ],
233                    attribute=[ topdl.Attribute(
234                        attribute='%s_endpoint' % tb_name, 
235                        value=tbparams[tb].get_attribute(tb_name)),
236                        ]
237                    )
238            vlan_key = "%s_vlans" % tb_name
239            if tbparams[tb].get_attribute(vlan_key) is not None:
240                seg.set_attribute(vlan_key, 
241                        tbparams[tb].get_attribute(vlan_key))
242            segs.append(seg)
243
244            # Give this allocation the rights to access the key of the
245            # vlan_id
246            try:
247                aid = tbparams[tb].allocID
248                self.auth.set_attribute(aid, store_key)
249            except:
250                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
251                        % tb)
252
253        connInfo[name] = [ { 
254            'type': 'transit',
255            'parameter': [ { 
256                'name': 'vlan_id',
257                'key': store_key,
258                'store': self.store_url,
259                'type': 'output'
260                } ]
261            } ]
262
263        topo[name] = \
264                topdl.Topology(substrates=[substr], elements=segs,
265                        attribute=[
266                            topdl.Attribute(attribute="transit", value='true'),
267                            topdl.Attribute(attribute="dynamic", value='true'),
268                            topdl.Attribute(attribute="testbed", value=tb_name),
269                            topdl.Attribute(attribute="store_keys", 
270                                value=store_key),
271                            ]
272                        )
273
274    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
275            connInfo, expid=None, tb_name=None):
276        """
277        Create connection information that tells which nodes are to be
278        connected to direct transits, and create an additional topology with
279        just the interconnected segments and a substrate.
280        """
281
282        def get_substrate_from_topo(name, t):
283            for s in t.substrates:
284                if s.name == name: return s
285            else: return None
286
287
288        seer_master = None
289        for k, m in masters.items():
290            for s in m:
291                if s.name == 'seer':
292                    seer_master = k
293                    break
294            if seer_master: break
295
296        if seer_master:
297            mdomain = tbparams[seer_master].get_attribute('domain',
298                    '.example.com')
299            mproject = tbparams[seer_master].get_attribute('project', 'project')
300        else:
301            mdomain = '.example.com'
302            mproject = 'project'
303
304        # dn is the number of previously created direct nets on this direct
305        # testbed.  This routine creates a net numbered by dn
306        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
307
308        # Set the attributes in the copies that will allow setup of direct
309        # connections.
310        for tb in tbs.keys():
311            s = get_substrate_from_topo(sub.name, topo[tb])
312            if s:
313                if not connInfo.has_key(tb):
314                    connInfo[tb] = [ ]
315
316                try:
317                    aid = tbparams[tb].allocID
318                except:
319                    self.log.debug("[create_direct_substrate] " + 
320                            "Can't get alloc id for %s?" %tb)
321                    aid = None
322
323                # This may need another look, but only a service gateway will
324                # look at the active parameter, and these are only inserted to
325                # connect to a master.
326                active = "%s" % ( tb in masters)
327                info = {
328                        'type': 'transit',
329                        'member': [ {
330                            'element': i.element.name, 
331                            'interface': i.name
332                            } for i in s.interfaces \
333                                    if isinstance(i.element, topdl.Computer) ],
334                        'fedAttr': [ 
335                            { 'attribute': 'masterdomain', 'value': mdomain},
336                            { 'attribute': 'masterexperiment', 'value': 
337                                "%s/%s" % (mproject, eid)},
338                            { 'attribute': 'active', 'value': active},
339                            ],
340                        'parameter': [ {
341                            'name': 'vlan_id',
342                            'key': 'fedid:%s/vlan%d' % (expid, dn),
343                            'store': self.store_url,
344                            'type': 'input',
345                            } ]
346                        }
347                if tbs.has_key(tb):
348                    info['peer'] = tbs[tb]
349                connInfo[tb].append(info)
350
351                # Give this allocation the rights to access the key of the
352                # vlan_id
353                if aid:
354                    self.auth.set_attribute(aid, 
355                            'fedid:%s/vlan%d' % (expid, dn))
356            else:
357                raise service_error(service_error.internal,
358                        "No substrate %s in testbed %s" % (sub.name, tb))
359
360        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
361                tb_name)
362
363    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
364            segment_substrate, portals, connInfo, expid):
365        # More than one testbed is on this substrate.  Insert
366        # some portals into the subtopologies.  st == source testbed,
367        # dt == destination testbed.
368        for st in tbs.keys():
369            if not segment_substrate.has_key(st):
370                segment_substrate[st] = { }
371            if not portals.has_key(st): 
372                portals[st] = { }
373            if not connInfo.has_key(st):
374                connInfo[st] = [ ]
375            for dt in [ t for t in tbs.keys() if t != st]:
376                sproject = tbparams[st].get_attribute('project', 'project')
377                dproject = tbparams[dt].get_attribute('project', 'project')
378                sdomain = tbparams[st].get_attribute('domain', ".example.com")
379                ddomain = tbparams[dt].get_attribute('domain', ".example.com")
380                aid = tbparams[dt].allocID
381
382                seer_master = None
383
384                for m in masters.values():
385                    for s in m:
386                        if s.name == 'SEER':
387                            seer_master = m
388                            break
389                    if seer_master: break
390
391                if seer_master:
392                    mdomain = tbparams[seer_master].get_attribute('domain',
393                            '.example.com')
394                    mproject = tbparams[seer_master].get_attribute('project', 
395                            'project')
396                    muser = tbparams[seer_master].get_attribute('user', 'root')
397                    smbshare = tbparams[seer_master].get_attribute('smbshare',
398                            'USERS')
399                else:
400                    mdomain = '.example.com'
401                    mproject = 'project'
402                    muser = 'root'
403                    smbshare = 'USERS'
404
405                if (st in masters  and dt not in masters) or \
406                        (st not in masters and dt in masters):
407                    active = ("%s" % (st in masters))
408                else:
409                    active = ("%s" %(st > dt))
410
411                if not segment_substrate[st].has_key(dt):
412                    # Put a substrate and a segment for the connected
413                    # testbed in there.
414                    tsubstrate, segment_element = \
415                            self.new_portal_substrate(st, dt, eid, tbparams,
416                                    expid)
417                    segment_substrate[st][dt] = tsubstrate
418                    topo[st].substrates.append(tsubstrate)
419                    topo[st].elements.append(segment_element)
420
421                new_portal = False
422                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
423                else: dname = dt
424
425                if testbed_suffix(st): sname = "-".join(split_testbed(st))
426                else: sname = st
427
428                if portals[st].has_key(dt):
429                    # There's a portal set up to go to this destination.
430                    # See if there's room to multiplex this connection on
431                    # it.  If so, add an interface to the portal; if not,
432                    # set up to add a portal below.
433                    # [This little festival of braces is just a pop of the
434                    # last element in the list of portals between st and
435                    # dt.]
436                    portal = portals[st][dt][-1]
437                    mux = len([ i for i in portal.interface \
438                            if not i.get_attribute('portal')])
439                    if mux == self.muxmax:
440                        new_portal = True
441                        portal_type = "experiment"
442                        myname = "%stunnel%d" % (dname.lower(),
443                                len(portals[st][dt]))
444                        desthost = "%stunnel%d" % (sname.lower(), 
445                                len(portals[st][dt]))
446                    else:
447                        new_i = topdl.Interface(
448                                substrate=sub.name,
449                                attribute=[ 
450                                    topdl.Attribute(
451                                        attribute='ip4_address', 
452                                        value=tbs[dt]
453                                    )
454                                ])
455                        portal.interface.append(new_i)
456                else:
457                    # First connection to this testbed, make an empty list
458                    # and set up to add the new portal below
459                    new_portal = True
460                    portals[st][dt] = [ ]
461                    myname = "%stunnel%d" % (dname.lower(),
462                            len(portals[st][dt]))
463                    desthost = "%stunnel%d" % (sname.lower(),
464                            len(portals[st][dt]))
465
466                    if dt in masters or st in masters: portal_type = "both"
467                    else: portal_type = "experiment"
468
469                if new_portal:
470                    infs = (
471                            (segment_substrate[st][dt].name, 
472                                (('portal', 'true'),)),
473                            (sub.name, 
474                                (('ip4_address', tbs[dt]),))
475                        )
476                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
477                            masters, eid, myname, desthost, portal_type,
478                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
479
480                    topo[st].elements.append(portal)
481                    portals[st][dt].append(portal)
482                    connInfo[st].append(info)
483
484    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
485            connInfo, expid):
486        # Add to the master testbed
487        tsubstrate, segment_element = \
488                self.new_portal_substrate(st, dt, eid, tbparams, expid)
489        myname = "%stunnel" % dt
490        desthost = "%stunnel" % st
491
492        portal, info = self.new_portal_node(st, dt, tbparams, masters,
493                eid, myname, desthost, "control", 
494                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
495                conn_attrs=[], expid=expid)
496
497        topo[st].substrates.append(tsubstrate)
498        topo[st].elements.append(segment_element)
499        topo[st].elements.append(portal)
500        if not connInfo.has_key(st):
501            connInfo[st] = [ ]
502        connInfo[st].append(info)
503
504    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
505            substrate, tbparams, expid, tb_name):
506        # Add to the master testbed
507        if testbed_suffix(dt): myname = "%stunnel" % "-".join(split_testbed(dt))
508        else: myname = "%stunnel" % dt
509
510        desthost = "%s" % ip_addr(dip)
511
512        portal, info = self.new_portal_node(st, dt, tbparams, masters,
513                eid, myname, desthost, "control", 
514                ((substrate.name,(
515                    ('portal','true'),
516                    ('ip4_address', "%s" % ip_addr(myip)),)),),
517                conn_type="transit", conn_attrs=[], expid=expid)
518
519        return portal
520
521    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
522            connInfo, expid):
523        """
524        For each substrate in the main topology, find those that
525        have nodes on more than one testbed.  Insert portal nodes
526        into the copies of those substrates on the sub topologies.
527        """
528        segment_substrate = { }
529        portals = { }
530        for s in top.substrates:
531            # tbs will contain an ip address on this subsrate that is in
532            # each testbed.
533            tbs = { }
534            for i in s.interfaces:
535                e = i.element
536                tb = e.get_attribute('testbed')
537                if tb and tb not in tbs:
538                    tbs[tb]= i.get_attribute('ip4_address')
539            if len(tbs) < 2:
540                continue
541
542            base_tbs = set([testbed_base(t) for t in tbs])
543
544            # DRAGON will not create multi-site vlans yet, so we don't do multi
545            # connection direct transits yet.
546            if len(tbs) == 2 :
547                # NB: the else if on the for loop - if none of the direct
548                # transits is applicable, use the internet.
549                for d in self.direct_transit:
550                    if all([tbparams[x].get_attribute(d) for x in tbs]):
551                        self.create_direct_substrate(s, topo, tbs, tbparams, 
552                                masters, eid, connInfo, expid, d)
553                        break
554                else:
555                    self.insert_internet_portals(s, topo, tbs, tbparams,
556                            masters, eid, segment_substrate, portals,
557                            connInfo, expid)
558            else:
559                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
560                        eid, segment_substrate, portals, connInfo, expid)
561
562
563        # Make sure that all the service importers have a control portal back
564        # to the master for each service.
565        for mtb in [ t for t in tbparams if t in masters ]:
566            importers = set([])
567            for m in masters[mtb]:
568                importers |= set(m.importers)
569            if mtb in importers:
570                importers.discard(mtb)
571            for tb in importers:
572                if tb not in topo:
573                    self.log.error("Importer not in experiment: %s" % tb)
574                    continue
575                if len([e for e in topo[tb].elements \
576                        if isinstance(e, topdl.Computer) and \
577                        e.get_attribute('destination_testbed') == mtb and \
578                        e.get_attribute('portal') and \
579                        e.get_attribute('portal_type') == 'both']) == 0:
580
581                    for tb_name in self.direct_transit:
582
583                        if tbparams[mtb].get_attribute(tb_name) \
584                                and tbparams[tb].get_attribute(tb_name):
585
586                            idx = len([x for x in topo.keys() \
587                                    if x.startswith(tb_name)])
588                            dip, leng = ip_allocator.allocate(4)
589                            dip += 1
590                            mip = dip+1
591                            csub = topdl.Substrate(
592                                    name="%s-control-%s" % (tb_name, tb),
593                                    capacity=topdl.Capacity(100000.0, 'max'),
594                                    attribute=[
595                                        topdl.Attribute(
596                                            attribute='portal',
597                                            value='true'
598                                            )
599                                        ]
600                                    )
601                            seg = topdl.Segment(
602                                    id= {'fedid': tbparams[mtb].allocID},
603                                    type='emulab',
604                                    uri = self.tbmap.get(testbed_base(mtb),
605                                        None),
606                                    interface=[ 
607                                        topdl.Interface(
608                                            substrate=csub.name),
609                                        ],
610                                    attribute = [
611                                        topdl.Attribute(attribute=n, value=v)
612                                            for n, v in (\
613                                                ('domain', 
614                                                    tbparams[mtb].getattribute(
615                                                        'domain',
616                                                        ".example.com")),
617                                                ('experiment', "%s/%s" % \
618                                                        (tbparams[mtb].get_attribute(
619                                                            'project', 
620                                                            'project'), 
621                                                            eid)),)
622                                        ],
623                                    )
624                            portal = self.new_direct_portal(tb, mtb,
625                                    masters, eid, dip, mip, idx, csub,
626                                    tbparams, expid, tb_name)
627                            topo[tb].substrates.append(csub)
628                            topo[tb].elements.append(portal)
629                            topo[tb].elements.append(seg)
630
631                            mcsub = csub.clone()
632                            seg = topdl.Segment(
633                                    id= { 'fedid': tbparams[tb].allocID},
634                                    type='emulab',
635                                    uri = self.tbmap.get(testbed_base(tb),
636                                        None),
637                                    interface=[ 
638                                        topdl.Interface(
639                                            substrate=csub.name),
640                                        ],
641                                    attribute = [
642                                        topdl.Attribute(attribute=n, value=v)
643                                            for n, v in (\
644                                                ('domain', 
645                                                    tbparams[tb].get_attribute(
646                                                        'domain',
647                                                        ".example.com")),
648                                                ('experiment', "%s/%s" % \
649                                                        (tbparams[tb].get_attribute(
650                                                            'project', 
651                                                            'project'), 
652                                                            eid)),)
653                                        ],
654                                    )
655                            portal = self.new_direct_portal(mtb, tb, masters,
656                                    eid, mip, dip, idx, mcsub, tbparams, expid,
657                                    tb_name)
658                            topo[mtb].substrates.append(mcsub)
659                            topo[mtb].elements.append(portal)
660                            topo[mtb].elements.append(seg)
661                            for t in (mtb, tb):
662                                topo[t].incorporate_elements()
663
664                            self.create_direct_substrate(csub, topo, 
665                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
666                                    tbparams, masters, eid, connInfo,
667                                    expid, tb_name)
668                            break
669                    # This matches with the for tb_name in self.direct_transit
670                    else:
671                        self.add_control_portal(mtb, tb, masters, eid, topo, 
672                                tbparams, connInfo, expid)
673                        self.add_control_portal(tb, mtb, masters, eid, topo, 
674                                tbparams, connInfo, expid)
675
676        # Connect the portal nodes into the topologies and clear out
677        # substrates that are not in the topologies
678        for tb in tbparams:
679            topo[tb].incorporate_elements()
680            topo[tb].substrates = \
681                    [s for s in topo[tb].substrates \
682                        if len(s.interfaces) >0]
683
Note: See TracBrowser for help on using the repository browser.