source: fedd/federation/experiment_partition.py @ 4ffa6f8

compt_changes
Last change on this file since 4ffa6f8 was 4ffa6f8, checked in by Ted Faber <faber@…>, 12 years ago

Add support for nat_portal parameter. Remove old half-assed active
endpoints

  • Property mode set to 100644
File size: 19.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from deter import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32from deter import topdl
33from deter import ip_allocator
34from deter import ip_addr
35import list_log
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None, tbactive=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58        self.tbactive = tbactive
59
60
61    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
62            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
63            expid=None):
64        """
65        Return a new internet portal node and a dict with the connectionInfo to
66        be attached.
67        """
68        seer_master = None
69        for k, m in masters.items():
70            for s in m:
71                if s.name == 'seer':
72                    seer_master = k
73                    break
74            if seer_master: break
75
76        if seer_master:
77            mdomain = tbparams[seer_master].get_attribute('domain', 
78                    '.example.com')
79            mproject = tbparams[seer_master].get_attribute('project', 
80                    'project')
81            muser = tbparams[seer_master].get_attribute('user', 'root')
82            smbshare = tbparams[seer_master].get_attribute('smbshare', 'USERS')
83        else:
84            mdomain = '.example.com'
85            mproject = 'project'
86            muser = 'root'
87            smbshare = 'USERS'
88
89        dproject = tbparams[dt].get_attribute('project', 'project')
90        ddomain = tbparams[dt].get_attribute('domain', '.example.com')
91
92        if (st in self.tbactive and dt not in self.tbactive) or \
93                ( st not in self.tbactive and dt in self.tbactive ):
94            active = ("%s" % (st in self.tbactive))
95            nat_partner = ("%s" % (dt in self.tbactive))
96        elif (st in masters and dt not in masters) or \
97                ( st not in masters and dt in masters ):
98            active = ("%s" % (st in masters))
99            nat_partner = "%s" % False
100        else:
101            active = ("%s" % (st > dt))
102            nat_partner = "%s" % False
103
104        ifaces = [ ]
105        for sub, attrs in iface_desc:
106            inf = topdl.Interface(
107                    name="inf%03d" % len(ifaces),
108                    substrate=sub,
109                    attribute=[
110                        topdl.Attribute(
111                            attribute=n,
112                            value = v)
113                        for n, v in attrs
114                        ]
115                    )
116            ifaces.append(inf)
117        if conn_type == "ssh":
118            try:
119                aid = tbparams[st].allocID
120            except:
121                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
122                        % st)
123                aid = None
124            info = {
125                    "type" : conn_type, 
126                    "portal": myname,
127                    'fedAttr': [ 
128                            { 'attribute': 'masterdomain', 'value': mdomain},
129                            { 'attribute': 'masterexperiment', 'value': 
130                                "%s/%s" % (mproject, eid)},
131                            { 'attribute': 'active', 'value': active},
132                            { 'attribute': 'nat_partner', 'value': nat_partner},
133                            # Move to SMB service description
134                            { 'attribute': 'masteruser', 'value': muser},
135                            { 'attribute': 'smbshare', 'value': smbshare},
136                        ],
137                    'parameter': [
138                        {
139                            'name': 'peer',
140                            'key': 'fedid:%s/%s' % (expid, myname),
141                            'store': self.store_url,
142                            'type': 'output',
143                        },
144                        {
145                            'name': 'ssh_port',
146                            'key': 'fedid:%s/%s-port' % (expid, myname),
147                            'store': self.store_url,
148                            'type': 'output',
149                        },
150                        {
151                            'name': 'peer',
152                            'key': 'fedid:%s/%s' % (expid, desthost),
153                            'store': self.store_url,
154                            'type': 'input',
155                        },
156                        {
157                            'name': 'ssh_port',
158                            'key': 'fedid:%s/%s-port' % (expid, desthost),
159                            'store': self.store_url,
160                            'type': 'input',
161                        },
162                        ]
163                    }
164            # Give this allocation the rights to access the key of the
165            # peers
166            if aid:
167                for h in (myname, desthost):
168                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
169                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
170                            (expid, h))
171            else:
172                self.log.error("No aid for %s in new_portal_node" % st)
173        else:
174            info = None
175
176        return (topdl.Computer(
177                name=myname,
178                attribute=[ 
179                    topdl.Attribute(attribute=n,value=v)
180                        for n, v in (\
181                            ('portal', 'true'),
182                            ('portal_type', portal_type), 
183                            ('destination_testbed', dt),
184                        )
185                    ],
186                interface=ifaces,
187                ), info)
188
189    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
190        ddomain = tbparams[dt].get_attribute('domain', ".example.com")
191        dproject = tbparams[dt].get_attribute('project', 'project')
192        tsubstrate = \
193                topdl.Substrate(name='%s-%s' % (st, dt),
194                        attribute= [
195                            topdl.Attribute(
196                                attribute='portal',
197                                value='true')
198                            ]
199                        )
200        segment_element = topdl.Segment(
201                id= {'fedid': tbparams[dt].allocID },
202                type='emulab',
203                uri = self.tbmap.get(testbed_base(dt), None),
204                interface=[ 
205                    topdl.Interface(
206                        substrate=tsubstrate.name),
207                    ],
208                attribute = [
209                    topdl.Attribute(attribute=n, value=v)
210                        for n, v in (\
211                            ('domain', ddomain),
212                            ('experiment', "%s/%s" % \
213                                    (dproject, eid)),)
214                    ],
215                )
216
217        return (tsubstrate, segment_element)
218
219    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
220            tb_name):
221        if sub.capacity is None:
222            raise service_error(service_error.internal,
223                    "Cannot direct split substrate w/o capacity")
224        segs = [ ]
225        name = join_testbed(tb_name, "%d" % idx)
226        substr = topdl.Substrate(name=name, 
227                capacity=sub.capacity.clone(),
228                attribute=[ topdl.Attribute(attribute=n, value=v)
229                    for n, v, in (\
230                            ('vlan', 'unassigned%d' % idx),)])
231        store_key = 'fedid:%s/vlan%d' % (expid, idx)
232        for tb in tbs.keys():
233            seg = topdl.Segment(
234                    id = { 'fedid':tbparams[tb].allocID },
235                    type='emulab',
236                    uri = self.tbmap.get(testbed_base(tb), None),
237                    interface=[ 
238                        topdl.Interface(
239                            substrate=substr.name),
240                        ],
241                    attribute=[ topdl.Attribute(
242                        attribute='%s_endpoint' % tb_name, 
243                        value=tbparams[tb].get_attribute(tb_name)),
244                        ]
245                    )
246            vlan_key = "%s_vlans" % tb_name
247            if tbparams[tb].get_attribute(vlan_key) is not None:
248                seg.set_attribute(vlan_key, 
249                        tbparams[tb].get_attribute(vlan_key))
250            segs.append(seg)
251
252            # Give this allocation the rights to access the key of the
253            # vlan_id
254            try:
255                aid = tbparams[tb].allocID
256                self.auth.set_attribute(aid, store_key)
257            except:
258                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
259                        % tb)
260
261        connInfo[name] = [ { 
262            'type': 'transit',
263            'parameter': [ { 
264                'name': 'vlan_id',
265                'key': store_key,
266                'store': self.store_url,
267                'type': 'output'
268                } ]
269            } ]
270
271        topo[name] = \
272                topdl.Topology(substrates=[substr], elements=segs,
273                        attribute=[
274                            topdl.Attribute(attribute="transit", value='true'),
275                            topdl.Attribute(attribute="dynamic", value='true'),
276                            topdl.Attribute(attribute="testbed", value=tb_name),
277                            topdl.Attribute(attribute="store_keys", 
278                                value=store_key),
279                            ]
280                        )
281
282    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
283            connInfo, expid=None, tb_name=None):
284        """
285        Create connection information that tells which nodes are to be
286        connected to direct transits, and create an additional topology with
287        just the interconnected segments and a substrate.
288        """
289
290        def get_substrate_from_topo(name, t):
291            for s in t.substrates:
292                if s.name == name: return s
293            else: return None
294
295
296        seer_master = None
297        for k, m in masters.items():
298            for s in m:
299                if s.name == 'seer':
300                    seer_master = k
301                    break
302            if seer_master: break
303
304        if seer_master:
305            mdomain = tbparams[seer_master].get_attribute('domain',
306                    '.example.com')
307            mproject = tbparams[seer_master].get_attribute('project', 'project')
308        else:
309            mdomain = '.example.com'
310            mproject = 'project'
311
312        # dn is the number of previously created direct nets on this direct
313        # testbed.  This routine creates a net numbered by dn
314        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
315
316        # Set the attributes in the copies that will allow setup of direct
317        # connections.
318        for tb in tbs.keys():
319            s = get_substrate_from_topo(sub.name, topo[tb])
320            if s:
321                if not connInfo.has_key(tb):
322                    connInfo[tb] = [ ]
323
324                try:
325                    aid = tbparams[tb].allocID
326                except:
327                    self.log.debug("[create_direct_substrate] " + 
328                            "Can't get alloc id for %s?" %tb)
329                    aid = None
330
331                # This may need another look, but only a service gateway will
332                # look at the active parameter, and these are only inserted to
333                # connect to a master.
334                active = "%s" % ( tb in masters)
335                info = {
336                        'type': 'transit',
337                        'member': [ {
338                            'element': i.element.name, 
339                            'interface': i.name
340                            } for i in s.interfaces \
341                                    if isinstance(i.element, topdl.Computer) ],
342                        'fedAttr': [ 
343                            { 'attribute': 'masterdomain', 'value': mdomain},
344                            { 'attribute': 'masterexperiment', 'value': 
345                                "%s/%s" % (mproject, eid)},
346                            { 'attribute': 'active', 'value': active},
347                            ],
348                        'parameter': [ {
349                            'name': 'vlan_id',
350                            'key': 'fedid:%s/vlan%d' % (expid, dn),
351                            'store': self.store_url,
352                            'type': 'input',
353                            } ]
354                        }
355                if tbs.has_key(tb):
356                    info['peer'] = tbs[tb]
357                connInfo[tb].append(info)
358
359                # Give this allocation the rights to access the key of the
360                # vlan_id
361                if aid:
362                    self.auth.set_attribute(aid, 
363                            'fedid:%s/vlan%d' % (expid, dn))
364            else:
365                raise service_error(service_error.internal,
366                        "No substrate %s in testbed %s" % (sub.name, tb))
367
368        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
369                tb_name)
370
371    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
372            segment_substrate, portals, connInfo, expid):
373        # More than one testbed is on this substrate.  Insert
374        # some portals into the subtopologies.  st == source testbed,
375        # dt == destination testbed.
376        for st in tbs.keys():
377            if not segment_substrate.has_key(st):
378                segment_substrate[st] = { }
379            if not portals.has_key(st): 
380                portals[st] = { }
381            if not connInfo.has_key(st):
382                connInfo[st] = [ ]
383            for dt in [ t for t in tbs.keys() if t != st]:
384                sproject = tbparams[st].get_attribute('project', 'project')
385                dproject = tbparams[dt].get_attribute('project', 'project')
386                sdomain = tbparams[st].get_attribute('domain', ".example.com")
387                ddomain = tbparams[dt].get_attribute('domain', ".example.com")
388                aid = tbparams[dt].allocID
389
390                seer_master = None
391
392                for m in masters.values():
393                    for s in m:
394                        if s.name == 'SEER':
395                            seer_master = m
396                            break
397                    if seer_master: break
398
399                if seer_master:
400                    mdomain = tbparams[seer_master].get_attribute('domain',
401                            '.example.com')
402                    mproject = tbparams[seer_master].get_attribute('project', 
403                            'project')
404                    muser = tbparams[seer_master].get_attribute('user', 'root')
405                    smbshare = tbparams[seer_master].get_attribute('smbshare',
406                            'USERS')
407                else:
408                    mdomain = '.example.com'
409                    mproject = 'project'
410                    muser = 'root'
411                    smbshare = 'USERS'
412
413                if (st in masters  and dt not in masters) or \
414                        (st not in masters and dt in masters):
415                    active = ("%s" % (st in masters))
416                else:
417                    active = ("%s" %(st > dt))
418
419                if not segment_substrate[st].has_key(dt):
420                    # Put a substrate and a segment for the connected
421                    # testbed in there.
422                    tsubstrate, segment_element = \
423                            self.new_portal_substrate(st, dt, eid, tbparams,
424                                    expid)
425                    segment_substrate[st][dt] = tsubstrate
426                    topo[st].substrates.append(tsubstrate)
427                    topo[st].elements.append(segment_element)
428
429                new_portal = False
430                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
431                else: dname = dt
432
433                if testbed_suffix(st): sname = "-".join(split_testbed(st))
434                else: sname = st
435
436                if portals[st].has_key(dt):
437                    # There's a portal set up to go to this destination.
438                    # See if there's room to multiplex this connection on
439                    # it.  If so, add an interface to the portal; if not,
440                    # set up to add a portal below.
441                    # [This little festival of braces is just a pop of the
442                    # last element in the list of portals between st and
443                    # dt.]
444                    portal = portals[st][dt][-1]
445                    mux = len([ i for i in portal.interface \
446                            if not i.get_attribute('portal')])
447                    if mux == self.muxmax:
448                        new_portal = True
449                        portal_type = "experiment"
450                        myname = "%stunnel%d" % (dname.lower(),
451                                len(portals[st][dt]))
452                        desthost = "%stunnel%d" % (sname.lower(), 
453                                len(portals[st][dt]))
454                    else:
455                        new_i = topdl.Interface(
456                                substrate=sub.name,
457                                attribute=[ 
458                                    topdl.Attribute(
459                                        attribute='ip4_address', 
460                                        value=tbs[dt]
461                                    )
462                                ])
463                        portal.interface.append(new_i)
464                else:
465                    # First connection to this testbed, make an empty list
466                    # and set up to add the new portal below
467                    new_portal = True
468                    portals[st][dt] = [ ]
469                    myname = "%stunnel%d" % (dname.lower(),
470                            len(portals[st][dt]))
471                    desthost = "%stunnel%d" % (sname.lower(),
472                            len(portals[st][dt]))
473
474                    if dt in masters or st in masters: portal_type = "both"
475                    else: portal_type = "experiment"
476
477                if new_portal:
478                    infs = (
479                            (segment_substrate[st][dt].name, 
480                                (('portal', 'true'),)),
481                            (sub.name, 
482                                (('ip4_address', tbs[dt]),))
483                        )
484                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
485                            masters, eid, myname, desthost, portal_type,
486                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
487
488                    topo[st].elements.append(portal)
489                    portals[st][dt].append(portal)
490                    connInfo[st].append(info)
491
492    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
493            connInfo, expid):
494        # Add to the master testbed
495        tsubstrate, segment_element = \
496                self.new_portal_substrate(st, dt, eid, tbparams, expid)
497        myname = "%stunnel" % dt
498        desthost = "%stunnel" % st
499
500        portal, info = self.new_portal_node(st, dt, tbparams, masters,
501                eid, myname, desthost, "control", 
502                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
503                conn_attrs=[], expid=expid)
504
505        topo[st].substrates.append(tsubstrate)
506        topo[st].elements.append(segment_element)
507        topo[st].elements.append(portal)
508        if not connInfo.has_key(st):
509            connInfo[st] = [ ]
510        connInfo[st].append(info)
511
512    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
513            substrate, tbparams, expid, tb_name):
514        # Add to the master testbed
515        if testbed_suffix(dt): myname = "%stunnel" % "-".join(split_testbed(dt))
516        else: myname = "%stunnel" % dt
517
518        desthost = "%s" % ip_addr(dip)
519
520        portal, info = self.new_portal_node(st, dt, tbparams, masters,
521                eid, myname, desthost, "control", 
522                ((substrate.name,(
523                    ('portal','true'),
524                    ('ip4_address', "%s" % ip_addr(myip)),)),),
525                conn_type="transit", conn_attrs=[], expid=expid)
526
527        return portal
528
529    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
530            connInfo, expid):
531        """
532        For each substrate in the main topology, find those that
533        have nodes on more than one testbed.  Insert portal nodes
534        into the copies of those substrates on the sub topologies.
535        """
536        segment_substrate = { }
537        portals = { }
538        for s in top.substrates:
539            # tbs will contain an ip address on this subsrate that is in
540            # each testbed.
541            tbs = { }
542            for i in s.interfaces:
543                e = i.element
544                tb = e.get_attribute('testbed')
545                if tb and tb not in tbs:
546                    tbs[tb]= i.get_attribute('ip4_address')
547            if len(tbs) < 2:
548                continue
549
550            base_tbs = set([testbed_base(t) for t in tbs])
551
552            # DRAGON will not create multi-site vlans yet, so we don't do multi
553            # connection direct transits yet.
554            if len(tbs) == 2 :
555                # NB: the else if on the for loop - if none of the direct
556                # transits is applicable, use the internet.
557                for d in self.direct_transit:
558                    if all([tbparams[x].get_attribute(d) for x in tbs]):
559                        self.create_direct_substrate(s, topo, tbs, tbparams, 
560                                masters, eid, connInfo, expid, d)
561                        break
562                else:
563                    self.insert_internet_portals(s, topo, tbs, tbparams,
564                            masters, eid, segment_substrate, portals,
565                            connInfo, expid)
566            else:
567                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
568                        eid, segment_substrate, portals, connInfo, expid)
569
570
571        # Make sure that all the service importers have a control portal back
572        # to the master for each service.
573        for mtb in [ t for t in tbparams if t in masters ]:
574            importers = set([])
575            for m in masters[mtb]:
576                importers |= set(m.importers)
577            if mtb in importers:
578                importers.discard(mtb)
579            for tb in importers:
580                if tb not in topo:
581                    self.log.error("Importer not in experiment: %s" % tb)
582                    continue
583                if len([e for e in topo[tb].elements \
584                        if isinstance(e, topdl.Computer) and \
585                        e.get_attribute('destination_testbed') == mtb and \
586                        e.get_attribute('portal') and \
587                        e.get_attribute('portal_type') == 'both']) == 0:
588
589                    for tb_name in self.direct_transit:
590
591                        if tbparams[mtb].get_attribute(tb_name) \
592                                and tbparams[tb].get_attribute(tb_name):
593
594                            idx = len([x for x in topo.keys() \
595                                    if x.startswith(tb_name)])
596                            dip, leng = ip_allocator.allocate(4)
597                            dip += 1
598                            mip = dip+1
599                            csub = topdl.Substrate(
600                                    name="%s-control-%s" % (tb_name, tb),
601                                    capacity=topdl.Capacity(100000.0, 'max'),
602                                    attribute=[
603                                        topdl.Attribute(
604                                            attribute='portal',
605                                            value='true'
606                                            )
607                                        ]
608                                    )
609                            seg = topdl.Segment(
610                                    id= {'fedid': tbparams[mtb].allocID},
611                                    type='emulab',
612                                    uri = self.tbmap.get(testbed_base(mtb),
613                                        None),
614                                    interface=[ 
615                                        topdl.Interface(
616                                            substrate=csub.name),
617                                        ],
618                                    attribute = [
619                                        topdl.Attribute(attribute=n, value=v)
620                                            for n, v in (\
621                                                ('domain', 
622                                                    tbparams[mtb].get_attribute(
623                                                        'domain',
624                                                        ".example.com")),
625                                                ('experiment', "%s/%s" % \
626                                                        (tbparams[mtb].get_attribute(
627                                                            'project', 
628                                                            'project'), 
629                                                            eid)),)
630                                        ],
631                                    )
632                            portal = self.new_direct_portal(tb, mtb,
633                                    masters, eid, dip, mip, idx, csub,
634                                    tbparams, expid, tb_name)
635                            topo[tb].substrates.append(csub)
636                            topo[tb].elements.append(portal)
637                            topo[tb].elements.append(seg)
638
639                            mcsub = csub.clone()
640                            seg = topdl.Segment(
641                                    id= { 'fedid': tbparams[tb].allocID},
642                                    type='emulab',
643                                    uri = self.tbmap.get(testbed_base(tb),
644                                        None),
645                                    interface=[ 
646                                        topdl.Interface(
647                                            substrate=csub.name),
648                                        ],
649                                    attribute = [
650                                        topdl.Attribute(attribute=n, value=v)
651                                            for n, v in (\
652                                                ('domain', 
653                                                    tbparams[tb].get_attribute(
654                                                        'domain',
655                                                        ".example.com")),
656                                                ('experiment', "%s/%s" % \
657                                                        (tbparams[tb].get_attribute(
658                                                            'project', 
659                                                            'project'), 
660                                                            eid)),)
661                                        ],
662                                    )
663                            portal = self.new_direct_portal(mtb, tb, masters,
664                                    eid, mip, dip, idx, mcsub, tbparams, expid,
665                                    tb_name)
666                            topo[mtb].substrates.append(mcsub)
667                            topo[mtb].elements.append(portal)
668                            topo[mtb].elements.append(seg)
669                            for t in (mtb, tb):
670                                topo[t].incorporate_elements()
671
672                            self.create_direct_substrate(csub, topo, 
673                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
674                                    tbparams, masters, eid, connInfo,
675                                    expid, tb_name)
676                            break
677                    # This matches with the for tb_name in self.direct_transit
678                    else:
679                        self.add_control_portal(mtb, tb, masters, eid, topo, 
680                                tbparams, connInfo, expid)
681                        self.add_control_portal(tb, mtb, masters, eid, topo, 
682                                tbparams, connInfo, expid)
683
684        # Connect the portal nodes into the topologies and clear out
685        # substrates that are not in the topologies
686        for tb in tbparams:
687            topo[tb].incorporate_elements()
688            topo[tb].substrates = \
689                    [s for s in topo[tb].substrates \
690                        if len(s.interfaces) >0]
691
Note: See TracBrowser for help on using the repository browser.