source: fedd/federation/experiment_partition.py @ 0863dd1

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 0863dd1 was 0863dd1, checked in by Ted Faber <faber@…>, 14 years ago

a few lingering element name bugs.

  • Property mode set to 100644
File size: 19.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58
59
60    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
61            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
62            expid=None):
63        """
64        Return a new internet portal node and a dict with the connectionInfo to
65        be attached.
66        """
67        seer_master = None
68        for m in masters.values():
69            for s in m:
70                if s.name == 'SEER':
71                    seer_master = m
72                    break
73            if seer_master: break
74
75        if seer_master:
76            mdomain = tbparams[seer_master].get('domain', '.example.com')
77            mproject = tbparams[seer_master].get('project', 'project')
78            muser = tbparams[seer_master].get('user', 'root')
79            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
80        else:
81            mdomain = '.example.com'
82            mproject = 'project'
83            muser = 'root'
84            smbshare = 'USERS'
85
86        dproject = tbparams[dt].get('project', 'project')
87        ddomain = tbparams[dt].get('domain', '.example.com')
88
89        if (st in masters and dt not in masters) or \
90                ( st not in masters and dt in masters ):
91            active = ("%s" % (st in masters))
92        else:
93            active = ("%s" % (st > dt))
94
95        ifaces = [ ]
96        for sub, attrs in iface_desc:
97            inf = topdl.Interface(
98                    name="inf%03d" % len(ifaces),
99                    substrate=sub,
100                    attribute=[
101                        topdl.Attribute(
102                            attribute=n,
103                            value = v)
104                        for n, v in attrs
105                        ]
106                    )
107            ifaces.append(inf)
108        if conn_type == "ssh":
109            try:
110                aid = tbparams[st]['allocID']['fedid']
111            except:
112                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
113                        % st)
114                aid = None
115            info = {
116                    "type" : conn_type, 
117                    "portal": myname,
118                    'fedAttr': [ 
119                            { 'attribute': 'masterdomain', 'value': mdomain},
120                            { 'attribute': 'masterexperiment', 'value': 
121                                "%s/%s" % (mproject, eid)},
122                            { 'attribute': 'active', 'value': active},
123                            # Move to SMB service description
124                            { 'attribute': 'masteruser', 'value': muser},
125                            { 'attribute': 'smbshare', 'value': smbshare},
126                        ],
127                    'parameter': [
128                        {
129                            'name': 'peer',
130                            'key': 'fedid:%s/%s' % (expid, myname),
131                            'store': self.store_url,
132                            'type': 'output',
133                        },
134                        {
135                            'name': 'ssh_port',
136                            'key': 'fedid:%s/%s-port' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'peer',
142                            'key': 'fedid:%s/%s' % (expid, desthost),
143                            'store': self.store_url,
144                            'type': 'input',
145                        },
146                        {
147                            'name': 'ssh_port',
148                            'key': 'fedid:%s/%s-port' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        ]
153                    }
154            # Give this allocation the rights to access the key of the
155            # peers
156            if aid:
157                for h in (myname, desthost):
158                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
159                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
160                            (expid, h))
161            else:
162                self.log.error("No aid for %s in new_portal_node" % st)
163        else:
164            info = None
165
166        return (topdl.Computer(
167                name=myname,
168                attribute=[ 
169                    topdl.Attribute(attribute=n,value=v)
170                        for n, v in (\
171                            ('portal', 'true'),
172                            ('portal_type', portal_type), 
173                            ('destination_testbed', dt),
174                        )
175                    ],
176                interface=ifaces,
177                ), info)
178
179    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
180        ddomain = tbparams[dt].get('domain', ".example.com")
181        dproject = tbparams[dt].get('project', 'project')
182        tsubstrate = \
183                topdl.Substrate(name='%s-%s' % (st, dt),
184                        attribute= [
185                            topdl.Attribute(
186                                attribute='portal',
187                                value='true')
188                            ]
189                        )
190        segment_element = topdl.Segment(
191                id= tbparams[dt]['allocID'],
192                type='emulab',
193                uri = self.tbmap.get(testbed_base(dt), None),
194                interface=[ 
195                    topdl.Interface(
196                        substrate=tsubstrate.name),
197                    ],
198                attribute = [
199                    topdl.Attribute(attribute=n, value=v)
200                        for n, v in (\
201                            ('domain', ddomain),
202                            ('experiment', "%s/%s" % \
203                                    (dproject, eid)),)
204                    ],
205                )
206
207        return (tsubstrate, segment_element)
208
209    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
210            tb_name):
211        if sub.capacity is None:
212            raise service_error(service_error.internal,
213                    "Cannot DRAGON split substrate w/o capacity")
214        segs = [ ]
215        name = join_testbed(tb_name, "%d" % idx)
216        substr = topdl.Substrate(name=name, 
217                capacity=sub.capacity.clone(),
218                attribute=[ topdl.Attribute(attribute=n, value=v)
219                    for n, v, in (\
220                            ('vlan', 'unassigned%d' % idx),)])
221        store_key = 'fedid:%s/vlan%d' % (expid, idx)
222        for tb in tbs.keys():
223            seg = topdl.Segment(
224                    id = tbparams[tb]['allocID'],
225                    type='emulab',
226                    uri = self.tbmap.get(testbed_base(tb), None),
227                    interface=[ 
228                        topdl.Interface(
229                            substrate=substr.name),
230                        ],
231                    attribute=[ topdl.Attribute(
232                        attribute='%s_endpoint' % tb_name, 
233                        value=tbparams[tb][tb_name]),
234                        ]
235                    )
236            vlan_key = "%s_vlans" % tb_name
237            if vlan_key in tbparams[tb]:
238                seg.set_attribute(vlan_key, tbparams[tb][vlan_key])
239            segs.append(seg)
240
241            # Give this allocation the rights to access the key of the
242            # vlan_id
243            try:
244                aid = tbparams[tb]['allocID']['fedid']
245                self.auth.set_attribute(aid, store_key)
246            except:
247                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
248                        % tb)
249
250        connInfo[name] = [ { 
251            'type': 'transit',
252            'parameter': [ { 
253                'name': 'vlan_id',
254                'key': store_key,
255                'store': self.store_url,
256                'type': 'output'
257                } ]
258            } ]
259
260        topo[name] = \
261                topdl.Topology(substrates=[substr], elements=segs,
262                        attribute=[
263                            topdl.Attribute(attribute="transit", value='true'),
264                            topdl.Attribute(attribute="dynamic", value='true'),
265                            topdl.Attribute(attribute="testbed", value=tb_name),
266                            topdl.Attribute(attribute="store_keys", 
267                                value=store_key),
268                            ]
269                        )
270
271    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
272            connInfo, expid=None, tb_name=None):
273        """
274        Add attribiutes to the various elements indicating that they are to be
275        dragon connected and create a dragon segment in topo to be
276        instantiated.
277        """
278
279        def get_substrate_from_topo(name, t):
280            for s in t.substrates:
281                if s.name == name: return s
282            else: return None
283
284
285        seer_master = None
286        for m in masters.values():
287            for s in m:
288                if s.name == 'SEER':
289                    seer_master = m
290                    break
291            if seer_master: break
292
293        if seer_master:
294            mdomain = tbparams[seer_master].get('domain', '.example.com')
295            mproject = tbparams[seer_master].get('project', 'project')
296        else:
297            mdomain = '.example.com'
298            mproject = 'project'
299
300        # dn is the number of previously created direct nets on this direct
301        # testbed.  This routine creates a net numbered by dn
302        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
303        # Count the number of interfaces on this substrate in each testbed from
304        # the global topology
305        count = { }
306        node = { }
307        for e in [ i.element for i in sub.interfaces ]:
308            tb = e.get_attribute('testbed')
309            count[tb] = count.get(tb, 0) + 1
310            node[tb] = i.get_attribute('ip4_address')
311
312
313        # Set the attributes in the copies that will allow setup of direct
314        # connections.
315        for tb in tbs.keys():
316            s = get_substrate_from_topo(sub.name, topo[tb])
317            if s:
318                if not connInfo.has_key(tb):
319                    connInfo[tb] = [ ]
320
321                try:
322                    aid = tbparams[tb]['allocID']['fedid']
323                except:
324                    self.log.debug("[create_direct_substrate] " + 
325                            "Can't get alloc id for %s?" %tb)
326                    aid = None
327
328                # This may need another look, but only a service gateway will
329                # look at the active parameter, and these are only inserted to
330                # connect to a master.
331                active = "%s" % ( tb in masters)
332                info = {
333                        'type': 'transit',
334                        'member': [ {
335                            'element': i.element.name, 
336                            'interface': i.name
337                            } for i in s.interfaces \
338                                    if isinstance(i.element, topdl.Computer) ],
339                        'fedAttr': [ 
340                            { 'attribute': 'masterdomain', 'value': mdomain},
341                            { 'attribute': 'masterexperiment', 'value': 
342                                "%s/%s" % (mproject, eid)},
343                            { 'attribute': 'active', 'value': active},
344                            ],
345                        'parameter': [ {
346                            'name': 'vlan_id',
347                            'key': 'fedid:%s/vlan%d' % (expid, dn),
348                            'store': self.store_url,
349                            'type': 'input',
350                            } ]
351                        }
352                if tbs.has_key(tb):
353                    info['peer'] = tbs[tb]
354                connInfo[tb].append(info)
355
356                # Give this allocation the rights to access the key of the
357                # vlan_id
358                if aid:
359                    self.auth.set_attribute(aid, 
360                            'fedid:%s/vlan%d' % (expid, dn))
361            else:
362                raise service_error(service_error.internal,
363                        "No substrate %s in testbed %s" % (sub.name, tb))
364
365        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
366                tb_name)
367
368    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
369            segment_substrate, portals, connInfo, expid):
370        # More than one testbed is on this substrate.  Insert
371        # some portals into the subtopologies.  st == source testbed,
372        # dt == destination testbed.
373        for st in tbs.keys():
374            if not segment_substrate.has_key(st):
375                segment_substrate[st] = { }
376            if not portals.has_key(st): 
377                portals[st] = { }
378            if not connInfo.has_key(st):
379                connInfo[st] = [ ]
380            for dt in [ t for t in tbs.keys() if t != st]:
381                sproject = tbparams[st].get('project', 'project')
382                dproject = tbparams[dt].get('project', 'project')
383                sdomain = tbparams[st].get('domain', ".example.com")
384                ddomain = tbparams[dt].get('domain', ".example.com")
385                aid = tbparams[dt]['allocID']['fedid']
386
387                seer_master = None
388
389                for m in masters.values():
390                    for s in m:
391                        if s.name == 'SEER':
392                            seer_master = m
393                            break
394                    if seer_master: break
395
396                if seer_master:
397                    mdomain = tbparams[seer_master].get('domain', '.example.com')
398                    mproject = tbparams[seer_master].get('project', 'project')
399                    muser = tbparams[seer_master].get('user', 'root')
400                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
401                else:
402                    mdomain = '.example.com'
403                    mproject = 'project'
404                    muser = 'root'
405                    smbshare = 'USERS'
406
407                if (st in masters  and dt not in masters) or \
408                        (st not in masters and dt in masters):
409                    active = ("%s" % (st in masters))
410                else:
411                    active = ("%s" %(st > dt))
412
413                if not segment_substrate[st].has_key(dt):
414                    # Put a substrate and a segment for the connected
415                    # testbed in there.
416                    tsubstrate, segment_element = \
417                            self.new_portal_substrate(st, dt, eid, tbparams,
418                                    expid)
419                    segment_substrate[st][dt] = tsubstrate
420                    topo[st].substrates.append(tsubstrate)
421                    topo[st].elements.append(segment_element)
422
423                new_portal = False
424                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
425                else: dname = dt
426
427                if testbed_suffix(st): sname = "-".join(split_testbed(st))
428                else: sname = st
429
430                if portals[st].has_key(dt):
431                    # There's a portal set up to go to this destination.
432                    # See if there's room to multiplex this connection on
433                    # it.  If so, add an interface to the portal; if not,
434                    # set up to add a portal below.
435                    # [This little festival of braces is just a pop of the
436                    # last element in the list of portals between st and
437                    # dt.]
438                    portal = portals[st][dt][-1]
439                    mux = len([ i for i in portal.interface \
440                            if not i.get_attribute('portal')])
441                    if mux == self.muxmax:
442                        new_portal = True
443                        portal_type = "experiment"
444                        myname = "%stunnel%d" % (dname.lower(),
445                                len(portals[st][dt]))
446                        desthost = "%stunnel%d" % (sname.lower(), 
447                                len(portals[st][dt]))
448                    else:
449                        new_i = topdl.Interface(
450                                substrate=sub.name,
451                                attribute=[ 
452                                    topdl.Attribute(
453                                        attribute='ip4_address', 
454                                        value=tbs[dt]
455                                    )
456                                ])
457                        portal.interface.append(new_i)
458                else:
459                    # First connection to this testbed, make an empty list
460                    # and set up to add the new portal below
461                    new_portal = True
462                    portals[st][dt] = [ ]
463                    myname = "%stunnel%d" % (dname.lower(),
464                            len(portals[st][dt]))
465                    desthost = "%stunnel%d" % (sname.lower(),
466                            len(portals[st][dt]))
467
468                    if dt in masters or st in masters: portal_type = "both"
469                    else: portal_type = "experiment"
470
471                if new_portal:
472                    infs = (
473                            (segment_substrate[st][dt].name, 
474                                (('portal', 'true'),)),
475                            (sub.name, 
476                                (('ip4_address', tbs[dt]),))
477                        )
478                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
479                            masters, eid, myname, desthost, portal_type,
480                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
481
482                    topo[st].elements.append(portal)
483                    portals[st][dt].append(portal)
484                    connInfo[st].append(info)
485
486    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
487            connInfo, expid):
488        # Add to the master testbed
489        tsubstrate, segment_element = \
490                self.new_portal_substrate(st, dt, eid, tbparams, expid)
491        myname = "%stunnel" % dt
492        desthost = "%stunnel" % st
493
494        portal, info = self.new_portal_node(st, dt, tbparams, masters,
495                eid, myname, desthost, "control", 
496                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
497                conn_attrs=[], expid=expid)
498
499        topo[st].substrates.append(tsubstrate)
500        topo[st].elements.append(segment_element)
501        topo[st].elements.append(portal)
502        if not connInfo.has_key(st):
503            connInfo[st] = [ ]
504        connInfo[st].append(info)
505
506    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
507            substrate, tbparams, expid, tb_name):
508        # Add to the master testbed
509        if testbed_suffix(dt): myname = "%stunnel" % "-".join(split_testbed(dt))
510        else: myname = "%stunnel" % dt
511
512        desthost = "%s" % ip_addr(dip)
513
514        portal, info = self.new_portal_node(st, dt, tbparams, masters,
515                eid, myname, desthost, "control", 
516                ((substrate.name,(
517                    ('portal','true'),
518                    ('ip4_address', "%s" % ip_addr(myip)),)),),
519                conn_type="transit", conn_attrs=[], expid=expid)
520
521        return portal
522
523    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
524            connInfo, expid):
525        """
526        For each substrate in the main topology, find those that
527        have nodes on more than one testbed.  Insert portal nodes
528        into the copies of those substrates on the sub topologies.
529        """
530        segment_substrate = { }
531        portals = { }
532        for s in top.substrates:
533            # tbs will contain an ip address on this subsrate that is in
534            # each testbed.
535            tbs = { }
536            for i in s.interfaces:
537                e = i.element
538                tb = e.get_attribute('testbed')
539                if tb and not tbs.has_key(tb):
540                    for i in e.interface:
541                        if s in i.subs:
542                            tbs[tb]= i.get_attribute('ip4_address')
543            if len(tbs) < 2:
544                continue
545
546            base_tbs = set([testbed_base(t) for t in tbs])
547
548            # DRAGON will not create multi-site vlans yet, so we don't do multi
549            # connection direct transits yet.
550            if len(tbs) == 2 :
551                # NB: the else if on the for loop - if none of the direct
552                # transits is applicable, use the internet.
553                for d in self.direct_transit:
554                    if all([tbparams[x].has_key(d) for x in tbs]):
555                        self.create_direct_substrate(s, topo, tbs, tbparams, 
556                                masters, eid, connInfo, expid, d)
557                        break
558                else:
559                    self.insert_internet_portals(s, topo, tbs, tbparams,
560                            masters, eid, segment_substrate, portals,
561                            connInfo, expid)
562            else:
563                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
564                        eid, segment_substrate, portals, connInfo, expid)
565
566
567        # Make sure that all the service importers have a control portal back
568        # to the master for each service.
569        for mtb in [ t for t in tbparams if t in masters ]:
570            importers = set([])
571            for m in masters[mtb]:
572                importers |= set(m.importers)
573            if mtb in importers:
574                importers.discard(mtb)
575            for tb in importers:
576                if len([e for e in topo[tb].elements \
577                        if isinstance(e, topdl.Computer) and \
578                        e.get_attribute('destination_testbed') == mtb and \
579                        e.get_attribute('portal') and \
580                        e.get_attribute('portal_type') == 'both']) == 0:
581
582                    for tb_name in self.direct_transit:
583
584                        if tbparams[mtb].has_key(tb_name) \
585                                and tbparams[tb].has_key(tb_name):
586
587                            idx = len([x for x in topo.keys() \
588                                    if x.startswith(tb_name)])
589                            dip, leng = ip_allocator.allocate(4)
590                            dip += 1
591                            mip = dip+1
592                            csub = topdl.Substrate(
593                                    name="%s-control-%s" % (tb_name, tb),
594                                    capacity=topdl.Capacity(100000.0, 'max'),
595                                    attribute=[
596                                        topdl.Attribute(
597                                            attribute='portal',
598                                            value='true'
599                                            )
600                                        ]
601                                    )
602                            seg = topdl.Segment(
603                                    id= tbparams[mtb]['allocID'],
604                                    type='emulab',
605                                    uri = self.tbmap.get(testbed_base(mtb),
606                                        None),
607                                    interface=[ 
608                                        topdl.Interface(
609                                            substrate=csub.name),
610                                        ],
611                                    attribute = [
612                                        topdl.Attribute(attribute=n, value=v)
613                                            for n, v in (\
614                                                ('domain', 
615                                                    tbparams[mtb].get('domain',
616                                                        ".example.com")),
617                                                ('experiment', "%s/%s" % \
618                                                        (tbparams[mtb].get(
619                                                            'project', 
620                                                            'project'), 
621                                                            eid)),)
622                                        ],
623                                    )
624                            portal = self.new_direct_portal(tb, mtb,
625                                    masters, eid, dip, mip, idx, csub,
626                                    tbparams, expid, tb_name)
627                            topo[tb].substrates.append(csub)
628                            topo[tb].elements.append(portal)
629                            topo[tb].elements.append(seg)
630
631                            mcsub = csub.clone()
632                            seg = topdl.Segment(
633                                    id= tbparams[tb]['allocID'],
634                                    type='emulab',
635                                    uri = self.tbmap.get(testbed_base(tb),
636                                        None),
637                                    interface=[ 
638                                        topdl.Interface(
639                                            substrate=csub.name),
640                                        ],
641                                    attribute = [
642                                        topdl.Attribute(attribute=n, value=v)
643                                            for n, v in (\
644                                                ('domain', 
645                                                    tbparams[tb].get('domain',
646                                                        ".example.com")),
647                                                ('experiment', "%s/%s" % \
648                                                        (tbparams[tb].get('project', 
649                                                            'project'), 
650                                                            eid)),)
651                                        ],
652                                    )
653                            portal = self.new_direct_portal(mtb, tb, masters,
654                                    eid, mip, dip, idx, mcsub, tbparams, expid,
655                                    tb_name)
656                            topo[mtb].substrates.append(mcsub)
657                            topo[mtb].elements.append(portal)
658                            topo[mtb].elements.append(seg)
659                            for t in (mtb, tb):
660                                topo[t].incorporate_elements()
661
662                            self.create_direct_substrate(csub, topo, 
663                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
664                                    tbparams, masters, eid, connInfo,
665                                    expid, tb_name)
666                            break
667                    # This matches with the for tb_name in self.direct_transit
668                    else:
669                        self.add_control_portal(mtb, tb, masters, eid, topo, 
670                                tbparams, connInfo, expid)
671                        self.add_control_portal(tb, mtb, masters, eid, topo, 
672                                tbparams, connInfo, expid)
673
674        # Connect the portal nodes into the topologies and clear out
675        # substrates that are not in the topologies
676        for tb in tbparams:
677            topo[tb].incorporate_elements()
678            topo[tb].substrates = \
679                    [s for s in topo[tb].substrates \
680                        if len(s.interfaces) >0]
681
Note: See TracBrowser for help on using the repository browser.