source: fedd/federation/experiment_partition.py @ 0c4b12c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 0c4b12c was 0c4b12c, checked in by Ted Faber <faber@…>, 14 years ago

more de mastering

  • Property mode set to 100644
File size: 17.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2):
46        """
47        Intialize the various attributes
48        """
49
50        self.log = logging.getLogger("fedd.experiment_control." + \
51                "experiment_paritition")
52        self.auth = auth
53        self.store_url = store_url
54        self.tbmap = tbmap
55        self.muxmax = muxmax
56
57
58    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
59            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
60            expid=None):
61        """
62        Return a new internet portal node and a dict with the connectionInfo to
63        be attached.
64        """
65        seer_master = None
66        for k, s in masters.items():
67            if 'SEER' in s:
68                seer_master = k
69                break
70
71        if seer_master:
72            mdomain = tbparams[seer_master].get('domain', '.example.com')
73            mproject = tbparams[seer_master].get('project', 'project')
74            muser = tbparams[seer_master].get('user', 'root')
75            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
76        else:
77            mdomain = '.example.com'
78            mproject = 'project'
79            muser = 'root'
80            smbshare = 'USERS'
81
82        dproject = tbparams[dt].get('project', 'project')
83        ddomain = tbparams[dt].get('domain', '.example.com')
84
85        if (st in masters and dt not in masters) or \
86                ( st in masters and dt not in masters ):
87            active = ("%s" % (st in masters))
88        else:
89            active = ("%s" % (st > dt))
90
91        ifaces = [ ]
92        for sub, attrs in iface_desc:
93            inf = topdl.Interface(
94                    name="inf%03d" % len(ifaces),
95                    substrate=sub,
96                    attribute=[
97                        topdl.Attribute(
98                            attribute=n,
99                            value = v)
100                        for n, v in attrs
101                        ]
102                    )
103            ifaces.append(inf)
104        if conn_type == "ssh":
105            try:
106                aid = tbparams[st]['allocID']['fedid']
107            except:
108                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
109                        % st)
110                aid = None
111            info = {
112                    "type" : conn_type, 
113                    "portal": myname,
114                    'fedAttr': [ 
115                            { 'attribute': 'masterdomain', 'value': mdomain},
116                            { 'attribute': 'masterexperiment', 'value': 
117                                "%s/%s" % (mproject, eid)},
118                            { 'attribute': 'active', 'value': active},
119                            # Move to SMB service description
120                            { 'attribute': 'masteruser', 'value': muser},
121                            { 'attribute': 'smbshare', 'value': smbshare},
122                        ],
123                    'parameter': [
124                        {
125                            'name': 'peer',
126                            'key': 'fedid:%s/%s' % (expid, myname),
127                            'store': self.store_url,
128                            'type': 'output',
129                        },
130                        {
131                            'name': 'ssh_port',
132                            'key': 'fedid:%s/%s-port' % (expid, myname),
133                            'store': self.store_url,
134                            'type': 'output',
135                        },
136                        {
137                            'name': 'peer',
138                            'key': 'fedid:%s/%s' % (expid, desthost),
139                            'store': self.store_url,
140                            'type': 'input',
141                        },
142                        {
143                            'name': 'ssh_port',
144                            'key': 'fedid:%s/%s-port' % (expid, desthost),
145                            'store': self.store_url,
146                            'type': 'input',
147                        },
148                        ]
149                    }
150            # Give this allocation the rights to access the key of the
151            # peers
152            if aid:
153                for h in (myname, desthost):
154                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
155                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
156                            (expid, h))
157            else:
158                self.log.error("No aid for %s in new_portal_node" % st)
159        else:
160            info = None
161
162        return (topdl.Computer(
163                name=myname,
164                attribute=[ 
165                    topdl.Attribute(attribute=n,value=v)
166                        for n, v in (\
167                            ('portal', 'true'),
168                            ('portal_type', portal_type), 
169                            ('destination_testbed', dt),
170                        )
171                    ],
172                interface=ifaces,
173                ), info)
174
175    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
176        ddomain = tbparams[dt].get('domain', ".example.com")
177        dproject = tbparams[dt].get('project', 'project')
178        tsubstrate = \
179                topdl.Substrate(name='%s-%s' % (st, dt),
180                        attribute= [
181                            topdl.Attribute(
182                                attribute='portal',
183                                value='true')
184                            ]
185                        )
186        segment_element = topdl.Segment(
187                id= tbparams[dt]['allocID'],
188                type='emulab',
189                uri = self.tbmap.get(dt, None),
190                interface=[ 
191                    topdl.Interface(
192                        substrate=tsubstrate.name),
193                    ],
194                attribute = [
195                    topdl.Attribute(attribute=n, value=v)
196                        for n, v in (\
197                            ('domain', ddomain),
198                            ('experiment', "%s/%s" % \
199                                    (dproject, eid)),)
200                    ],
201                )
202
203        return (tsubstrate, segment_element)
204
205    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
206        if sub.capacity is None:
207            raise service_error(service_error.internal,
208                    "Cannot DRAGON split substrate w/o capacity")
209        segs = [ ]
210        substr = topdl.Substrate(name="dragon%d" % idx, 
211                capacity=sub.capacity.clone(),
212                attribute=[ topdl.Attribute(attribute=n, value=v)
213                    for n, v, in (\
214                            ('vlan', 'unassigned%d' % idx),)])
215        name = "dragon%d" % idx
216        store_key = 'fedid:%s/vlan%d' % (expid, idx)
217        for tb in tbs.keys():
218            seg = topdl.Segment(
219                    id = tbparams[tb]['allocID'],
220                    type='emulab',
221                    uri = self.tbmap.get(tb, None),
222                    interface=[ 
223                        topdl.Interface(
224                            substrate=substr.name),
225                        ],
226                    attribute=[ topdl.Attribute(
227                        attribute='dragon_endpoint', 
228                        value=tbparams[tb]['dragon']),
229                        ]
230                    )
231            if tbparams[tb].has_key('vlans'):
232                seg.set_attribute('vlans', tbparams[tb]['vlans'])
233            segs.append(seg)
234
235            # Give this allocation the rights to access the key of the
236            # vlan_id
237            try:
238                aid = tbparams[tb]['allocID']['fedid']
239                self.auth.set_attribute(aid, store_key)
240            except:
241                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
242                        % tb)
243
244        connInfo[name] = [ { 
245            'type': 'transit',
246            'parameter': [ { 
247                'name': 'vlan_id',
248                'key': store_key,
249                'store': self.store_url,
250                'type': 'output'
251                } ]
252            } ]
253
254        topo[name] = \
255                topdl.Topology(substrates=[substr], elements=segs,
256                        attribute=[
257                            topdl.Attribute(attribute="transit", value='true'),
258                            topdl.Attribute(attribute="dynamic", value='true'),
259                            topdl.Attribute(attribute="testbed", 
260                                value='dragon'),
261                            topdl.Attribute(attribute="store_keys", 
262                                value=store_key),
263                            ]
264                        )
265
266    def create_dragon_substrate(self, sub, topo, tbs, tbparams, masters, eid,
267            connInfo, expid=None):
268        """
269        Add attribiutes to the various elements indicating that they are to be
270        dragon connected and create a dragon segment in topo to be
271        instantiated.
272        """
273
274        def get_substrate_from_topo(name, t):
275            for s in t.substrates:
276                if s.name == name: return s
277            else: return None
278
279
280        seer_master = None
281        for k, s in masters.items():
282            if 'SEER' in s:
283                seer_master = k
284                break
285
286        if seer_master:
287            mdomain = tbparams[seer_master].get('domain', '.example.com')
288            mproject = tbparams[seer_master].get('project', 'project')
289        else:
290            mdomain = '.example.com'
291            mproject = 'project'
292
293        # dn is the number of previously created dragon nets.  This routine
294        # creates a net numbered by dn
295        dn = len([x for x in topo.keys() if x.startswith('dragon')])
296        # Count the number of interfaces on this substrate in each testbed from
297        # the global topology
298        count = { }
299        node = { }
300        for e in [ i.element for i in sub.interfaces ]:
301            tb = e.get_attribute('testbed')
302            count[tb] = count.get(tb, 0) + 1
303            node[tb] = i.get_attribute('ip4_address')
304
305
306        # Set the attributes in the copies that will allow setup of dragon
307        # connections.
308        for tb in tbs.keys():
309            s = get_substrate_from_topo(sub.name, topo[tb])
310            if s:
311                if not connInfo.has_key(tb):
312                    connInfo[tb] = [ ]
313
314                try:
315                    aid = tbparams[tb]['allocID']['fedid']
316                except:
317                    self.log.debug("[creat_dragon_substrate] " + 
318                            "Can't get alloc id for %s?" %tb)
319                    aid = None
320
321                # This may need another look, but only a service gateway will
322                # look at the active parameter, and these are only inserted to
323                # connect to a master.
324                active = "%s" % ( tb in masters)
325                info = {
326                        'type': 'transit',
327                        'member': [ {
328                            'element': i.element.name[0], 
329                            'interface': i.name
330                            } for i in s.interfaces \
331                                    if isinstance(i.element, topdl.Computer) ],
332                        'fedAttr': [ 
333                            { 'attribute': 'masterdomain', 'value': mdomain},
334                            { 'attribute': 'masterexperiment', 'value': 
335                                "%s/%s" % (mproject, eid)},
336                            { 'attribute': 'active', 'value': active},
337                            ],
338                        'parameter': [ {
339                            'name': 'vlan_id',
340                            'key': 'fedid:%s/vlan%d' % (expid, dn),
341                            'store': self.store_url,
342                            'type': 'input',
343                            } ]
344                        }
345                if tbs.has_key(tb):
346                    info['peer'] = tbs[tb]
347                connInfo[tb].append(info)
348
349                # Give this allocation the rights to access the key of the
350                # vlan_id
351                if aid:
352                    self.auth.set_attribute(aid, 
353                            'fedid:%s/vlan%d' % (expid, dn))
354            else:
355                raise service_error(service_error.internal,
356                        "No substrate %s in testbed %s" % (sub.name, tb))
357
358        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
359
360    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
361            segment_substrate, portals, connInfo, expid):
362        # More than one testbed is on this substrate.  Insert
363        # some portals into the subtopologies.  st == source testbed,
364        # dt == destination testbed.
365        for st in tbs.keys():
366            if not segment_substrate.has_key(st):
367                segment_substrate[st] = { }
368            if not portals.has_key(st): 
369                portals[st] = { }
370            if not connInfo.has_key(st):
371                connInfo[st] = [ ]
372            for dt in [ t for t in tbs.keys() if t != st]:
373                sproject = tbparams[st].get('project', 'project')
374                dproject = tbparams[dt].get('project', 'project')
375                sdomain = tbparams[st].get('domain', ".example.com")
376                ddomain = tbparams[dt].get('domain', ".example.com")
377                aid = tbparams[dt]['allocID']['fedid']
378
379                seer_master = None
380                for k, s in masters.items():
381                    if 'SEER' in s:
382                        seer_master = k
383                        break
384
385                if seer_master:
386                    mdomain = tbparams[seer_master].get('domain', '.example.com')
387                    mproject = tbparams[seer_master].get('project', 'project')
388                    muser = tbparams[seer_master].get('user', 'root')
389                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
390                else:
391                    mdomain = '.example.com'
392                    mproject = 'project'
393                    muser = 'root'
394                    smbshare = 'USERS'
395
396                if (st in masters  and dt not in masters) or \
397                        (st not in masters and dt in masters):
398                    active = ("%s" % (st in masters))
399                else:
400                    active = ("%s" %(st > dt))
401
402                if not segment_substrate[st].has_key(dt):
403                    # Put a substrate and a segment for the connected
404                    # testbed in there.
405                    tsubstrate, segment_element = \
406                            self.new_portal_substrate(st, dt, eid, tbparams,
407                                    expid)
408                    segment_substrate[st][dt] = tsubstrate
409                    topo[st].substrates.append(tsubstrate)
410                    topo[st].elements.append(segment_element)
411
412                new_portal = False
413                if portals[st].has_key(dt):
414                    # There's a portal set up to go to this destination.
415                    # See if there's room to multiplex this connection on
416                    # it.  If so, add an interface to the portal; if not,
417                    # set up to add a portal below.
418                    # [This little festival of braces is just a pop of the
419                    # last element in the list of portals between st and
420                    # dt.]
421                    portal = portals[st][dt][-1]
422                    mux = len([ i for i in portal.interface \
423                            if not i.get_attribute('portal')])
424                    if mux == self.muxmax:
425                        new_portal = True
426                        portal_type = "experiment"
427                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
428                        desthost = "%stunnel%d" % (st.lower(), 
429                                len(portals[st][dt]))
430                    else:
431                        new_i = topdl.Interface(
432                                substrate=sub.name,
433                                attribute=[ 
434                                    topdl.Attribute(
435                                        attribute='ip4_address', 
436                                        value=tbs[dt]
437                                    )
438                                ])
439                        portal.interface.append(new_i)
440                else:
441                    # First connection to this testbed, make an empty list
442                    # and set up to add the new portal below
443                    new_portal = True
444                    portals[st][dt] = [ ]
445                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
446                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
447
448                    if dt in masters or st in masters: portal_type = "both"
449                    else: portal_type = "experiment"
450
451                if new_portal:
452                    infs = (
453                            (segment_substrate[st][dt].name, 
454                                (('portal', 'true'),)),
455                            (sub.name, 
456                                (('ip4_address', tbs[dt]),))
457                        )
458                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
459                            masters, eid, myname, desthost, portal_type,
460                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
461
462                    topo[st].elements.append(portal)
463                    portals[st][dt].append(portal)
464                    connInfo[st].append(info)
465
466    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
467            connInfo, expid):
468        # Add to the master testbed
469        tsubstrate, segment_element = \
470                self.new_portal_substrate(st, dt, eid, tbparams, expid)
471        myname = "%stunnel" % dt
472        desthost = "%stunnel" % st
473
474        portal, info = self.new_portal_node(st, dt, tbparams, masters,
475                eid, myname, desthost, "control", 
476                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
477                conn_attrs=[], expid=expid)
478
479        topo[st].substrates.append(tsubstrate)
480        topo[st].elements.append(segment_element)
481        topo[st].elements.append(portal)
482        if not connInfo.has_key(st):
483            connInfo[st] = [ ]
484        connInfo[st].append(info)
485
486    def new_dragon_portal(self, st, dt, masters, eid, myip, dip, idx, 
487            substrate, tbparams, expid):
488        # Add to the master testbed
489        myname = "%stunnel" % dt
490        desthost = "%s" % ip_addr(dip)
491
492        portal, info = self.new_portal_node(st, dt, tbparams, masters,
493                eid, myname, desthost, "control", 
494                ((substrate.name,(
495                    ('portal','true'),
496                    ('ip4_address', "%s" % ip_addr(myip)),)),),
497                conn_type="transit", conn_attrs=[], expid=expid)
498
499        return portal
500
501    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
502            connInfo, expid):
503        """
504        For each substrate in the main topology, find those that
505        have nodes on more than one testbed.  Insert portal nodes
506        into the copies of those substrates on the sub topologies.
507        """
508        segment_substrate = { }
509        portals = { }
510        for s in top.substrates:
511            # tbs will contain an ip address on this subsrate that is in
512            # each testbed.
513            tbs = { }
514            for i in s.interfaces:
515                e = i.element
516                tb = e.get_attribute('testbed')
517                if tb and not tbs.has_key(tb):
518                    for i in e.interface:
519                        if s in i.subs:
520                            tbs[tb]= i.get_attribute('ip4_address')
521            if len(tbs) < 2:
522                continue
523
524            # DRAGON will not create multi-site vlans yet
525            if len(tbs) == 2 and \
526                    all([tbparams[x].has_key('dragon') for x in tbs]):
527                self.create_dragon_substrate(s, topo, tbs, tbparams, 
528                        masters, eid, connInfo, expid)
529            else:
530                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
531                        eid, segment_substrate, portals, connInfo, expid)
532
533        # Make sure that all the service importers have a control portal back
534        # to the master for each service.
535        for mtb in [ t for t in tbparams if t in masters ]:
536            for tb in [ t for tb in tbparams if t != mtb]:
537                if len([e for e in topo[tb].elements \
538                        if isinstance(e, topdl.Computer) and \
539                        e.get_attribute('destination_testbed') == mtb and \
540                        e.get_attribute('portal') and \
541                        e.get_attribute('portal_type') == 'both']) == 0:
542
543                    if tbparams[mtb].has_key('dragon') \
544                            and tbparams[tb].has_key('dragon'):
545
546                        idx = len([x for x in topo.keys() \
547                                if x.startswith('dragon')])
548                        dip, leng = ip_allocator.allocate(4)
549                        dip += 1
550                        mip = dip+1
551                        csub = topdl.Substrate(
552                                name="dragon-control-%s" % tb,
553                                capacity=topdl.Capacity(100000.0, 'max'),
554                                attribute=[
555                                    topdl.Attribute(
556                                        attribute='portal',
557                                        value='true'
558                                        )
559                                    ]
560                                )
561                        seg = topdl.Segment(
562                                id= tbparams[mtb]['allocID'],
563                                type='emulab',
564                                uri = self.tbmap.get(master, None),
565                                interface=[ 
566                                    topdl.Interface(
567                                        substrate=csub.name),
568                                    ],
569                                attribute = [
570                                    topdl.Attribute(attribute=n, value=v)
571                                        for n, v in (\
572                                            ('domain', 
573                                                tbparams[mtb].get('domain',
574                                                    ".example.com")),
575                                            ('experiment', "%s/%s" % \
576                                                    (tbparams[mtb].get(
577                                                        'project', 
578                                                        'project'), 
579                                                        eid)),)
580                                    ],
581                                )
582                        portal = self.new_dragon_portal(tb, mtb,
583                                masters, eid, dip, mip, idx, csub,
584                                tbparams, expid)
585                        topo[tb].substrates.append(csub)
586                        topo[tb].elements.append(portal)
587                        topo[tb].elements.append(seg)
588
589                        mcsub = csub.clone()
590                        seg = topdl.Segment(
591                                id= tbparams[tb]['allocID'],
592                                type='emulab',
593                                uri = self.tbmap.get(tb, None),
594                                interface=[ 
595                                    topdl.Interface(
596                                        substrate=csub.name),
597                                    ],
598                                attribute = [
599                                    topdl.Attribute(attribute=n, value=v)
600                                        for n, v in (\
601                                            ('domain', 
602                                                tbparams[tb].get('domain',
603                                                    ".example.com")),
604                                            ('experiment', "%s/%s" % \
605                                                    (tbparams[tb].get('project', 
606                                                        'project'), 
607                                                        eid)),)
608                                    ],
609                                )
610                        portal = self.new_dragon_portal(mtb, tb, masters,
611                                eid, mip, dip, idx, mcsub, tbparams, expid)
612                        topo[mtb].substrates.append(mcsub)
613                        topo[mtb].elements.append(portal)
614                        topo[mtb].elements.append(seg)
615                        for t in (master, tb):
616                            topo[t].incorporate_elements()
617
618                        self.create_dragon_substrate(csub, topo, 
619                                {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
620                                tbparams, masters, eid, connInfo,
621                                expid)
622                    else:
623                        self.add_control_portal(mtb, tb, masters, eid, topo, 
624                                tbparams, connInfo, expid)
625                        self.add_control_portal(tb, mtb, masters, eid, topo, 
626                                tbparams, connInfo, expid)
627
628        # Connect the portal nodes into the topologies and clear out
629        # substrates that are not in the topologies
630        for tb in tbparams:
631            topo[tb].incorporate_elements()
632            topo[tb].substrates = \
633                    [s for s in topo[tb].substrates \
634                        if len(s.interfaces) >0]
635
Note: See TracBrowser for help on using the repository browser.