source: fedd/federation/experiment_partition.py @ ee7f7e4

axis_examplecompt_changesinfo-ops
Last change on this file since ee7f7e4 was 814b5e5, checked in by Ted Faber <faber@…>, 14 years ago

Merge fixes from stable branch

  • Property mode set to 100644
File size: 19.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58
59
60    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
61            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
62            expid=None):
63        """
64        Return a new internet portal node and a dict with the connectionInfo to
65        be attached.
66        """
67        seer_master = None
68        for k, m in masters.items():
69            for s in m:
70                if s.name == 'seer':
71                    seer_master = k
72                    break
73            if seer_master: break
74
75        if seer_master:
76            mdomain = tbparams[seer_master].get('domain', '.example.com')
77            mproject = tbparams[seer_master].get('project', 'project')
78            muser = tbparams[seer_master].get('user', 'root')
79            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
80        else:
81            mdomain = '.example.com'
82            mproject = 'project'
83            muser = 'root'
84            smbshare = 'USERS'
85
86        dproject = tbparams[dt].get('project', 'project')
87        ddomain = tbparams[dt].get('domain', '.example.com')
88
89        if (st in masters and dt not in masters) or \
90                ( st not in masters and dt in masters ):
91            active = ("%s" % (st in masters))
92        else:
93            active = ("%s" % (st > dt))
94
95        ifaces = [ ]
96        for sub, attrs in iface_desc:
97            inf = topdl.Interface(
98                    name="inf%03d" % len(ifaces),
99                    substrate=sub,
100                    attribute=[
101                        topdl.Attribute(
102                            attribute=n,
103                            value = v)
104                        for n, v in attrs
105                        ]
106                    )
107            ifaces.append(inf)
108        if conn_type == "ssh":
109            try:
110                aid = tbparams[st]['allocID']['fedid']
111            except:
112                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
113                        % st)
114                aid = None
115            info = {
116                    "type" : conn_type, 
117                    "portal": myname,
118                    'fedAttr': [ 
119                            { 'attribute': 'masterdomain', 'value': mdomain},
120                            { 'attribute': 'masterexperiment', 'value': 
121                                "%s/%s" % (mproject, eid)},
122                            { 'attribute': 'active', 'value': active},
123                            # Move to SMB service description
124                            { 'attribute': 'masteruser', 'value': muser},
125                            { 'attribute': 'smbshare', 'value': smbshare},
126                        ],
127                    'parameter': [
128                        {
129                            'name': 'peer',
130                            'key': 'fedid:%s/%s' % (expid, myname),
131                            'store': self.store_url,
132                            'type': 'output',
133                        },
134                        {
135                            'name': 'ssh_port',
136                            'key': 'fedid:%s/%s-port' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'peer',
142                            'key': 'fedid:%s/%s' % (expid, desthost),
143                            'store': self.store_url,
144                            'type': 'input',
145                        },
146                        {
147                            'name': 'ssh_port',
148                            'key': 'fedid:%s/%s-port' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        ]
153                    }
154            # Give this allocation the rights to access the key of the
155            # peers
156            if aid:
157                for h in (myname, desthost):
158                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
159                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
160                            (expid, h))
161            else:
162                self.log.error("No aid for %s in new_portal_node" % st)
163        else:
164            info = None
165
166        return (topdl.Computer(
167                name=myname,
168                attribute=[ 
169                    topdl.Attribute(attribute=n,value=v)
170                        for n, v in (\
171                            ('portal', 'true'),
172                            ('portal_type', portal_type), 
173                            ('destination_testbed', dt),
174                        )
175                    ],
176                interface=ifaces,
177                ), info)
178
179    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
180        ddomain = tbparams[dt].get('domain', ".example.com")
181        dproject = tbparams[dt].get('project', 'project')
182        tsubstrate = \
183                topdl.Substrate(name='%s-%s' % (st, dt),
184                        attribute= [
185                            topdl.Attribute(
186                                attribute='portal',
187                                value='true')
188                            ]
189                        )
190        segment_element = topdl.Segment(
191                id= tbparams[dt]['allocID'],
192                type='emulab',
193                uri = self.tbmap.get(testbed_base(dt), None),
194                interface=[ 
195                    topdl.Interface(
196                        substrate=tsubstrate.name),
197                    ],
198                attribute = [
199                    topdl.Attribute(attribute=n, value=v)
200                        for n, v in (\
201                            ('domain', ddomain),
202                            ('experiment', "%s/%s" % \
203                                    (dproject, eid)),)
204                    ],
205                )
206
207        return (tsubstrate, segment_element)
208
209    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
210            tb_name):
211        if sub.capacity is None:
212            raise service_error(service_error.internal,
213                    "Cannot direct split substrate w/o capacity")
214        segs = [ ]
215        name = join_testbed(tb_name, "%d" % idx)
216        substr = topdl.Substrate(name=name, 
217                capacity=sub.capacity.clone(),
218                attribute=[ topdl.Attribute(attribute=n, value=v)
219                    for n, v, in (\
220                            ('vlan', 'unassigned%d' % idx),)])
221        store_key = 'fedid:%s/vlan%d' % (expid, idx)
222        for tb in tbs.keys():
223            seg = topdl.Segment(
224                    id = tbparams[tb]['allocID'],
225                    type='emulab',
226                    uri = self.tbmap.get(testbed_base(tb), None),
227                    interface=[ 
228                        topdl.Interface(
229                            substrate=substr.name),
230                        ],
231                    attribute=[ topdl.Attribute(
232                        attribute='%s_endpoint' % tb_name, 
233                        value=tbparams[tb][tb_name]),
234                        ]
235                    )
236            vlan_key = "%s_vlans" % tb_name
237            if vlan_key in tbparams[tb]:
238                seg.set_attribute(vlan_key, tbparams[tb][vlan_key])
239            segs.append(seg)
240
241            # Give this allocation the rights to access the key of the
242            # vlan_id
243            try:
244                aid = tbparams[tb]['allocID']['fedid']
245                self.auth.set_attribute(aid, store_key)
246            except:
247                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
248                        % tb)
249
250        connInfo[name] = [ { 
251            'type': 'transit',
252            'parameter': [ { 
253                'name': 'vlan_id',
254                'key': store_key,
255                'store': self.store_url,
256                'type': 'output'
257                } ]
258            } ]
259
260        topo[name] = \
261                topdl.Topology(substrates=[substr], elements=segs,
262                        attribute=[
263                            topdl.Attribute(attribute="transit", value='true'),
264                            topdl.Attribute(attribute="dynamic", value='true'),
265                            topdl.Attribute(attribute="testbed", value=tb_name),
266                            topdl.Attribute(attribute="store_keys", 
267                                value=store_key),
268                            ]
269                        )
270
271    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
272            connInfo, expid=None, tb_name=None):
273        """
274        Create connection information that tells which nodes are to be
275        connected to direct transits, and create an additional topology with
276        just the interconnected segments and a substrate.
277        """
278
279        def get_substrate_from_topo(name, t):
280            for s in t.substrates:
281                if s.name == name: return s
282            else: return None
283
284
285        seer_master = None
286        for k, m in masters.items():
287            for s in m:
288                if s.name == 'seer':
289                    seer_master = k
290                    break
291            if seer_master: break
292
293        if seer_master:
294            mdomain = tbparams[seer_master].get('domain', '.example.com')
295            mproject = tbparams[seer_master].get('project', 'project')
296        else:
297            mdomain = '.example.com'
298            mproject = 'project'
299
300        # dn is the number of previously created direct nets on this direct
301        # testbed.  This routine creates a net numbered by dn
302        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
303
304        # Set the attributes in the copies that will allow setup of direct
305        # connections.
306        for tb in tbs.keys():
307            s = get_substrate_from_topo(sub.name, topo[tb])
308            if s:
309                if not connInfo.has_key(tb):
310                    connInfo[tb] = [ ]
311
312                try:
313                    aid = tbparams[tb]['allocID']['fedid']
314                except:
315                    self.log.debug("[create_direct_substrate] " + 
316                            "Can't get alloc id for %s?" %tb)
317                    aid = None
318
319                # This may need another look, but only a service gateway will
320                # look at the active parameter, and these are only inserted to
321                # connect to a master.
322                active = "%s" % ( tb in masters)
323                info = {
324                        'type': 'transit',
325                        'member': [ {
326                            'element': i.element.name, 
327                            'interface': i.name
328                            } for i in s.interfaces \
329                                    if isinstance(i.element, topdl.Computer) ],
330                        'fedAttr': [ 
331                            { 'attribute': 'masterdomain', 'value': mdomain},
332                            { 'attribute': 'masterexperiment', 'value': 
333                                "%s/%s" % (mproject, eid)},
334                            { 'attribute': 'active', 'value': active},
335                            ],
336                        'parameter': [ {
337                            'name': 'vlan_id',
338                            'key': 'fedid:%s/vlan%d' % (expid, dn),
339                            'store': self.store_url,
340                            'type': 'input',
341                            } ]
342                        }
343                if tbs.has_key(tb):
344                    info['peer'] = tbs[tb]
345                connInfo[tb].append(info)
346
347                # Give this allocation the rights to access the key of the
348                # vlan_id
349                if aid:
350                    self.auth.set_attribute(aid, 
351                            'fedid:%s/vlan%d' % (expid, dn))
352            else:
353                raise service_error(service_error.internal,
354                        "No substrate %s in testbed %s" % (sub.name, tb))
355
356        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
357                tb_name)
358
359    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
360            segment_substrate, portals, connInfo, expid):
361        # More than one testbed is on this substrate.  Insert
362        # some portals into the subtopologies.  st == source testbed,
363        # dt == destination testbed.
364        for st in tbs.keys():
365            if not segment_substrate.has_key(st):
366                segment_substrate[st] = { }
367            if not portals.has_key(st): 
368                portals[st] = { }
369            if not connInfo.has_key(st):
370                connInfo[st] = [ ]
371            for dt in [ t for t in tbs.keys() if t != st]:
372                sproject = tbparams[st].get('project', 'project')
373                dproject = tbparams[dt].get('project', 'project')
374                sdomain = tbparams[st].get('domain', ".example.com")
375                ddomain = tbparams[dt].get('domain', ".example.com")
376                aid = tbparams[dt]['allocID']['fedid']
377
378                seer_master = None
379
380                for m in masters.values():
381                    for s in m:
382                        if s.name == 'SEER':
383                            seer_master = m
384                            break
385                    if seer_master: break
386
387                if seer_master:
388                    mdomain = tbparams[seer_master].get('domain', '.example.com')
389                    mproject = tbparams[seer_master].get('project', 'project')
390                    muser = tbparams[seer_master].get('user', 'root')
391                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
392                else:
393                    mdomain = '.example.com'
394                    mproject = 'project'
395                    muser = 'root'
396                    smbshare = 'USERS'
397
398                if (st in masters  and dt not in masters) or \
399                        (st not in masters and dt in masters):
400                    active = ("%s" % (st in masters))
401                else:
402                    active = ("%s" %(st > dt))
403
404                if not segment_substrate[st].has_key(dt):
405                    # Put a substrate and a segment for the connected
406                    # testbed in there.
407                    tsubstrate, segment_element = \
408                            self.new_portal_substrate(st, dt, eid, tbparams,
409                                    expid)
410                    segment_substrate[st][dt] = tsubstrate
411                    topo[st].substrates.append(tsubstrate)
412                    topo[st].elements.append(segment_element)
413
414                new_portal = False
415                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
416                else: dname = dt
417
418                if testbed_suffix(st): sname = "-".join(split_testbed(st))
419                else: sname = st
420
421                if portals[st].has_key(dt):
422                    # There's a portal set up to go to this destination.
423                    # See if there's room to multiplex this connection on
424                    # it.  If so, add an interface to the portal; if not,
425                    # set up to add a portal below.
426                    # [This little festival of braces is just a pop of the
427                    # last element in the list of portals between st and
428                    # dt.]
429                    portal = portals[st][dt][-1]
430                    mux = len([ i for i in portal.interface \
431                            if not i.get_attribute('portal')])
432                    if mux == self.muxmax:
433                        new_portal = True
434                        portal_type = "experiment"
435                        myname = "%stunnel%d" % (dname.lower(),
436                                len(portals[st][dt]))
437                        desthost = "%stunnel%d" % (sname.lower(), 
438                                len(portals[st][dt]))
439                    else:
440                        new_i = topdl.Interface(
441                                substrate=sub.name,
442                                attribute=[ 
443                                    topdl.Attribute(
444                                        attribute='ip4_address', 
445                                        value=tbs[dt]
446                                    )
447                                ])
448                        portal.interface.append(new_i)
449                else:
450                    # First connection to this testbed, make an empty list
451                    # and set up to add the new portal below
452                    new_portal = True
453                    portals[st][dt] = [ ]
454                    myname = "%stunnel%d" % (dname.lower(),
455                            len(portals[st][dt]))
456                    desthost = "%stunnel%d" % (sname.lower(),
457                            len(portals[st][dt]))
458
459                    if dt in masters or st in masters: portal_type = "both"
460                    else: portal_type = "experiment"
461
462                if new_portal:
463                    infs = (
464                            (segment_substrate[st][dt].name, 
465                                (('portal', 'true'),)),
466                            (sub.name, 
467                                (('ip4_address', tbs[dt]),))
468                        )
469                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
470                            masters, eid, myname, desthost, portal_type,
471                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
472
473                    topo[st].elements.append(portal)
474                    portals[st][dt].append(portal)
475                    connInfo[st].append(info)
476
477    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
478            connInfo, expid):
479        # Add to the master testbed
480        tsubstrate, segment_element = \
481                self.new_portal_substrate(st, dt, eid, tbparams, expid)
482        myname = "%stunnel" % dt
483        desthost = "%stunnel" % st
484
485        portal, info = self.new_portal_node(st, dt, tbparams, masters,
486                eid, myname, desthost, "control", 
487                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
488                conn_attrs=[], expid=expid)
489
490        topo[st].substrates.append(tsubstrate)
491        topo[st].elements.append(segment_element)
492        topo[st].elements.append(portal)
493        if not connInfo.has_key(st):
494            connInfo[st] = [ ]
495        connInfo[st].append(info)
496
497    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
498            substrate, tbparams, expid, tb_name):
499        # Add to the master testbed
500        if testbed_suffix(dt): myname = "%stunnel" % "-".join(split_testbed(dt))
501        else: myname = "%stunnel" % dt
502
503        desthost = "%s" % ip_addr(dip)
504
505        portal, info = self.new_portal_node(st, dt, tbparams, masters,
506                eid, myname, desthost, "control", 
507                ((substrate.name,(
508                    ('portal','true'),
509                    ('ip4_address', "%s" % ip_addr(myip)),)),),
510                conn_type="transit", conn_attrs=[], expid=expid)
511
512        return portal
513
514    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
515            connInfo, expid):
516        """
517        For each substrate in the main topology, find those that
518        have nodes on more than one testbed.  Insert portal nodes
519        into the copies of those substrates on the sub topologies.
520        """
521        segment_substrate = { }
522        portals = { }
523        for s in top.substrates:
524            # tbs will contain an ip address on this subsrate that is in
525            # each testbed.
526            tbs = { }
527            for i in s.interfaces:
528                e = i.element
529                tb = e.get_attribute('testbed')
530                if tb and tb not in tbs:
531                    tbs[tb]= i.get_attribute('ip4_address')
532            if len(tbs) < 2:
533                continue
534
535            base_tbs = set([testbed_base(t) for t in tbs])
536
537            # DRAGON will not create multi-site vlans yet, so we don't do multi
538            # connection direct transits yet.
539            if len(tbs) == 2 :
540                # NB: the else if on the for loop - if none of the direct
541                # transits is applicable, use the internet.
542                for d in self.direct_transit:
543                    if all([tbparams[x].has_key(d) for x in tbs]):
544                        self.create_direct_substrate(s, topo, tbs, tbparams, 
545                                masters, eid, connInfo, expid, d)
546                        break
547                else:
548                    self.insert_internet_portals(s, topo, tbs, tbparams,
549                            masters, eid, segment_substrate, portals,
550                            connInfo, expid)
551            else:
552                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
553                        eid, segment_substrate, portals, connInfo, expid)
554
555
556        # Make sure that all the service importers have a control portal back
557        # to the master for each service.
558        for mtb in [ t for t in tbparams if t in masters ]:
559            importers = set([])
560            for m in masters[mtb]:
561                importers |= set(m.importers)
562            if mtb in importers:
563                importers.discard(mtb)
564            for tb in importers:
565                if tb not in topo:
566                    self.log.error("Importer not in experiment: %s" % tb)
567                    continue
568                if len([e for e in topo[tb].elements \
569                        if isinstance(e, topdl.Computer) and \
570                        e.get_attribute('destination_testbed') == mtb and \
571                        e.get_attribute('portal') and \
572                        e.get_attribute('portal_type') == 'both']) == 0:
573
574                    for tb_name in self.direct_transit:
575
576                        if tbparams[mtb].has_key(tb_name) \
577                                and tbparams[tb].has_key(tb_name):
578
579                            idx = len([x for x in topo.keys() \
580                                    if x.startswith(tb_name)])
581                            dip, leng = ip_allocator.allocate(4)
582                            dip += 1
583                            mip = dip+1
584                            csub = topdl.Substrate(
585                                    name="%s-control-%s" % (tb_name, tb),
586                                    capacity=topdl.Capacity(100000.0, 'max'),
587                                    attribute=[
588                                        topdl.Attribute(
589                                            attribute='portal',
590                                            value='true'
591                                            )
592                                        ]
593                                    )
594                            seg = topdl.Segment(
595                                    id= tbparams[mtb]['allocID'],
596                                    type='emulab',
597                                    uri = self.tbmap.get(testbed_base(mtb),
598                                        None),
599                                    interface=[ 
600                                        topdl.Interface(
601                                            substrate=csub.name),
602                                        ],
603                                    attribute = [
604                                        topdl.Attribute(attribute=n, value=v)
605                                            for n, v in (\
606                                                ('domain', 
607                                                    tbparams[mtb].get('domain',
608                                                        ".example.com")),
609                                                ('experiment', "%s/%s" % \
610                                                        (tbparams[mtb].get(
611                                                            'project', 
612                                                            'project'), 
613                                                            eid)),)
614                                        ],
615                                    )
616                            portal = self.new_direct_portal(tb, mtb,
617                                    masters, eid, dip, mip, idx, csub,
618                                    tbparams, expid, tb_name)
619                            topo[tb].substrates.append(csub)
620                            topo[tb].elements.append(portal)
621                            topo[tb].elements.append(seg)
622
623                            mcsub = csub.clone()
624                            seg = topdl.Segment(
625                                    id= tbparams[tb]['allocID'],
626                                    type='emulab',
627                                    uri = self.tbmap.get(testbed_base(tb),
628                                        None),
629                                    interface=[ 
630                                        topdl.Interface(
631                                            substrate=csub.name),
632                                        ],
633                                    attribute = [
634                                        topdl.Attribute(attribute=n, value=v)
635                                            for n, v in (\
636                                                ('domain', 
637                                                    tbparams[tb].get('domain',
638                                                        ".example.com")),
639                                                ('experiment', "%s/%s" % \
640                                                        (tbparams[tb].get('project', 
641                                                            'project'), 
642                                                            eid)),)
643                                        ],
644                                    )
645                            portal = self.new_direct_portal(mtb, tb, masters,
646                                    eid, mip, dip, idx, mcsub, tbparams, expid,
647                                    tb_name)
648                            topo[mtb].substrates.append(mcsub)
649                            topo[mtb].elements.append(portal)
650                            topo[mtb].elements.append(seg)
651                            for t in (mtb, tb):
652                                topo[t].incorporate_elements()
653
654                            self.create_direct_substrate(csub, topo, 
655                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
656                                    tbparams, masters, eid, connInfo,
657                                    expid, tb_name)
658                            break
659                    # This matches with the for tb_name in self.direct_transit
660                    else:
661                        self.add_control_portal(mtb, tb, masters, eid, topo, 
662                                tbparams, connInfo, expid)
663                        self.add_control_portal(tb, mtb, masters, eid, topo, 
664                                tbparams, connInfo, expid)
665
666        # Connect the portal nodes into the topologies and clear out
667        # substrates that are not in the topologies
668        for tb in tbparams:
669            topo[tb].incorporate_elements()
670            topo[tb].substrates = \
671                    [s for s in topo[tb].substrates \
672                        if len(s.interfaces) >0]
673
Note: See TracBrowser for help on using the repository browser.