source: fedd/federation/experiment_partition.py @ f3803ea

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since f3803ea was f3803ea, checked in by Ted Faber <faber@…>, 14 years ago

missed some spots getting subtestbeds right

  • Property mode set to 100644
File size: 18.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2):
46        """
47        Intialize the various attributes
48        """
49
50        self.log = logging.getLogger("fedd.experiment_control." + \
51                "experiment_paritition")
52        self.auth = auth
53        self.store_url = store_url
54        self.tbmap = tbmap
55        self.muxmax = muxmax
56
57
58    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
59            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
60            expid=None):
61        """
62        Return a new internet portal node and a dict with the connectionInfo to
63        be attached.
64        """
65        seer_master = None
66        for m in masters.values():
67            for s in m:
68                if s.name == 'SEER':
69                    seer_master = m
70                    break
71            if seer_master: break
72
73        if seer_master:
74            mdomain = tbparams[seer_master].get('domain', '.example.com')
75            mproject = tbparams[seer_master].get('project', 'project')
76            muser = tbparams[seer_master].get('user', 'root')
77            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
78        else:
79            mdomain = '.example.com'
80            mproject = 'project'
81            muser = 'root'
82            smbshare = 'USERS'
83
84        dproject = tbparams[dt].get('project', 'project')
85        ddomain = tbparams[dt].get('domain', '.example.com')
86
87        if (st in masters and dt not in masters) or \
88                ( st not in masters and dt in masters ):
89            active = ("%s" % (st in masters))
90        else:
91            active = ("%s" % (st > dt))
92
93        print "%s %s" % (st, active)
94
95        ifaces = [ ]
96        for sub, attrs in iface_desc:
97            inf = topdl.Interface(
98                    name="inf%03d" % len(ifaces),
99                    substrate=sub,
100                    attribute=[
101                        topdl.Attribute(
102                            attribute=n,
103                            value = v)
104                        for n, v in attrs
105                        ]
106                    )
107            ifaces.append(inf)
108        if conn_type == "ssh":
109            try:
110                aid = tbparams[st]['allocID']['fedid']
111            except:
112                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
113                        % st)
114                aid = None
115            info = {
116                    "type" : conn_type, 
117                    "portal": myname,
118                    'fedAttr': [ 
119                            { 'attribute': 'masterdomain', 'value': mdomain},
120                            { 'attribute': 'masterexperiment', 'value': 
121                                "%s/%s" % (mproject, eid)},
122                            { 'attribute': 'active', 'value': active},
123                            # Move to SMB service description
124                            { 'attribute': 'masteruser', 'value': muser},
125                            { 'attribute': 'smbshare', 'value': smbshare},
126                        ],
127                    'parameter': [
128                        {
129                            'name': 'peer',
130                            'key': 'fedid:%s/%s' % (expid, myname),
131                            'store': self.store_url,
132                            'type': 'output',
133                        },
134                        {
135                            'name': 'ssh_port',
136                            'key': 'fedid:%s/%s-port' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'peer',
142                            'key': 'fedid:%s/%s' % (expid, desthost),
143                            'store': self.store_url,
144                            'type': 'input',
145                        },
146                        {
147                            'name': 'ssh_port',
148                            'key': 'fedid:%s/%s-port' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        ]
153                    }
154            # Give this allocation the rights to access the key of the
155            # peers
156            if aid:
157                for h in (myname, desthost):
158                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
159                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
160                            (expid, h))
161            else:
162                self.log.error("No aid for %s in new_portal_node" % st)
163        else:
164            info = None
165
166        return (topdl.Computer(
167                name=myname,
168                attribute=[ 
169                    topdl.Attribute(attribute=n,value=v)
170                        for n, v in (\
171                            ('portal', 'true'),
172                            ('portal_type', portal_type), 
173                            ('destination_testbed', dt),
174                        )
175                    ],
176                interface=ifaces,
177                ), info)
178
179    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
180        ddomain = tbparams[dt].get('domain', ".example.com")
181        dproject = tbparams[dt].get('project', 'project')
182        tsubstrate = \
183                topdl.Substrate(name='%s-%s' % (st, dt),
184                        attribute= [
185                            topdl.Attribute(
186                                attribute='portal',
187                                value='true')
188                            ]
189                        )
190        segment_element = topdl.Segment(
191                id= tbparams[dt]['allocID'],
192                type='emulab',
193                uri = self.tbmap.get(testbed_base(dt), None),
194                interface=[ 
195                    topdl.Interface(
196                        substrate=tsubstrate.name),
197                    ],
198                attribute = [
199                    topdl.Attribute(attribute=n, value=v)
200                        for n, v in (\
201                            ('domain', ddomain),
202                            ('experiment', "%s/%s" % \
203                                    (dproject, eid)),)
204                    ],
205                )
206
207        return (tsubstrate, segment_element)
208
209    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
210        if sub.capacity is None:
211            raise service_error(service_error.internal,
212                    "Cannot DRAGON split substrate w/o capacity")
213        segs = [ ]
214        name = join_testbed("dragon", "%d" % idx)
215        substr = topdl.Substrate(name=name, 
216                capacity=sub.capacity.clone(),
217                attribute=[ topdl.Attribute(attribute=n, value=v)
218                    for n, v, in (\
219                            ('vlan', 'unassigned%d' % idx),)])
220        store_key = 'fedid:%s/vlan%d' % (expid, idx)
221        for tb in tbs.keys():
222            seg = topdl.Segment(
223                    id = tbparams[tb]['allocID'],
224                    type='emulab',
225                    uri = self.tbmap.get(testbed_base(tb), None),
226                    interface=[ 
227                        topdl.Interface(
228                            substrate=substr.name),
229                        ],
230                    attribute=[ topdl.Attribute(
231                        attribute='dragon_endpoint', 
232                        value=tbparams[tb]['dragon']),
233                        ]
234                    )
235            if tbparams[tb].has_key('vlans'):
236                seg.set_attribute('vlans', tbparams[tb]['vlans'])
237            segs.append(seg)
238
239            # Give this allocation the rights to access the key of the
240            # vlan_id
241            try:
242                aid = tbparams[tb]['allocID']['fedid']
243                self.auth.set_attribute(aid, store_key)
244            except:
245                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
246                        % tb)
247
248        connInfo[name] = [ { 
249            'type': 'transit',
250            'parameter': [ { 
251                'name': 'vlan_id',
252                'key': store_key,
253                'store': self.store_url,
254                'type': 'output'
255                } ]
256            } ]
257
258        topo[name] = \
259                topdl.Topology(substrates=[substr], elements=segs,
260                        attribute=[
261                            topdl.Attribute(attribute="transit", value='true'),
262                            topdl.Attribute(attribute="dynamic", value='true'),
263                            topdl.Attribute(attribute="testbed", 
264                                value='dragon'),
265                            topdl.Attribute(attribute="store_keys", 
266                                value=store_key),
267                            ]
268                        )
269
270    def create_dragon_substrate(self, sub, topo, tbs, tbparams, masters, eid,
271            connInfo, expid=None):
272        """
273        Add attribiutes to the various elements indicating that they are to be
274        dragon connected and create a dragon segment in topo to be
275        instantiated.
276        """
277
278        def get_substrate_from_topo(name, t):
279            for s in t.substrates:
280                if s.name == name: return s
281            else: return None
282
283
284        seer_master = None
285        for m in masters.values():
286            for s in m:
287                if s.name == 'SEER':
288                    seer_master = m
289                    break
290            if seer_master: break
291
292        if seer_master:
293            mdomain = tbparams[seer_master].get('domain', '.example.com')
294            mproject = tbparams[seer_master].get('project', 'project')
295        else:
296            mdomain = '.example.com'
297            mproject = 'project'
298
299        # dn is the number of previously created dragon nets.  This routine
300        # creates a net numbered by dn
301        dn = len([x for x in topo.keys() if x.startswith('dragon')])
302        # Count the number of interfaces on this substrate in each testbed from
303        # the global topology
304        count = { }
305        node = { }
306        for e in [ i.element for i in sub.interfaces ]:
307            tb = e.get_attribute('testbed')
308            count[tb] = count.get(tb, 0) + 1
309            node[tb] = i.get_attribute('ip4_address')
310
311
312        # Set the attributes in the copies that will allow setup of dragon
313        # connections.
314        for tb in tbs.keys():
315            s = get_substrate_from_topo(sub.name, topo[tb])
316            if s:
317                if not connInfo.has_key(tb):
318                    connInfo[tb] = [ ]
319
320                try:
321                    aid = tbparams[tb]['allocID']['fedid']
322                except:
323                    self.log.debug("[creat_dragon_substrate] " + 
324                            "Can't get alloc id for %s?" %tb)
325                    aid = None
326
327                # This may need another look, but only a service gateway will
328                # look at the active parameter, and these are only inserted to
329                # connect to a master.
330                active = "%s" % ( tb in masters)
331                info = {
332                        'type': 'transit',
333                        'member': [ {
334                            'element': i.element.name[0], 
335                            'interface': i.name
336                            } for i in s.interfaces \
337                                    if isinstance(i.element, topdl.Computer) ],
338                        'fedAttr': [ 
339                            { 'attribute': 'masterdomain', 'value': mdomain},
340                            { 'attribute': 'masterexperiment', 'value': 
341                                "%s/%s" % (mproject, eid)},
342                            { 'attribute': 'active', 'value': active},
343                            ],
344                        'parameter': [ {
345                            'name': 'vlan_id',
346                            'key': 'fedid:%s/vlan%d' % (expid, dn),
347                            'store': self.store_url,
348                            'type': 'input',
349                            } ]
350                        }
351                if tbs.has_key(tb):
352                    info['peer'] = tbs[tb]
353                connInfo[tb].append(info)
354
355                # Give this allocation the rights to access the key of the
356                # vlan_id
357                if aid:
358                    self.auth.set_attribute(aid, 
359                            'fedid:%s/vlan%d' % (expid, dn))
360            else:
361                raise service_error(service_error.internal,
362                        "No substrate %s in testbed %s" % (sub.name, tb))
363
364        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
365
366    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
367            segment_substrate, portals, connInfo, expid):
368        # More than one testbed is on this substrate.  Insert
369        # some portals into the subtopologies.  st == source testbed,
370        # dt == destination testbed.
371        for st in tbs.keys():
372            if not segment_substrate.has_key(st):
373                segment_substrate[st] = { }
374            if not portals.has_key(st): 
375                portals[st] = { }
376            if not connInfo.has_key(st):
377                connInfo[st] = [ ]
378            for dt in [ t for t in tbs.keys() if t != st]:
379                sproject = tbparams[st].get('project', 'project')
380                dproject = tbparams[dt].get('project', 'project')
381                sdomain = tbparams[st].get('domain', ".example.com")
382                ddomain = tbparams[dt].get('domain', ".example.com")
383                aid = tbparams[dt]['allocID']['fedid']
384
385                seer_master = None
386
387                for m in masters.values():
388                    for s in m:
389                        if s.name == 'SEER':
390                            seer_master = m
391                            break
392                    if seer_master: break
393
394                if seer_master:
395                    mdomain = tbparams[seer_master].get('domain', '.example.com')
396                    mproject = tbparams[seer_master].get('project', 'project')
397                    muser = tbparams[seer_master].get('user', 'root')
398                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
399                else:
400                    mdomain = '.example.com'
401                    mproject = 'project'
402                    muser = 'root'
403                    smbshare = 'USERS'
404
405                if (st in masters  and dt not in masters) or \
406                        (st not in masters and dt in masters):
407                    active = ("%s" % (st in masters))
408                else:
409                    active = ("%s" %(st > dt))
410
411                if not segment_substrate[st].has_key(dt):
412                    # Put a substrate and a segment for the connected
413                    # testbed in there.
414                    tsubstrate, segment_element = \
415                            self.new_portal_substrate(st, dt, eid, tbparams,
416                                    expid)
417                    segment_substrate[st][dt] = tsubstrate
418                    topo[st].substrates.append(tsubstrate)
419                    topo[st].elements.append(segment_element)
420
421                new_portal = False
422                dname = "_".join(split_testbed(dt))
423                sname = "_".join(split_testbed(st))
424                if portals[st].has_key(dt):
425                    # There's a portal set up to go to this destination.
426                    # See if there's room to multiplex this connection on
427                    # it.  If so, add an interface to the portal; if not,
428                    # set up to add a portal below.
429                    # [This little festival of braces is just a pop of the
430                    # last element in the list of portals between st and
431                    # dt.]
432                    portal = portals[st][dt][-1]
433                    mux = len([ i for i in portal.interface \
434                            if not i.get_attribute('portal')])
435                    if mux == self.muxmax:
436                        new_portal = True
437                        portal_type = "experiment"
438                        myname = "%stunnel%d" % (dname.lower(),
439                                len(portals[st][dt]))
440                        desthost = "%stunnel%d" % (sname.lower(), 
441                                len(portals[st][dt]))
442                    else:
443                        new_i = topdl.Interface(
444                                substrate=sub.name,
445                                attribute=[ 
446                                    topdl.Attribute(
447                                        attribute='ip4_address', 
448                                        value=tbs[dt]
449                                    )
450                                ])
451                        portal.interface.append(new_i)
452                else:
453                    # First connection to this testbed, make an empty list
454                    # and set up to add the new portal below
455                    new_portal = True
456                    portals[st][dt] = [ ]
457                    myname = "%stunnel%d" % (dname.lower(),
458                            len(portals[st][dt]))
459                    desthost = "%stunnel%d" % (sname.lower(),
460                            len(portals[st][dt]))
461
462                    if dt in masters or st in masters: portal_type = "both"
463                    else: portal_type = "experiment"
464
465                if new_portal:
466                    infs = (
467                            (segment_substrate[st][dt].name, 
468                                (('portal', 'true'),)),
469                            (sub.name, 
470                                (('ip4_address', tbs[dt]),))
471                        )
472                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
473                            masters, eid, myname, desthost, portal_type,
474                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
475
476                    topo[st].elements.append(portal)
477                    portals[st][dt].append(portal)
478                    connInfo[st].append(info)
479
480    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
481            connInfo, expid):
482        # Add to the master testbed
483        tsubstrate, segment_element = \
484                self.new_portal_substrate(st, dt, eid, tbparams, expid)
485        myname = "%stunnel" % dt
486        desthost = "%stunnel" % st
487
488        portal, info = self.new_portal_node(st, dt, tbparams, masters,
489                eid, myname, desthost, "control", 
490                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
491                conn_attrs=[], expid=expid)
492
493        topo[st].substrates.append(tsubstrate)
494        topo[st].elements.append(segment_element)
495        topo[st].elements.append(portal)
496        if not connInfo.has_key(st):
497            connInfo[st] = [ ]
498        connInfo[st].append(info)
499
500    def new_dragon_portal(self, st, dt, masters, eid, myip, dip, idx, 
501            substrate, tbparams, expid):
502        # Add to the master testbed
503        myname = "%stunnel" % dt
504        desthost = "%s" % ip_addr(dip)
505
506        portal, info = self.new_portal_node(st, dt, tbparams, masters,
507                eid, myname, desthost, "control", 
508                ((substrate.name,(
509                    ('portal','true'),
510                    ('ip4_address', "%s" % ip_addr(myip)),)),),
511                conn_type="transit", conn_attrs=[], expid=expid)
512
513        return portal
514
515    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
516            connInfo, expid):
517        """
518        For each substrate in the main topology, find those that
519        have nodes on more than one testbed.  Insert portal nodes
520        into the copies of those substrates on the sub topologies.
521        """
522        segment_substrate = { }
523        portals = { }
524        for s in top.substrates:
525            # tbs will contain an ip address on this subsrate that is in
526            # each testbed.
527            tbs = { }
528            for i in s.interfaces:
529                e = i.element
530                tb = e.get_attribute('testbed')
531                if tb and not tbs.has_key(tb):
532                    for i in e.interface:
533                        if s in i.subs:
534                            tbs[tb]= i.get_attribute('ip4_address')
535            if len(tbs) < 2:
536                continue
537
538            base_tbs = set([testbed_base(t) for t in tbs])
539
540            # DRAGON will not create multi-site vlans yet
541            if len(tbs) == 2 and len(base_tbs) == 2 and \
542                    all([tbparams[x].has_key('dragon') for x in tbs]):
543                self.create_dragon_substrate(s, topo, tbs, tbparams, 
544                        masters, eid, connInfo, expid)
545            else:
546                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
547                        eid, segment_substrate, portals, connInfo, expid)
548
549        # Make sure that all the service importers have a control portal back
550        # to the master for each service.
551        for mtb in [ t for t in tbparams if t in masters ]:
552            importers = set([])
553            for m in masters[mtb]:
554                importers |= set(m.importers)
555            for tb in importers:
556                if len([e for e in topo[tb].elements \
557                        if isinstance(e, topdl.Computer) and \
558                        e.get_attribute('destination_testbed') == mtb and \
559                        e.get_attribute('portal') and \
560                        e.get_attribute('portal_type') == 'both']) == 0:
561
562                    if tbparams[mtb].has_key('dragon') \
563                            and tbparams[tb].has_key('dragon'):
564
565                        idx = len([x for x in topo.keys() \
566                                if x.startswith('dragon')])
567                        dip, leng = ip_allocator.allocate(4)
568                        dip += 1
569                        mip = dip+1
570                        csub = topdl.Substrate(
571                                name="dragon-control-%s" % tb,
572                                capacity=topdl.Capacity(100000.0, 'max'),
573                                attribute=[
574                                    topdl.Attribute(
575                                        attribute='portal',
576                                        value='true'
577                                        )
578                                    ]
579                                )
580                        seg = topdl.Segment(
581                                id= tbparams[mtb]['allocID'],
582                                type='emulab',
583                                uri = self.tbmap.get(testbed_base(mtb), None),
584                                interface=[ 
585                                    topdl.Interface(
586                                        substrate=csub.name),
587                                    ],
588                                attribute = [
589                                    topdl.Attribute(attribute=n, value=v)
590                                        for n, v in (\
591                                            ('domain', 
592                                                tbparams[mtb].get('domain',
593                                                    ".example.com")),
594                                            ('experiment', "%s/%s" % \
595                                                    (tbparams[mtb].get(
596                                                        'project', 
597                                                        'project'), 
598                                                        eid)),)
599                                    ],
600                                )
601                        portal = self.new_dragon_portal(tb, mtb,
602                                masters, eid, dip, mip, idx, csub,
603                                tbparams, expid)
604                        topo[tb].substrates.append(csub)
605                        topo[tb].elements.append(portal)
606                        topo[tb].elements.append(seg)
607
608                        mcsub = csub.clone()
609                        seg = topdl.Segment(
610                                id= tbparams[tb]['allocID'],
611                                type='emulab',
612                                uri = self.tbmap.get(testbed_base(tb), None),
613                                interface=[ 
614                                    topdl.Interface(
615                                        substrate=csub.name),
616                                    ],
617                                attribute = [
618                                    topdl.Attribute(attribute=n, value=v)
619                                        for n, v in (\
620                                            ('domain', 
621                                                tbparams[tb].get('domain',
622                                                    ".example.com")),
623                                            ('experiment', "%s/%s" % \
624                                                    (tbparams[tb].get('project', 
625                                                        'project'), 
626                                                        eid)),)
627                                    ],
628                                )
629                        portal = self.new_dragon_portal(mtb, tb, masters,
630                                eid, mip, dip, idx, mcsub, tbparams, expid)
631                        topo[mtb].substrates.append(mcsub)
632                        topo[mtb].elements.append(portal)
633                        topo[mtb].elements.append(seg)
634                        for t in (mtb, tb):
635                            topo[t].incorporate_elements()
636
637                        self.create_dragon_substrate(csub, topo, 
638                                {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
639                                tbparams, masters, eid, connInfo,
640                                expid)
641                    else:
642                        self.add_control_portal(mtb, tb, masters, eid, topo, 
643                                tbparams, connInfo, expid)
644                        self.add_control_portal(tb, mtb, masters, eid, topo, 
645                                tbparams, connInfo, expid)
646
647        # Connect the portal nodes into the topologies and clear out
648        # substrates that are not in the topologies
649        for tb in tbparams:
650            topo[tb].incorporate_elements()
651            topo[tb].substrates = \
652                    [s for s in topo[tb].substrates \
653                        if len(s.interfaces) >0]
654
Note: See TracBrowser for help on using the repository browser.