source: fedd/federation/experiment_partition.py @ 17c2f7b

compt_changes
Last change on this file since 17c2f7b was a11eda5, checked in by Ted Faber <faber@…>, 13 years ago

Add support for testbeds to indicate preference for outgoing portal
connections.

  • Property mode set to 100644
File size: 19.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from deter import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32from deter import topdl
33from deter import ip_allocator
34from deter import ip_addr
35import list_log
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None, tbactive=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58        self.tbactive = tbactive
59
60
61    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
62            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
63            expid=None):
64        """
65        Return a new internet portal node and a dict with the connectionInfo to
66        be attached.
67        """
68        seer_master = None
69        for k, m in masters.items():
70            for s in m:
71                if s.name == 'seer':
72                    seer_master = k
73                    break
74            if seer_master: break
75
76        if seer_master:
77            mdomain = tbparams[seer_master].get_attribute('domain', 
78                    '.example.com')
79            mproject = tbparams[seer_master].get_attribute('project', 
80                    'project')
81            muser = tbparams[seer_master].get_attribute('user', 'root')
82            smbshare = tbparams[seer_master].get_attribute('smbshare', 'USERS')
83        else:
84            mdomain = '.example.com'
85            mproject = 'project'
86            muser = 'root'
87            smbshare = 'USERS'
88
89        dproject = tbparams[dt].get_attribute('project', 'project')
90        ddomain = tbparams[dt].get_attribute('domain', '.example.com')
91
92        if (st in self.tbactive and dt not in self.tbactive) or \
93                ( st not in self.tbactive and dt in self.tbactive ):
94            active = ("%s" % (st in self.tbactive))
95        elif (st in masters and dt not in masters) or \
96                ( st not in masters and dt in masters ):
97            active = ("%s" % (st in masters))
98        else:
99            active = ("%s" % (st > dt))
100
101        ifaces = [ ]
102        for sub, attrs in iface_desc:
103            inf = topdl.Interface(
104                    name="inf%03d" % len(ifaces),
105                    substrate=sub,
106                    attribute=[
107                        topdl.Attribute(
108                            attribute=n,
109                            value = v)
110                        for n, v in attrs
111                        ]
112                    )
113            ifaces.append(inf)
114        if conn_type == "ssh":
115            try:
116                aid = tbparams[st].allocID
117            except:
118                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
119                        % st)
120                aid = None
121            info = {
122                    "type" : conn_type, 
123                    "portal": myname,
124                    'fedAttr': [ 
125                            { 'attribute': 'masterdomain', 'value': mdomain},
126                            { 'attribute': 'masterexperiment', 'value': 
127                                "%s/%s" % (mproject, eid)},
128                            { 'attribute': 'active', 'value': active},
129                            # Move to SMB service description
130                            { 'attribute': 'masteruser', 'value': muser},
131                            { 'attribute': 'smbshare', 'value': smbshare},
132                        ],
133                    'parameter': [
134                        {
135                            'name': 'peer',
136                            'key': 'fedid:%s/%s' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'ssh_port',
142                            'key': 'fedid:%s/%s-port' % (expid, myname),
143                            'store': self.store_url,
144                            'type': 'output',
145                        },
146                        {
147                            'name': 'peer',
148                            'key': 'fedid:%s/%s' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        {
153                            'name': 'ssh_port',
154                            'key': 'fedid:%s/%s-port' % (expid, desthost),
155                            'store': self.store_url,
156                            'type': 'input',
157                        },
158                        ]
159                    }
160            # Give this allocation the rights to access the key of the
161            # peers
162            if aid:
163                for h in (myname, desthost):
164                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
165                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
166                            (expid, h))
167            else:
168                self.log.error("No aid for %s in new_portal_node" % st)
169        else:
170            info = None
171
172        return (topdl.Computer(
173                name=myname,
174                attribute=[ 
175                    topdl.Attribute(attribute=n,value=v)
176                        for n, v in (\
177                            ('portal', 'true'),
178                            ('portal_type', portal_type), 
179                            ('destination_testbed', dt),
180                        )
181                    ],
182                interface=ifaces,
183                ), info)
184
185    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
186        ddomain = tbparams[dt].get_attribute('domain', ".example.com")
187        dproject = tbparams[dt].get_attribute('project', 'project')
188        tsubstrate = \
189                topdl.Substrate(name='%s-%s' % (st, dt),
190                        attribute= [
191                            topdl.Attribute(
192                                attribute='portal',
193                                value='true')
194                            ]
195                        )
196        segment_element = topdl.Segment(
197                id= {'fedid': tbparams[dt].allocID },
198                type='emulab',
199                uri = self.tbmap.get(testbed_base(dt), None),
200                interface=[ 
201                    topdl.Interface(
202                        substrate=tsubstrate.name),
203                    ],
204                attribute = [
205                    topdl.Attribute(attribute=n, value=v)
206                        for n, v in (\
207                            ('domain', ddomain),
208                            ('experiment', "%s/%s" % \
209                                    (dproject, eid)),)
210                    ],
211                )
212
213        return (tsubstrate, segment_element)
214
215    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
216            tb_name):
217        if sub.capacity is None:
218            raise service_error(service_error.internal,
219                    "Cannot direct split substrate w/o capacity")
220        segs = [ ]
221        name = join_testbed(tb_name, "%d" % idx)
222        substr = topdl.Substrate(name=name, 
223                capacity=sub.capacity.clone(),
224                attribute=[ topdl.Attribute(attribute=n, value=v)
225                    for n, v, in (\
226                            ('vlan', 'unassigned%d' % idx),)])
227        store_key = 'fedid:%s/vlan%d' % (expid, idx)
228        for tb in tbs.keys():
229            seg = topdl.Segment(
230                    id = { 'fedid':tbparams[tb].allocID },
231                    type='emulab',
232                    uri = self.tbmap.get(testbed_base(tb), None),
233                    interface=[ 
234                        topdl.Interface(
235                            substrate=substr.name),
236                        ],
237                    attribute=[ topdl.Attribute(
238                        attribute='%s_endpoint' % tb_name, 
239                        value=tbparams[tb].get_attribute(tb_name)),
240                        ]
241                    )
242            vlan_key = "%s_vlans" % tb_name
243            if tbparams[tb].get_attribute(vlan_key) is not None:
244                seg.set_attribute(vlan_key, 
245                        tbparams[tb].get_attribute(vlan_key))
246            segs.append(seg)
247
248            # Give this allocation the rights to access the key of the
249            # vlan_id
250            try:
251                aid = tbparams[tb].allocID
252                self.auth.set_attribute(aid, store_key)
253            except:
254                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
255                        % tb)
256
257        connInfo[name] = [ { 
258            'type': 'transit',
259            'parameter': [ { 
260                'name': 'vlan_id',
261                'key': store_key,
262                'store': self.store_url,
263                'type': 'output'
264                } ]
265            } ]
266
267        topo[name] = \
268                topdl.Topology(substrates=[substr], elements=segs,
269                        attribute=[
270                            topdl.Attribute(attribute="transit", value='true'),
271                            topdl.Attribute(attribute="dynamic", value='true'),
272                            topdl.Attribute(attribute="testbed", value=tb_name),
273                            topdl.Attribute(attribute="store_keys", 
274                                value=store_key),
275                            ]
276                        )
277
278    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
279            connInfo, expid=None, tb_name=None):
280        """
281        Create connection information that tells which nodes are to be
282        connected to direct transits, and create an additional topology with
283        just the interconnected segments and a substrate.
284        """
285
286        def get_substrate_from_topo(name, t):
287            for s in t.substrates:
288                if s.name == name: return s
289            else: return None
290
291
292        seer_master = None
293        for k, m in masters.items():
294            for s in m:
295                if s.name == 'seer':
296                    seer_master = k
297                    break
298            if seer_master: break
299
300        if seer_master:
301            mdomain = tbparams[seer_master].get_attribute('domain',
302                    '.example.com')
303            mproject = tbparams[seer_master].get_attribute('project', 'project')
304        else:
305            mdomain = '.example.com'
306            mproject = 'project'
307
308        # dn is the number of previously created direct nets on this direct
309        # testbed.  This routine creates a net numbered by dn
310        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
311
312        # Set the attributes in the copies that will allow setup of direct
313        # connections.
314        for tb in tbs.keys():
315            s = get_substrate_from_topo(sub.name, topo[tb])
316            if s:
317                if not connInfo.has_key(tb):
318                    connInfo[tb] = [ ]
319
320                try:
321                    aid = tbparams[tb].allocID
322                except:
323                    self.log.debug("[create_direct_substrate] " + 
324                            "Can't get alloc id for %s?" %tb)
325                    aid = None
326
327                # This may need another look, but only a service gateway will
328                # look at the active parameter, and these are only inserted to
329                # connect to a master.
330                active = "%s" % ( tb in masters)
331                info = {
332                        'type': 'transit',
333                        'member': [ {
334                            'element': i.element.name, 
335                            'interface': i.name
336                            } for i in s.interfaces \
337                                    if isinstance(i.element, topdl.Computer) ],
338                        'fedAttr': [ 
339                            { 'attribute': 'masterdomain', 'value': mdomain},
340                            { 'attribute': 'masterexperiment', 'value': 
341                                "%s/%s" % (mproject, eid)},
342                            { 'attribute': 'active', 'value': active},
343                            ],
344                        'parameter': [ {
345                            'name': 'vlan_id',
346                            'key': 'fedid:%s/vlan%d' % (expid, dn),
347                            'store': self.store_url,
348                            'type': 'input',
349                            } ]
350                        }
351                if tbs.has_key(tb):
352                    info['peer'] = tbs[tb]
353                connInfo[tb].append(info)
354
355                # Give this allocation the rights to access the key of the
356                # vlan_id
357                if aid:
358                    self.auth.set_attribute(aid, 
359                            'fedid:%s/vlan%d' % (expid, dn))
360            else:
361                raise service_error(service_error.internal,
362                        "No substrate %s in testbed %s" % (sub.name, tb))
363
364        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
365                tb_name)
366
367    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
368            segment_substrate, portals, connInfo, expid):
369        # More than one testbed is on this substrate.  Insert
370        # some portals into the subtopologies.  st == source testbed,
371        # dt == destination testbed.
372        for st in tbs.keys():
373            if not segment_substrate.has_key(st):
374                segment_substrate[st] = { }
375            if not portals.has_key(st): 
376                portals[st] = { }
377            if not connInfo.has_key(st):
378                connInfo[st] = [ ]
379            for dt in [ t for t in tbs.keys() if t != st]:
380                sproject = tbparams[st].get_attribute('project', 'project')
381                dproject = tbparams[dt].get_attribute('project', 'project')
382                sdomain = tbparams[st].get_attribute('domain', ".example.com")
383                ddomain = tbparams[dt].get_attribute('domain', ".example.com")
384                aid = tbparams[dt].allocID
385
386                seer_master = None
387
388                for m in masters.values():
389                    for s in m:
390                        if s.name == 'SEER':
391                            seer_master = m
392                            break
393                    if seer_master: break
394
395                if seer_master:
396                    mdomain = tbparams[seer_master].get_attribute('domain',
397                            '.example.com')
398                    mproject = tbparams[seer_master].get_attribute('project', 
399                            'project')
400                    muser = tbparams[seer_master].get_attribute('user', 'root')
401                    smbshare = tbparams[seer_master].get_attribute('smbshare',
402                            'USERS')
403                else:
404                    mdomain = '.example.com'
405                    mproject = 'project'
406                    muser = 'root'
407                    smbshare = 'USERS'
408
409                if (st in masters  and dt not in masters) or \
410                        (st not in masters and dt in masters):
411                    active = ("%s" % (st in masters))
412                else:
413                    active = ("%s" %(st > dt))
414
415                if not segment_substrate[st].has_key(dt):
416                    # Put a substrate and a segment for the connected
417                    # testbed in there.
418                    tsubstrate, segment_element = \
419                            self.new_portal_substrate(st, dt, eid, tbparams,
420                                    expid)
421                    segment_substrate[st][dt] = tsubstrate
422                    topo[st].substrates.append(tsubstrate)
423                    topo[st].elements.append(segment_element)
424
425                new_portal = False
426                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
427                else: dname = dt
428
429                if testbed_suffix(st): sname = "-".join(split_testbed(st))
430                else: sname = st
431
432                if portals[st].has_key(dt):
433                    # There's a portal set up to go to this destination.
434                    # See if there's room to multiplex this connection on
435                    # it.  If so, add an interface to the portal; if not,
436                    # set up to add a portal below.
437                    # [This little festival of braces is just a pop of the
438                    # last element in the list of portals between st and
439                    # dt.]
440                    portal = portals[st][dt][-1]
441                    mux = len([ i for i in portal.interface \
442                            if not i.get_attribute('portal')])
443                    if mux == self.muxmax:
444                        new_portal = True
445                        portal_type = "experiment"
446                        myname = "%stunnel%d" % (dname.lower(),
447                                len(portals[st][dt]))
448                        desthost = "%stunnel%d" % (sname.lower(), 
449                                len(portals[st][dt]))
450                    else:
451                        new_i = topdl.Interface(
452                                substrate=sub.name,
453                                attribute=[ 
454                                    topdl.Attribute(
455                                        attribute='ip4_address', 
456                                        value=tbs[dt]
457                                    )
458                                ])
459                        portal.interface.append(new_i)
460                else:
461                    # First connection to this testbed, make an empty list
462                    # and set up to add the new portal below
463                    new_portal = True
464                    portals[st][dt] = [ ]
465                    myname = "%stunnel%d" % (dname.lower(),
466                            len(portals[st][dt]))
467                    desthost = "%stunnel%d" % (sname.lower(),
468                            len(portals[st][dt]))
469
470                    if dt in masters or st in masters: portal_type = "both"
471                    else: portal_type = "experiment"
472
473                if new_portal:
474                    infs = (
475                            (segment_substrate[st][dt].name, 
476                                (('portal', 'true'),)),
477                            (sub.name, 
478                                (('ip4_address', tbs[dt]),))
479                        )
480                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
481                            masters, eid, myname, desthost, portal_type,
482                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
483
484                    topo[st].elements.append(portal)
485                    portals[st][dt].append(portal)
486                    connInfo[st].append(info)
487
488    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
489            connInfo, expid):
490        # Add to the master testbed
491        tsubstrate, segment_element = \
492                self.new_portal_substrate(st, dt, eid, tbparams, expid)
493        myname = "%stunnel" % dt
494        desthost = "%stunnel" % st
495
496        portal, info = self.new_portal_node(st, dt, tbparams, masters,
497                eid, myname, desthost, "control", 
498                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
499                conn_attrs=[], expid=expid)
500
501        topo[st].substrates.append(tsubstrate)
502        topo[st].elements.append(segment_element)
503        topo[st].elements.append(portal)
504        if not connInfo.has_key(st):
505            connInfo[st] = [ ]
506        connInfo[st].append(info)
507
508    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
509            substrate, tbparams, expid, tb_name):
510        # Add to the master testbed
511        if testbed_suffix(dt): myname = "%stunnel" % "-".join(split_testbed(dt))
512        else: myname = "%stunnel" % dt
513
514        desthost = "%s" % ip_addr(dip)
515
516        portal, info = self.new_portal_node(st, dt, tbparams, masters,
517                eid, myname, desthost, "control", 
518                ((substrate.name,(
519                    ('portal','true'),
520                    ('ip4_address', "%s" % ip_addr(myip)),)),),
521                conn_type="transit", conn_attrs=[], expid=expid)
522
523        return portal
524
525    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
526            connInfo, expid):
527        """
528        For each substrate in the main topology, find those that
529        have nodes on more than one testbed.  Insert portal nodes
530        into the copies of those substrates on the sub topologies.
531        """
532        segment_substrate = { }
533        portals = { }
534        for s in top.substrates:
535            # tbs will contain an ip address on this subsrate that is in
536            # each testbed.
537            tbs = { }
538            for i in s.interfaces:
539                e = i.element
540                tb = e.get_attribute('testbed')
541                if tb and tb not in tbs:
542                    tbs[tb]= i.get_attribute('ip4_address')
543            if len(tbs) < 2:
544                continue
545
546            base_tbs = set([testbed_base(t) for t in tbs])
547
548            # DRAGON will not create multi-site vlans yet, so we don't do multi
549            # connection direct transits yet.
550            if len(tbs) == 2 :
551                # NB: the else if on the for loop - if none of the direct
552                # transits is applicable, use the internet.
553                for d in self.direct_transit:
554                    if all([tbparams[x].get_attribute(d) for x in tbs]):
555                        self.create_direct_substrate(s, topo, tbs, tbparams, 
556                                masters, eid, connInfo, expid, d)
557                        break
558                else:
559                    self.insert_internet_portals(s, topo, tbs, tbparams,
560                            masters, eid, segment_substrate, portals,
561                            connInfo, expid)
562            else:
563                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
564                        eid, segment_substrate, portals, connInfo, expid)
565
566
567        # Make sure that all the service importers have a control portal back
568        # to the master for each service.
569        for mtb in [ t for t in tbparams if t in masters ]:
570            importers = set([])
571            for m in masters[mtb]:
572                importers |= set(m.importers)
573            if mtb in importers:
574                importers.discard(mtb)
575            for tb in importers:
576                if tb not in topo:
577                    self.log.error("Importer not in experiment: %s" % tb)
578                    continue
579                if len([e for e in topo[tb].elements \
580                        if isinstance(e, topdl.Computer) and \
581                        e.get_attribute('destination_testbed') == mtb and \
582                        e.get_attribute('portal') and \
583                        e.get_attribute('portal_type') == 'both']) == 0:
584
585                    for tb_name in self.direct_transit:
586
587                        if tbparams[mtb].get_attribute(tb_name) \
588                                and tbparams[tb].get_attribute(tb_name):
589
590                            idx = len([x for x in topo.keys() \
591                                    if x.startswith(tb_name)])
592                            dip, leng = ip_allocator.allocate(4)
593                            dip += 1
594                            mip = dip+1
595                            csub = topdl.Substrate(
596                                    name="%s-control-%s" % (tb_name, tb),
597                                    capacity=topdl.Capacity(100000.0, 'max'),
598                                    attribute=[
599                                        topdl.Attribute(
600                                            attribute='portal',
601                                            value='true'
602                                            )
603                                        ]
604                                    )
605                            seg = topdl.Segment(
606                                    id= {'fedid': tbparams[mtb].allocID},
607                                    type='emulab',
608                                    uri = self.tbmap.get(testbed_base(mtb),
609                                        None),
610                                    interface=[ 
611                                        topdl.Interface(
612                                            substrate=csub.name),
613                                        ],
614                                    attribute = [
615                                        topdl.Attribute(attribute=n, value=v)
616                                            for n, v in (\
617                                                ('domain', 
618                                                    tbparams[mtb].get_attribute(
619                                                        'domain',
620                                                        ".example.com")),
621                                                ('experiment', "%s/%s" % \
622                                                        (tbparams[mtb].get_attribute(
623                                                            'project', 
624                                                            'project'), 
625                                                            eid)),)
626                                        ],
627                                    )
628                            portal = self.new_direct_portal(tb, mtb,
629                                    masters, eid, dip, mip, idx, csub,
630                                    tbparams, expid, tb_name)
631                            topo[tb].substrates.append(csub)
632                            topo[tb].elements.append(portal)
633                            topo[tb].elements.append(seg)
634
635                            mcsub = csub.clone()
636                            seg = topdl.Segment(
637                                    id= { 'fedid': tbparams[tb].allocID},
638                                    type='emulab',
639                                    uri = self.tbmap.get(testbed_base(tb),
640                                        None),
641                                    interface=[ 
642                                        topdl.Interface(
643                                            substrate=csub.name),
644                                        ],
645                                    attribute = [
646                                        topdl.Attribute(attribute=n, value=v)
647                                            for n, v in (\
648                                                ('domain', 
649                                                    tbparams[tb].get_attribute(
650                                                        'domain',
651                                                        ".example.com")),
652                                                ('experiment', "%s/%s" % \
653                                                        (tbparams[tb].get_attribute(
654                                                            'project', 
655                                                            'project'), 
656                                                            eid)),)
657                                        ],
658                                    )
659                            portal = self.new_direct_portal(mtb, tb, masters,
660                                    eid, mip, dip, idx, mcsub, tbparams, expid,
661                                    tb_name)
662                            topo[mtb].substrates.append(mcsub)
663                            topo[mtb].elements.append(portal)
664                            topo[mtb].elements.append(seg)
665                            for t in (mtb, tb):
666                                topo[t].incorporate_elements()
667
668                            self.create_direct_substrate(csub, topo, 
669                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
670                                    tbparams, masters, eid, connInfo,
671                                    expid, tb_name)
672                            break
673                    # This matches with the for tb_name in self.direct_transit
674                    else:
675                        self.add_control_portal(mtb, tb, masters, eid, topo, 
676                                tbparams, connInfo, expid)
677                        self.add_control_portal(tb, mtb, masters, eid, topo, 
678                                tbparams, connInfo, expid)
679
680        # Connect the portal nodes into the topologies and clear out
681        # substrates that are not in the topologies
682        for tb in tbparams:
683            topo[tb].incorporate_elements()
684            topo[tb].substrates = \
685                    [s for s in topo[tb].substrates \
686                        if len(s.interfaces) >0]
687
Note: See TracBrowser for help on using the repository browser.