source: fedd/federation/experiment_partition.py @ eb117fe

Last change on this file since eb117fe was eb117fe, checked in by Ted Faber <faber@…>, 12 years ago

Avoid portal name collision

  • Property mode set to 100644
File size: 20.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from deter import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32from deter import topdl
33from deter import ip_allocator
34from deter import ip_addr
35import list_log
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2, 
46            direct_transit=None, tbactive=None):
47        """
48        Intialize the various attributes
49        """
50
51        self.log = logging.getLogger("fedd.experiment_control." + \
52                "experiment_paritition")
53        self.auth = auth
54        self.store_url = store_url
55        self.tbmap = tbmap
56        self.direct_transit = direct_transit or [ ]
57        self.muxmax = muxmax
58        self.tbactive = tbactive
59
60
61    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
62            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
63            expid=None):
64        """
65        Return a new internet portal node and a dict with the connectionInfo to
66        be attached.
67        """
68        seer_master = None
69        for k, m in masters.items():
70            for s in m:
71                if s.name == 'seer':
72                    seer_master = k
73                    break
74            if seer_master: break
75
76        if seer_master:
77            mdomain = tbparams[seer_master].get_attribute('domain', 
78                    '.example.com')
79            mproject = tbparams[seer_master].get_attribute('project', 
80                    'project')
81            muser = tbparams[seer_master].get_attribute('user', 'root')
82            smbshare = tbparams[seer_master].get_attribute('smbshare', 'USERS')
83        else:
84            mdomain = '.example.com'
85            mproject = 'project'
86            muser = 'root'
87            smbshare = 'USERS'
88
89        dproject = tbparams[dt].get_attribute('project', 'project')
90        ddomain = tbparams[dt].get_attribute('domain', '.example.com')
91
92        if (st in self.tbactive and dt not in self.tbactive) or \
93                ( st not in self.tbactive and dt in self.tbactive ):
94            active = ("%s" % (st in self.tbactive))
95            nat_partner = ("%s" % (dt in self.tbactive))
96        elif (st in masters and dt not in masters) or \
97                ( st not in masters and dt in masters ):
98            active = ("%s" % (st in masters))
99            nat_partner = "%s" % False
100        else:
101            active = ("%s" % (st > dt))
102            nat_partner = "%s" % False
103
104        ifaces = [ ]
105        for sub, attrs in iface_desc:
106            inf = topdl.Interface(
107                    name="inf%03d" % len(ifaces),
108                    substrate=sub,
109                    attribute=[
110                        topdl.Attribute(
111                            attribute=n,
112                            value = v)
113                        for n, v in attrs
114                        ]
115                    )
116            ifaces.append(inf)
117        if conn_type == "ssh":
118            try:
119                aid = tbparams[st].allocID
120            except:
121                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
122                        % st)
123                aid = None
124            info = {
125                    "type" : conn_type, 
126                    "portal": myname,
127                    'fedAttr': [ 
128                            { 'attribute': 'masterdomain', 'value': mdomain},
129                            { 'attribute': 'masterexperiment', 'value': 
130                                "%s/%s" % (mproject, eid)},
131                            { 'attribute': 'active', 'value': active},
132                            { 'attribute': 'nat_partner', 'value': nat_partner},
133                            # Move to SMB service description
134                            { 'attribute': 'masteruser', 'value': muser},
135                            { 'attribute': 'smbshare', 'value': smbshare},
136                        ],
137                    'parameter': [
138                        {
139                            'name': 'peer',
140                            'key': 'fedid:%s/%s' % (expid, myname),
141                            'store': self.store_url,
142                            'type': 'output',
143                        },
144                        {
145                            'name': 'ssh_port',
146                            'key': 'fedid:%s/%s-port' % (expid, myname),
147                            'store': self.store_url,
148                            'type': 'output',
149                        },
150                        {
151                            'name': 'peer',
152                            'key': 'fedid:%s/%s' % (expid, desthost),
153                            'store': self.store_url,
154                            'type': 'input',
155                        },
156                        {
157                            'name': 'ssh_port',
158                            'key': 'fedid:%s/%s-port' % (expid, desthost),
159                            'store': self.store_url,
160                            'type': 'input',
161                        },
162                        ]
163                    }
164            # Give this allocation the rights to access the key of the
165            # peers
166            if aid:
167                for h in (myname, desthost):
168                    # PNNL debug
169                    self.log.debug('Set attribute: %s %s %s' % (aid, expid, h))
170                    try:
171                        self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
172                        self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
173                                (expid, h))
174                    except RuntimeError, e:
175                        raise service_error(service_error.internal, "%s" % e)
176            else:
177                self.log.error("No aid for %s in new_portal_node" % st)
178        else:
179            info = None
180
181        return (topdl.Computer(
182                name=myname,
183                attribute=[ 
184                    topdl.Attribute(attribute=n,value=v)
185                        for n, v in (\
186                            ('portal', 'true'),
187                            ('portal_type', portal_type), 
188                            ('destination_testbed', dt),
189                        )
190                    ],
191                interface=ifaces,
192                ), info)
193
194    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
195        ddomain = tbparams[dt].get_attribute('domain', ".example.com")
196        dproject = tbparams[dt].get_attribute('project', 'project')
197        tsubstrate = \
198                topdl.Substrate(name='%s-%s' % (st, dt),
199                        attribute= [
200                            topdl.Attribute(
201                                attribute='portal',
202                                value='true')
203                            ]
204                        )
205        segment_element = topdl.Segment(
206                id= {'fedid': tbparams[dt].allocID },
207                type='emulab',
208                uri = self.tbmap.get(testbed_base(dt), None),
209                interface=[ 
210                    topdl.Interface(
211                        substrate=tsubstrate.name),
212                    ],
213                attribute = [
214                    topdl.Attribute(attribute=n, value=v)
215                        for n, v in (\
216                            ('domain', ddomain),
217                            ('experiment', "%s/%s" % \
218                                    (dproject, eid)),)
219                    ],
220                )
221
222        return (tsubstrate, segment_element)
223
224    def new_direct_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid,
225            tb_name):
226        if sub.capacity is None:
227            raise service_error(service_error.internal,
228                    "Cannot direct split substrate w/o capacity")
229        segs = [ ]
230        name = join_testbed(tb_name, "%d" % idx)
231        substr = topdl.Substrate(name=name, 
232                capacity=sub.capacity.clone(),
233                attribute=[ topdl.Attribute(attribute=n, value=v)
234                    for n, v, in (\
235                            ('vlan', 'unassigned%d' % idx),)])
236        store_key = 'fedid:%s/vlan%d' % (expid, idx)
237        for tb in tbs.keys():
238            seg = topdl.Segment(
239                    id = { 'fedid':tbparams[tb].allocID },
240                    type='emulab',
241                    uri = self.tbmap.get(testbed_base(tb), None),
242                    interface=[ 
243                        topdl.Interface(
244                            substrate=substr.name),
245                        ],
246                    attribute=[ topdl.Attribute(
247                        attribute='%s_endpoint' % tb_name, 
248                        value=tbparams[tb].get_attribute(tb_name)),
249                        ]
250                    )
251            vlan_key = "%s_vlans" % tb_name
252            if tbparams[tb].get_attribute(vlan_key) is not None:
253                seg.set_attribute(vlan_key, 
254                        tbparams[tb].get_attribute(vlan_key))
255            segs.append(seg)
256
257            # Give this allocation the rights to access the key of the
258            # vlan_id
259            try:
260                aid = tbparams[tb].allocID
261                self.auth.set_attribute(aid, store_key)
262            except:
263                self.log.debug("[new_direct_topo] Can't get alloc id for %s?"\
264                        % tb)
265
266        connInfo[name] = [ { 
267            'type': 'transit',
268            'parameter': [ { 
269                'name': 'vlan_id',
270                'key': store_key,
271                'store': self.store_url,
272                'type': 'output'
273                } ]
274            } ]
275
276        topo[name] = \
277                topdl.Topology(substrates=[substr], elements=segs,
278                        attribute=[
279                            topdl.Attribute(attribute="transit", value='true'),
280                            topdl.Attribute(attribute="dynamic", value='true'),
281                            topdl.Attribute(attribute="testbed", value=tb_name),
282                            topdl.Attribute(attribute="store_keys", 
283                                value=store_key),
284                            ]
285                        )
286
287    def create_direct_substrate(self, sub, topo, tbs, tbparams, masters, eid,
288            connInfo, expid=None, tb_name=None):
289        """
290        Create connection information that tells which nodes are to be
291        connected to direct transits, and create an additional topology with
292        just the interconnected segments and a substrate.
293        """
294
295        def get_substrate_from_topo(name, t):
296            for s in t.substrates:
297                if s.name == name: return s
298            else: return None
299
300
301        seer_master = None
302        for k, m in masters.items():
303            for s in m:
304                if s.name == 'seer':
305                    seer_master = k
306                    break
307            if seer_master: break
308
309        if seer_master:
310            mdomain = tbparams[seer_master].get_attribute('domain',
311                    '.example.com')
312            mproject = tbparams[seer_master].get_attribute('project', 'project')
313        else:
314            mdomain = '.example.com'
315            mproject = 'project'
316
317        # dn is the number of previously created direct nets on this direct
318        # testbed.  This routine creates a net numbered by dn
319        dn = len([x for x in topo.keys() if x.startswith(tb_name)])
320
321        # Set the attributes in the copies that will allow setup of direct
322        # connections.
323        for tb in tbs.keys():
324            s = get_substrate_from_topo(sub.name, topo[tb])
325            if s:
326                if not connInfo.has_key(tb):
327                    connInfo[tb] = [ ]
328
329                try:
330                    aid = tbparams[tb].allocID
331                except:
332                    self.log.debug("[create_direct_substrate] " + 
333                            "Can't get alloc id for %s?" %tb)
334                    aid = None
335
336                # This may need another look, but only a service gateway will
337                # look at the active parameter, and these are only inserted to
338                # connect to a master.
339                active = "%s" % ( tb in masters)
340                info = {
341                        'type': 'transit',
342                        'member': [ {
343                            'element': i.element.name, 
344                            'interface': i.name
345                            } for i in s.interfaces \
346                                    if isinstance(i.element, topdl.Computer) ],
347                        'fedAttr': [ 
348                            { 'attribute': 'masterdomain', 'value': mdomain},
349                            { 'attribute': 'masterexperiment', 'value': 
350                                "%s/%s" % (mproject, eid)},
351                            { 'attribute': 'active', 'value': active},
352                            ],
353                        'parameter': [ {
354                            'name': 'vlan_id',
355                            'key': 'fedid:%s/vlan%d' % (expid, dn),
356                            'store': self.store_url,
357                            'type': 'input',
358                            } ]
359                        }
360                if tbs.has_key(tb):
361                    info['peer'] = tbs[tb]
362                connInfo[tb].append(info)
363
364                # Give this allocation the rights to access the key of the
365                # vlan_id
366                if aid:
367                    self.auth.set_attribute(aid, 
368                            'fedid:%s/vlan%d' % (expid, dn))
369            else:
370                raise service_error(service_error.internal,
371                        "No substrate %s in testbed %s" % (sub.name, tb))
372
373        self.new_direct_topo(dn, sub, topo, tbs, tbparams, connInfo, expid,
374                tb_name)
375
376    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
377            segment_substrate, portals, connInfo, expid):
378        # More than one testbed is on this substrate.  Insert
379        # some portals into the subtopologies.  st == source testbed,
380        # dt == destination testbed.
381        for st in tbs.keys():
382            if not segment_substrate.has_key(st):
383                segment_substrate[st] = { }
384            if not portals.has_key(st): 
385                portals[st] = { }
386            if not connInfo.has_key(st):
387                connInfo[st] = [ ]
388            for dt in [ t for t in tbs.keys() if t != st]:
389                sproject = tbparams[st].get_attribute('project', 'project')
390                dproject = tbparams[dt].get_attribute('project', 'project')
391                sdomain = tbparams[st].get_attribute('domain', ".example.com")
392                ddomain = tbparams[dt].get_attribute('domain', ".example.com")
393                aid = tbparams[dt].allocID
394
395                seer_master = None
396
397                for m in masters.values():
398                    for s in m:
399                        if s.name == 'SEER':
400                            seer_master = m
401                            break
402                    if seer_master: break
403
404                if seer_master:
405                    mdomain = tbparams[seer_master].get_attribute('domain',
406                            '.example.com')
407                    mproject = tbparams[seer_master].get_attribute('project', 
408                            'project')
409                    muser = tbparams[seer_master].get_attribute('user', 'root')
410                    smbshare = tbparams[seer_master].get_attribute('smbshare',
411                            'USERS')
412                else:
413                    mdomain = '.example.com'
414                    mproject = 'project'
415                    muser = 'root'
416                    smbshare = 'USERS'
417
418                if (st in masters  and dt not in masters) or \
419                        (st not in masters and dt in masters):
420                    active = ("%s" % (st in masters))
421                else:
422                    active = ("%s" %(st > dt))
423
424                if not segment_substrate[st].has_key(dt):
425                    # Put a substrate and a segment for the connected
426                    # testbed in there.
427                    tsubstrate, segment_element = \
428                            self.new_portal_substrate(st, dt, eid, tbparams,
429                                    expid)
430                    segment_substrate[st][dt] = tsubstrate
431                    topo[st].substrates.append(tsubstrate)
432                    topo[st].elements.append(segment_element)
433
434                new_portal = False
435                if testbed_suffix(dt): dname = "-".join(split_testbed(dt))
436                else: dname = dt
437
438                if testbed_suffix(st): sname = "-".join(split_testbed(st))
439                else: sname = st
440
441                if portals[st].has_key(dt):
442                    # There's a portal set up to go to this destination.
443                    # See if there's room to multiplex this connection on
444                    # it.  If so, add an interface to the portal; if not,
445                    # set up to add a portal below.
446                    # [This little festival of braces is just a pop of the
447                    # last element in the list of portals between st and
448                    # dt.]
449                    portal = portals[st][dt][-1]
450                    mux = len([ i for i in portal.interface \
451                            if not i.get_attribute('portal')])
452                    if mux == self.muxmax:
453                        new_portal = True
454                        portal_type = "experiment"
455                        myname = "%s%stunnel%d" % (sname.lower(), dname.lower(),
456                                len(portals[st][dt]))
457                        desthost = "%s%stunnel%d" % (dname.lower(),
458                                sname.lower(), len(portals[st][dt]))
459                    else:
460                        new_i = topdl.Interface(
461                                substrate=sub.name,
462                                attribute=[ 
463                                    topdl.Attribute(
464                                        attribute='ip4_address', 
465                                        value=tbs[dt]
466                                    )
467                                ])
468                        portal.interface.append(new_i)
469                else:
470                    # First connection to this testbed, make an empty list
471                    # and set up to add the new portal below
472                    new_portal = True
473                    portals[st][dt] = [ ]
474                    myname = "%s%stunnel%d" % (sname.lower(), dname.lower(),
475                            len(portals[st][dt]))
476                    desthost = "%s%stunnel%d" % (dname.lower(), sname.lower(),
477                            len(portals[st][dt]))
478
479                    if dt in masters or st in masters: portal_type = "both"
480                    else: portal_type = "experiment"
481
482                if new_portal:
483                    infs = (
484                            (segment_substrate[st][dt].name, 
485                                (('portal', 'true'),)),
486                            (sub.name, 
487                                (('ip4_address', tbs[dt]),))
488                        )
489                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
490                            masters, eid, myname, desthost, portal_type,
491                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
492
493                    topo[st].elements.append(portal)
494                    portals[st][dt].append(portal)
495                    connInfo[st].append(info)
496
497    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
498            connInfo, expid):
499        # Add to the master testbed
500        tsubstrate, segment_element = \
501                self.new_portal_substrate(st, dt, eid, tbparams, expid)
502        myname = "%s%stunnel" % (st, dt)
503        desthost = "%s%stunnel" % (dt, st)
504
505        portal, info = self.new_portal_node(st, dt, tbparams, masters,
506                eid, myname, desthost, "control", 
507                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
508                conn_attrs=[], expid=expid)
509
510        topo[st].substrates.append(tsubstrate)
511        topo[st].elements.append(segment_element)
512        topo[st].elements.append(portal)
513        if not connInfo.has_key(st):
514            connInfo[st] = [ ]
515        connInfo[st].append(info)
516
517    def new_direct_portal(self, st, dt, masters, eid, myip, dip, idx, 
518            substrate, tbparams, expid, tb_name):
519        # Add to the master testbed
520        if testbed_suffix(dt): myname = "%s%stunnel" % \
521                ("-".join(split_testbed(dt)), st)
522        else: myname = "%s%stunnel" % (dt, st)
523
524        desthost = "%s" % ip_addr(dip)
525
526        portal, info = self.new_portal_node(st, dt, tbparams, masters,
527                eid, myname, desthost, "control", 
528                ((substrate.name,(
529                    ('portal','true'),
530                    ('ip4_address', "%s" % ip_addr(myip)),)),),
531                conn_type="transit", conn_attrs=[], expid=expid)
532
533        return portal
534
535    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
536            connInfo, expid):
537        """
538        For each substrate in the main topology, find those that
539        have nodes on more than one testbed.  Insert portal nodes
540        into the copies of those substrates on the sub topologies.
541        """
542        segment_substrate = { }
543        portals = { }
544        for s in top.substrates:
545            # tbs will contain an ip address on this subsrate that is in
546            # each testbed.
547            tbs = { }
548            for i in s.interfaces:
549                e = i.element
550                tb = e.get_attribute('testbed')
551                if tb and tb not in tbs:
552                    tbs[tb]= i.get_attribute('ip4_address')
553            if len(tbs) < 2:
554                continue
555
556            base_tbs = set([testbed_base(t) for t in tbs])
557
558            # DRAGON will not create multi-site vlans yet, so we don't do multi
559            # connection direct transits yet.
560            if len(tbs) == 2 :
561                # NB: the else if on the for loop - if none of the direct
562                # transits is applicable, use the internet.
563                for d in self.direct_transit:
564                    if all([tbparams[x].get_attribute(d) for x in tbs]):
565                        self.create_direct_substrate(s, topo, tbs, tbparams, 
566                                masters, eid, connInfo, expid, d)
567                        break
568                else:
569                    self.insert_internet_portals(s, topo, tbs, tbparams,
570                            masters, eid, segment_substrate, portals,
571                            connInfo, expid)
572            else:
573                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
574                        eid, segment_substrate, portals, connInfo, expid)
575
576
577        # Make sure that all the service importers have a control portal back
578        # to the master for each service.
579        for mtb in [ t for t in tbparams if t in masters ]:
580            importers = set([])
581            for m in masters[mtb]:
582                importers |= set(m.importers)
583            if mtb in importers:
584                importers.discard(mtb)
585            for tb in importers:
586                if tb not in topo:
587                    self.log.error("Importer not in experiment: %s" % tb)
588                    continue
589                if len([e for e in topo[tb].elements \
590                        if isinstance(e, topdl.Computer) and \
591                        e.get_attribute('destination_testbed') == mtb and \
592                        e.get_attribute('portal') and \
593                        e.get_attribute('portal_type') == 'both']) == 0:
594
595                    for tb_name in self.direct_transit:
596
597                        if tbparams[mtb].get_attribute(tb_name) \
598                                and tbparams[tb].get_attribute(tb_name):
599
600                            idx = len([x for x in topo.keys() \
601                                    if x.startswith(tb_name)])
602                            dip, leng = ip_allocator.allocate(4)
603                            dip += 1
604                            mip = dip+1
605                            csub = topdl.Substrate(
606                                    name="%s-control-%s" % (tb_name, tb),
607                                    capacity=topdl.Capacity(100000.0, 'max'),
608                                    attribute=[
609                                        topdl.Attribute(
610                                            attribute='portal',
611                                            value='true'
612                                            )
613                                        ]
614                                    )
615                            seg = topdl.Segment(
616                                    id= {'fedid': tbparams[mtb].allocID},
617                                    type='emulab',
618                                    uri = self.tbmap.get(testbed_base(mtb),
619                                        None),
620                                    interface=[ 
621                                        topdl.Interface(
622                                            substrate=csub.name),
623                                        ],
624                                    attribute = [
625                                        topdl.Attribute(attribute=n, value=v)
626                                            for n, v in (\
627                                                ('domain', 
628                                                    tbparams[mtb].get_attribute(
629                                                        'domain',
630                                                        ".example.com")),
631                                                ('experiment', "%s/%s" % \
632                                                        (tbparams[mtb].get_attribute(
633                                                            'project', 
634                                                            'project'), 
635                                                            eid)),)
636                                        ],
637                                    )
638                            portal = self.new_direct_portal(tb, mtb,
639                                    masters, eid, dip, mip, idx, csub,
640                                    tbparams, expid, tb_name)
641                            topo[tb].substrates.append(csub)
642                            topo[tb].elements.append(portal)
643                            topo[tb].elements.append(seg)
644
645                            mcsub = csub.clone()
646                            seg = topdl.Segment(
647                                    id= { 'fedid': tbparams[tb].allocID},
648                                    type='emulab',
649                                    uri = self.tbmap.get(testbed_base(tb),
650                                        None),
651                                    interface=[ 
652                                        topdl.Interface(
653                                            substrate=csub.name),
654                                        ],
655                                    attribute = [
656                                        topdl.Attribute(attribute=n, value=v)
657                                            for n, v in (\
658                                                ('domain', 
659                                                    tbparams[tb].get_attribute(
660                                                        'domain',
661                                                        ".example.com")),
662                                                ('experiment', "%s/%s" % \
663                                                        (tbparams[tb].get_attribute(
664                                                            'project', 
665                                                            'project'), 
666                                                            eid)),)
667                                        ],
668                                    )
669                            portal = self.new_direct_portal(mtb, tb, masters,
670                                    eid, mip, dip, idx, mcsub, tbparams, expid,
671                                    tb_name)
672                            topo[mtb].substrates.append(mcsub)
673                            topo[mtb].elements.append(portal)
674                            topo[mtb].elements.append(seg)
675                            for t in (mtb, tb):
676                                topo[t].incorporate_elements()
677
678                            self.create_direct_substrate(csub, topo, 
679                                    {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
680                                    tbparams, masters, eid, connInfo,
681                                    expid, tb_name)
682                            break
683                    # This matches with the for tb_name in self.direct_transit
684                    else:
685                        self.add_control_portal(mtb, tb, masters, eid, topo, 
686                                tbparams, connInfo, expid)
687                        self.add_control_portal(tb, mtb, masters, eid, topo, 
688                                tbparams, connInfo, expid)
689
690        # Connect the portal nodes into the topologies and clear out
691        # substrates that are not in the topologies
692        for tb in tbparams:
693            topo[tb].incorporate_elements()
694            topo[tb].substrates = \
695                    [s for s in topo[tb].substrates \
696                        if len(s.interfaces) >0]
697
Note: See TracBrowser for help on using the repository browser.