source: fedd/federation/experiment_partition.py @ 27fbf2f

compt_changes
Last change on this file since 27fbf2f was 27fbf2f, checked in by Ted Faber <faber@…>, 12 years ago

More debug

  • Property mode set to 100644
File size: 20.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from deter import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32from deter import topdl
33from deter import ip_allocator
34from deter import ip_addr
35import list_log
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None, tbactive=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58        self.tbactive = tbactive
59
60
61    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
62            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
63            expid=None):
64        """
65        Return a new internet portal node and a dict with the connectionInfo to
66        be attached.
67        """
68        seer_master = None
69        for k, m in masters.items():
70            for s in m:
71                if s.name == 'seer':
72                    seer_master = k
73                    break
74            if seer_master: break
75
76        if seer_master:
77            mdomain = tbparams[seer_master].get_attribute('domain', 
78                    '.example.com')
79            mproject = tbparams[seer_master].get_attribute('project', 
80                    'project')
81            muser = tbparams[seer_master].get_attribute('user', 'root')
82            smbshare = tbparams[seer_master].get_attribute('smbshare', 'USERS')
83        else:
84            mdomain = '.example.com'
85            mproject = 'project'
86            muser = 'root'
87            smbshare = 'USERS'
88
89        dproject = tbparams[dt].get_attribute('project', 'project')
90        ddomain = tbparams[dt].get_attribute('domain', '.example.com')
91
92        if (st in self.tbactive and dt not in self.tbactive) or \
93                ( st not in self.tbactive and dt in self.tbactive ):
94            active = ("%s" % (st in self.tbactive))
95            nat_partner = ("%s" % (dt in self.tbactive))
96        elif (st in masters and dt not in masters) or \
97                ( st not in masters and dt in masters ):
98            active = ("%s" % (st in masters))
99            nat_partner = "%s" % False
100        else:
101            active = ("%s" % (st > dt))
102            nat_partner = "%s" % False
103
104        ifaces = [ ]
105        for sub, attrs in iface_desc:
106            inf = topdl.Interface(
107                    name="inf%03d" % len(ifaces),
108                    substrate=sub,
109                    attribute=[
110                        topdl.Attribute(
111                            attribute=n,
112                            value = v)
113                        for n, v in attrs
114                        ]
115                    )
116            ifaces.append(inf)
117        if conn_type == "ssh":
118            try:
119                aid = tbparams[st].allocID
120            except:
121                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
122                        % st)
123                aid = None
124            info = {
125                    "type" : conn_type, 
126                    "portal": myname,
127                    'fedAttr': [ 
128                            { 'attribute': 'masterdomain', 'value': mdomain},
129                            { 'attribute': 'masterexperiment', 'value': 
130                                "%s/%s" % (mproject, eid)},
131                            { 'attribute': 'active', 'value': active},
132                            { 'attribute': 'nat_partner', 'value': nat_partner},
133                            # Move to SMB service description
134                            { 'attribute': 'masteruser', 'value': muser},
135                            { 'attribute': 'smbshare', 'value': smbshare},
136                        ],
137                    'parameter': [
138                        {
139                            'name': 'peer',
140                            'key': 'fedid:%s/%s' % (expid, myname),
141                            'store': self.store_url,
142                            'type': 'output',
143                        },
144                        {
145                            'name': 'ssh_port',
146                            'key': 'fedid:%s/%s-port' % (expid, myname),
147                            'store': self.store_url,
148                            'type': 'output',
149                        },
150                        {
151                            'name': 'peer',
152                            'key': 'fedid:%s/%s' % (expid, desthost),
153                            'store': self.store_url,
154                            'type': 'input',
155                        },
156                        {
157                            'name': 'ssh_port',
158                            'key': 'fedid:%s/%s-port' % (expid, desthost),
159                            'store': self.store_url,
160                            'type': 'input',
161                        },
162                        ]
163                    }
164            # Give this allocation the rights to access the key of the
165            # peers
166            if aid:
167                for h in (myname, desthost):
168                    # PNNL debug
169                    self.log.debug('Set attribute: %s %s %s' % (aid, expid, h))
170                    try:
171                        self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
172                        self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
173                                (expid, h))
174                    except RuntimeError, e:
175                        raise service_error(service_error.internal, "%s" % e)
176            else:
177                self.log.error("No aid for %s in new_portal_node" % st)
178        else:
179            info = None
180
181        return (topdl.Computer(
182                name=myname,
183                attribute=[ 
184                    topdl.Attribute(attribute=n,value=v)
185                        for n, v in (\
186                            ('portal', 'true'),
187                            ('portal_type', portal_type), 
188                            ('destination_testbed', dt),
189                        )
190                    ],
191                interface=ifaces,
192                ), info)
193
194    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
195        ddomain = tbparams[dt].get_attribute('domain', ".example.com")
196        dproject = tbparams[dt].get_attribute('project', 'project')
197        tsubstrate = \
198                topdl.Substrate(name='%s-%s' % (st, dt),
199                        attribute= [
200                            topdl.Attribute(
201                                attribute='portal',
202                                value='true')
203                            ]
204                        )
205        segment_element = topdl.Segment(
206                id= {'fedid': tbparams[dt].allocID },
207                type='emulab',
208                uri = self.tbmap.get(testbed_base(dt), None),
209                interface=[ 
210                    topdl.Interface(
211                        substrate=tsubstrate.name),
212                    ],
213                attribute = [
214                    topdl.Attribute(attribute=n, value=v)
215                        for n, v in (\
216                            ('domain', ddomain),
217                            ('experiment', "%s/%s" % \
218                                    (dproject, eid)),)
219                    ],
220                )
221
222        return (tsubstrate, segment_element)
223
224    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
225            tb_name):
226        if sub.capacity is None:
227            raise service_error(service_error.internal,
228                    "Cannot direct split substrate w/o capacity")
229        segs = [ ]
230        name = join_testbed(tb_name, "%d" % idx)
231        substr = topdl.Substrate(name=name, 
232                capacity=sub.capacity.clone(),
233                attribute=[ topdl.Attribute(attribute=n, value=v)
234                    for n, v, in (\
235                            ('vlan', 'unassigned%d' % idx),)])
236        store_key = 'fedid:%s/vlan%d' % (expid, idx)
237        for tb in tbs.keys():
238            seg = topdl.Segment(
239                    id = { 'fedid':tbparams[tb].allocID },
240                    type='emulab',
241                    uri = self.tbmap.get(testbed_base(tb), None),
242                    interface=[ 
243                        topdl.Interface(
244                            substrate=substr.name),
245                        ],
246                    attribute=[ topdl.Attribute(
247                        attribute='%s_endpoint' % tb_name, 
248                        value=tbparams[tb].get_attribute(tb_name)),
249                        ]
250                    )
251            vlan_key = "%s_vlans" % tb_name
252            if tbparams[tb].get_attribute(vlan_key) is not None:
253                seg.set_attribute(vlan_key, 
254                        tbparams[tb].get_attribute(vlan_key))
255            segs.append(seg)
256
257            # Give this allocation the rights to access the key of the
258            # vlan_id
259            try:
260                aid = tbparams[tb].allocID
261                self.auth.set_attribute(aid, store_key)
262            except:
263                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
264                        % tb)
265
266        connInfo[name] = [ { 
267            'type': 'transit',
268            'parameter': [ { 
269                'name': 'vlan_id',
270                'key': store_key,
271                'store': self.store_url,
272                'type': 'output'
273                } ]
274            } ]
275
276        topo[name] = \
277                topdl.Topology(substrates=[substr], elements=segs,
278                        attribute=[
279                            topdl.Attribute(attribute="transit", value='true'),
280                            topdl.Attribute(attribute="dynamic", value='true'),
281                            topdl.Attribute(attribute="testbed", value=tb_name),
282                            topdl.Attribute(attribute="store_keys", 
283                                value=store_key),
284                            ]
285                        )
286
287    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
288            connInfo, expid=None, tb_name=None):
289        """
290        Create connection information that tells which nodes are to be
291        connected to direct transits, and create an additional topology with
292        just the interconnected segments and a substrate.
293        """
294
295        def get_substrate_from_topo(name, t):
296            for s in t.substrates:
297                if s.name == name: return s
298            else: return None
299
300
301        seer_master = None
302        for k, m in masters.items():
303            for s in m:
304                if s.name == 'seer':
305                    seer_master = k
306                    break
307            if seer_master: break
308
309        if seer_master:
310            mdomain = tbparams[seer_master].get_attribute('domain',
311                    '.example.com')
312            mproject = tbparams[seer_master].get_attribute('project', 'project')
313        else:
314            mdomain = '.example.com'
315            mproject = 'project'
316
317        # dn is the number of previously created direct nets on this direct
318        # testbed.  This routine creates a net numbered by dn
319        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
320
321        # Set the attributes in the copies that will allow setup of direct
322        # connections.
323        for tb in tbs.keys():
324            s = get_substrate_from_topo(sub.name, topo[tb])
325            if s:
326                if not connInfo.has_key(tb):
327                    connInfo[tb] = [ ]
328
329                try:
330                    aid = tbparams[tb].allocID
331                except:
332                    self.log.debug("[create_direct_substrate] " + 
333                            "Can't get alloc id for %s?" %tb)
334                    aid = None
335
336                # This may need another look, but only a service gateway will
337                # look at the active parameter, and these are only inserted to
338                # connect to a master.
339                active = "%s" % ( tb in masters)
340                info = {
341                        'type': 'transit',
342                        'member': [ {
343                            'element': i.element.name, 
344                            'interface': i.name
345                            } for i in s.interfaces \
346                                    if isinstance(i.element, topdl.Computer) ],
347                        'fedAttr': [ 
348                            { 'attribute': 'masterdomain', 'value': mdomain},
349                            { 'attribute': 'masterexperiment', 'value': 
350                                "%s/%s" % (mproject, eid)},
351                            { 'attribute': 'active', 'value': active},
352                            ],
353                        'parameter': [ {
354                            'name': 'vlan_id',
355                            'key': 'fedid:%s/vlan%d' % (expid, dn),
356                            'store': self.store_url,
357                            'type': 'input',
358                            } ]
359                        }
360                if tbs.has_key(tb):
361                    info['peer'] = tbs[tb]
362                connInfo[tb].append(info)
363
364                # Give this allocation the rights to access the key of the
365                # vlan_id
366                if aid:
367                    self.auth.set_attribute(aid, 
368                            'fedid:%s/vlan%d' % (expid, dn))
369            else:
370                raise service_error(service_error.internal,
371                        "No substrate %s in testbed %s" % (sub.name, tb))
372
373        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
374                tb_name)
375
376    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
377            segment_substrate, portals, connInfo, expid):
378        # More than one testbed is on this substrate.  Insert
379        # some portals into the subtopologies.  st == source testbed,
380        # dt == destination testbed.
381        for st in tbs.keys():
382            if not segment_substrate.has_key(st):
383                segment_substrate[st] = { }
384            if not portals.has_key(st): 
385                portals[st] = { }
386            if not connInfo.has_key(st):
387                connInfo[st] = [ ]
388            for dt in [ t for t in tbs.keys() if t != st]:
389                sproject = tbparams[st].get_attribute('project', 'project')
390                dproject = tbparams[dt].get_attribute('project', 'project')
391                sdomain = tbparams[st].get_attribute('domain', ".example.com")
392                ddomain = tbparams[dt].get_attribute('domain', ".example.com")
393                aid = tbparams[dt].allocID
394
395                seer_master = None
396
397                for m in masters.values():
398                    for s in m:
399                        if s.name == 'SEER':
400                            seer_master = m
401                            break
402                    if seer_master: break
403
404                if seer_master:
405                    mdomain = tbparams[seer_master].get_attribute('domain',
406                            '.example.com')
407                    mproject = tbparams[seer_master].get_attribute('project', 
408                            'project')
409                    muser = tbparams[seer_master].get_attribute('user', 'root')
410                    smbshare = tbparams[seer_master].get_attribute('smbshare',
411                            'USERS')
412                else:
413                    mdomain = '.example.com'
414                    mproject = 'project'
415                    muser = 'root'
416                    smbshare = 'USERS'
417
418                if (st in masters  and dt not in masters) or \
419                        (st not in masters and dt in masters):
420                    active = ("%s" % (st in masters))
421                else:
422                    active = ("%s" %(st > dt))
423
424                if not segment_substrate[st].has_key(dt):
425                    # Put a substrate and a segment for the connected
426                    # testbed in there.
427                    tsubstrate, segment_element = \
428                            self.new_portal_substrate(st, dt, eid, tbparams,
429                                    expid)
430                    segment_substrate[st][dt] = tsubstrate
431                    topo[st].substrates.append(tsubstrate)
432                    topo[st].elements.append(segment_element)
433
434                new_portal = False
435                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
436                else: dname = dt
437
438                if testbed_suffix(st): sname = "-".join(split_testbed(st))
439                else: sname = st
440
441                if portals[st].has_key(dt):
442                    # There's a portal set up to go to this destination.
443                    # See if there's room to multiplex this connection on
444                    # it.  If so, add an interface to the portal; if not,
445                    # set up to add a portal below.
446                    # [This little festival of braces is just a pop of the
447                    # last element in the list of portals between st and
448                    # dt.]
449                    portal = portals[st][dt][-1]
450                    mux = len([ i for i in portal.interface \
451                            if not i.get_attribute('portal')])
452                    if mux == self.muxmax:
453                        new_portal = True
454                        portal_type = "experiment"
455                        myname = "%stunnel%d" % (dname.lower(),
456                                len(portals[st][dt]))
457                        desthost = "%stunnel%d" % (sname.lower(), 
458                                len(portals[st][dt]))
459                    else:
460                        new_i = topdl.Interface(
461                                substrate=sub.name,
462                                attribute=[ 
463                                    topdl.Attribute(
464                                        attribute='ip4_address', 
465                                        value=tbs[dt]
466                                    )
467                                ])
468                        portal.interface.append(new_i)
469                else:
470                    # First connection to this testbed, make an empty list
471                    # and set up to add the new portal below
472                    new_portal = True
473                    portals[st][dt] = [ ]
474                    myname = "%stunnel%d" % (dname.lower(),
475                            len(portals[st][dt]))
476                    desthost = "%stunnel%d" % (sname.lower(),
477                            len(portals[st][dt]))
478
479                    if dt in masters or st in masters: portal_type = "both"
480                    else: portal_type = "experiment"
481
482                if new_portal:
483                    infs = (
484                            (segment_substrate[st][dt].name, 
485                                (('portal', 'true'),)),
486                            (sub.name, 
487                                (('ip4_address', tbs[dt]),))
488                        )
489                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
490                            masters, eid, myname, desthost, portal_type,
491                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
492
493                    topo[st].elements.append(portal)
494                    portals[st][dt].append(portal)
495                    connInfo[st].append(info)
496
497    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
498            connInfo, expid):
499        # Add to the master testbed
500        tsubstrate, segment_element = \
501                self.new_portal_substrate(st, dt, eid, tbparams, expid)
502        myname = "%stunnel" % dt
503        desthost = "%stunnel" % st
504
505        portal, info = self.new_portal_node(st, dt, tbparams, masters,
506                eid, myname, desthost, "control", 
507                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
508                conn_attrs=[], expid=expid)
509
510        topo[st].substrates.append(tsubstrate)
511        topo[st].elements.append(segment_element)
512        topo[st].elements.append(portal)
513        if not connInfo.has_key(st):
514            connInfo[st] = [ ]
515        connInfo[st].append(info)
516
517    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
518            substrate, tbparams, expid, tb_name):
519        # Add to the master testbed
520        if testbed_suffix(dt): myname = "%stunnel" % "-".join(split_testbed(dt))
521        else: myname = "%stunnel" % dt
522
523        desthost = "%s" % ip_addr(dip)
524
525        portal, info = self.new_portal_node(st, dt, tbparams, masters,
526                eid, myname, desthost, "control", 
527                ((substrate.name,(
528                    ('portal','true'),
529                    ('ip4_address', "%s" % ip_addr(myip)),)),),
530                conn_type="transit", conn_attrs=[], expid=expid)
531
532        return portal
533
534    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
535            connInfo, expid):
536        """
537        For each substrate in the main topology, find those that
538        have nodes on more than one testbed.  Insert portal nodes
539        into the copies of those substrates on the sub topologies.
540        """
541        segment_substrate = { }
542        portals = { }
543        for s in top.substrates:
544            # tbs will contain an ip address on this subsrate that is in
545            # each testbed.
546            tbs = { }
547            for i in s.interfaces:
548                e = i.element
549                tb = e.get_attribute('testbed')
550                if tb and tb not in tbs:
551                    tbs[tb]= i.get_attribute('ip4_address')
552            if len(tbs) < 2:
553                continue
554
555            base_tbs = set([testbed_base(t) for t in tbs])
556
557            # DRAGON will not create multi-site vlans yet, so we don't do multi
558            # connection direct transits yet.
559            if len(tbs) == 2 :
560                # NB: the else if on the for loop - if none of the direct
561                # transits is applicable, use the internet.
562                for d in self.direct_transit:
563                    if all([tbparams[x].get_attribute(d) for x in tbs]):
564                        self.create_direct_substrate(s, topo, tbs, tbparams, 
565                                masters, eid, connInfo, expid, d)
566                        break
567                else:
568                    self.insert_internet_portals(s, topo, tbs, tbparams,
569                            masters, eid, segment_substrate, portals,
570                            connInfo, expid)
571            else:
572                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
573                        eid, segment_substrate, portals, connInfo, expid)
574
575
576        # Make sure that all the service importers have a control portal back
577        # to the master for each service.
578        for mtb in [ t for t in tbparams if t in masters ]:
579            importers = set([])
580            for m in masters[mtb]:
581                importers |= set(m.importers)
582            if mtb in importers:
583                importers.discard(mtb)
584            for tb in importers:
585                if tb not in topo:
586                    self.log.error("Importer not in experiment: %s" % tb)
587                    continue
588                if len([e for e in topo[tb].elements \
589                        if isinstance(e, topdl.Computer) and \
590                        e.get_attribute('destination_testbed') == mtb and \
591                        e.get_attribute('portal') and \
592                        e.get_attribute('portal_type') == 'both']) == 0:
593
594                    for tb_name in self.direct_transit:
595
596                        if tbparams[mtb].get_attribute(tb_name) \
597                                and tbparams[tb].get_attribute(tb_name):
598
599                            idx = len([x for x in topo.keys() \
600                                    if x.startswith(tb_name)])
601                            dip, leng = ip_allocator.allocate(4)
602                            dip += 1
603                            mip = dip+1
604                            csub = topdl.Substrate(
605                                    name="%s-control-%s" % (tb_name, tb),
606                                    capacity=topdl.Capacity(100000.0, 'max'),
607                                    attribute=[
608                                        topdl.Attribute(
609                                            attribute='portal',
610                                            value='true'
611                                            )
612                                        ]
613                                    )
614                            seg = topdl.Segment(
615                                    id= {'fedid': tbparams[mtb].allocID},
616                                    type='emulab',
617                                    uri = self.tbmap.get(testbed_base(mtb),
618                                        None),
619                                    interface=[ 
620                                        topdl.Interface(
621                                            substrate=csub.name),
622                                        ],
623                                    attribute = [
624                                        topdl.Attribute(attribute=n, value=v)
625                                            for n, v in (\
626                                                ('domain', 
627                                                    tbparams[mtb].get_attribute(
628                                                        'domain',
629                                                        ".example.com")),
630                                                ('experiment', "%s/%s" % \
631                                                        (tbparams[mtb].get_attribute(
632                                                            'project', 
633                                                            'project'), 
634                                                            eid)),)
635                                        ],
636                                    )
637                            portal = self.new_direct_portal(tb, mtb,
638                                    masters, eid, dip, mip, idx, csub,
639                                    tbparams, expid, tb_name)
640                            topo[tb].substrates.append(csub)
641                            topo[tb].elements.append(portal)
642                            topo[tb].elements.append(seg)
643
644                            mcsub = csub.clone()
645                            seg = topdl.Segment(
646                                    id= { 'fedid': tbparams[tb].allocID},
647                                    type='emulab',
648                                    uri = self.tbmap.get(testbed_base(tb),
649                                        None),
650                                    interface=[ 
651                                        topdl.Interface(
652                                            substrate=csub.name),
653                                        ],
654                                    attribute = [
655                                        topdl.Attribute(attribute=n, value=v)
656                                            for n, v in (\
657                                                ('domain', 
658                                                    tbparams[tb].get_attribute(
659                                                        'domain',
660                                                        ".example.com")),
661                                                ('experiment', "%s/%s" % \
662                                                        (tbparams[tb].get_attribute(
663                                                            'project', 
664                                                            'project'), 
665                                                            eid)),)
666                                        ],
667                                    )
668                            portal = self.new_direct_portal(mtb, tb, masters,
669                                    eid, mip, dip, idx, mcsub, tbparams, expid,
670                                    tb_name)
671                            topo[mtb].substrates.append(mcsub)
672                            topo[mtb].elements.append(portal)
673                            topo[mtb].elements.append(seg)
674                            for t in (mtb, tb):
675                                topo[t].incorporate_elements()
676
677                            self.create_direct_substrate(csub, topo, 
678                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
679                                    tbparams, masters, eid, connInfo,
680                                    expid, tb_name)
681                            break
682                    # This matches with the for tb_name in self.direct_transit
683                    else:
684                        self.add_control_portal(mtb, tb, masters, eid, topo, 
685                                tbparams, connInfo, expid)
686                        self.add_control_portal(tb, mtb, masters, eid, topo, 
687                                tbparams, connInfo, expid)
688
689        # Connect the portal nodes into the topologies and clear out
690        # substrates that are not in the topologies
691        for tb in tbparams:
692            topo[tb].incorporate_elements()
693            topo[tb].substrates = \
694                    [s for s in topo[tb].substrates \
695                        if len(s.interfaces) >0]
696
Note: See TracBrowser for help on using the repository browser.