source: fedd/federation/experiment_partition.py @ 9a8cd92

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 9a8cd92 was 43197eb, checked in by Ted Faber <faber@…>, 15 years ago

better service handling including project_export psuedo service done more or less right- tested on dry runs

  • Property mode set to 100644
File size: 18.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2):
46        """
47        Intialize the various attributes
48        """
49
50        self.log = logging.getLogger("fedd.experiment_control." + \
51                "experiment_paritition")
52        self.auth = auth
53        self.store_url = store_url
54        self.tbmap = tbmap
55        self.muxmax = muxmax
56
57
58    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
59            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
60            expid=None):
61        """
62        Return a new internet portal node and a dict with the connectionInfo to
63        be attached.
64        """
65        seer_master = None
66        for m in masters.values():
67            for s in m:
68                if s.name == 'SEER':
69                    seer_master = m
70                    break
71            if seer_master: break
72
73        if seer_master:
74            mdomain = tbparams[seer_master].get('domain', '.example.com')
75            mproject = tbparams[seer_master].get('project', 'project')
76            muser = tbparams[seer_master].get('user', 'root')
77            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
78        else:
79            mdomain = '.example.com'
80            mproject = 'project'
81            muser = 'root'
82            smbshare = 'USERS'
83
84        dproject = tbparams[dt].get('project', 'project')
85        ddomain = tbparams[dt].get('domain', '.example.com')
86
87        if (st in masters and dt not in masters) or \
88                ( st not in masters and dt in masters ):
89            active = ("%s" % (st in masters))
90        else:
91            active = ("%s" % (st > dt))
92
93        print "%s %s" % (st, active)
94
95        ifaces = [ ]
96        for sub, attrs in iface_desc:
97            inf = topdl.Interface(
98                    name="inf%03d" % len(ifaces),
99                    substrate=sub,
100                    attribute=[
101                        topdl.Attribute(
102                            attribute=n,
103                            value = v)
104                        for n, v in attrs
105                        ]
106                    )
107            ifaces.append(inf)
108        if conn_type == "ssh":
109            try:
110                aid = tbparams[st]['allocID']['fedid']
111            except:
112                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
113                        % st)
114                aid = None
115            info = {
116                    "type" : conn_type, 
117                    "portal": myname,
118                    'fedAttr': [ 
119                            { 'attribute': 'masterdomain', 'value': mdomain},
120                            { 'attribute': 'masterexperiment', 'value': 
121                                "%s/%s" % (mproject, eid)},
122                            { 'attribute': 'active', 'value': active},
123                            # Move to SMB service description
124                            { 'attribute': 'masteruser', 'value': muser},
125                            { 'attribute': 'smbshare', 'value': smbshare},
126                        ],
127                    'parameter': [
128                        {
129                            'name': 'peer',
130                            'key': 'fedid:%s/%s' % (expid, myname),
131                            'store': self.store_url,
132                            'type': 'output',
133                        },
134                        {
135                            'name': 'ssh_port',
136                            'key': 'fedid:%s/%s-port' % (expid, myname),
137                            'store': self.store_url,
138                            'type': 'output',
139                        },
140                        {
141                            'name': 'peer',
142                            'key': 'fedid:%s/%s' % (expid, desthost),
143                            'store': self.store_url,
144                            'type': 'input',
145                        },
146                        {
147                            'name': 'ssh_port',
148                            'key': 'fedid:%s/%s-port' % (expid, desthost),
149                            'store': self.store_url,
150                            'type': 'input',
151                        },
152                        ]
153                    }
154            # Give this allocation the rights to access the key of the
155            # peers
156            if aid:
157                for h in (myname, desthost):
158                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
159                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
160                            (expid, h))
161            else:
162                self.log.error("No aid for %s in new_portal_node" % st)
163        else:
164            info = None
165
166        return (topdl.Computer(
167                name=myname,
168                attribute=[ 
169                    topdl.Attribute(attribute=n,value=v)
170                        for n, v in (\
171                            ('portal', 'true'),
172                            ('portal_type', portal_type), 
173                            ('destination_testbed', dt),
174                        )
175                    ],
176                interface=ifaces,
177                ), info)
178
179    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
180        ddomain = tbparams[dt].get('domain', ".example.com")
181        dproject = tbparams[dt].get('project', 'project')
182        tsubstrate = \
183                topdl.Substrate(name='%s-%s' % (st, dt),
184                        attribute= [
185                            topdl.Attribute(
186                                attribute='portal',
187                                value='true')
188                            ]
189                        )
190        segment_element = topdl.Segment(
191                id= tbparams[dt]['allocID'],
192                type='emulab',
193                uri = self.tbmap.get(dt, None),
194                interface=[ 
195                    topdl.Interface(
196                        substrate=tsubstrate.name),
197                    ],
198                attribute = [
199                    topdl.Attribute(attribute=n, value=v)
200                        for n, v in (\
201                            ('domain', ddomain),
202                            ('experiment', "%s/%s" % \
203                                    (dproject, eid)),)
204                    ],
205                )
206
207        return (tsubstrate, segment_element)
208
209    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
210        if sub.capacity is None:
211            raise service_error(service_error.internal,
212                    "Cannot DRAGON split substrate w/o capacity")
213        segs = [ ]
214        substr = topdl.Substrate(name="dragon%d" % idx, 
215                capacity=sub.capacity.clone(),
216                attribute=[ topdl.Attribute(attribute=n, value=v)
217                    for n, v, in (\
218                            ('vlan', 'unassigned%d' % idx),)])
219        name = "dragon%d" % idx
220        store_key = 'fedid:%s/vlan%d' % (expid, idx)
221        for tb in tbs.keys():
222            seg = topdl.Segment(
223                    id = tbparams[tb]['allocID'],
224                    type='emulab',
225                    uri = self.tbmap.get(tb, None),
226                    interface=[ 
227                        topdl.Interface(
228                            substrate=substr.name),
229                        ],
230                    attribute=[ topdl.Attribute(
231                        attribute='dragon_endpoint', 
232                        value=tbparams[tb]['dragon']),
233                        ]
234                    )
235            if tbparams[tb].has_key('vlans'):
236                seg.set_attribute('vlans', tbparams[tb]['vlans'])
237            segs.append(seg)
238
239            # Give this allocation the rights to access the key of the
240            # vlan_id
241            try:
242                aid = tbparams[tb]['allocID']['fedid']
243                self.auth.set_attribute(aid, store_key)
244            except:
245                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
246                        % tb)
247
248        connInfo[name] = [ { 
249            'type': 'transit',
250            'parameter': [ { 
251                'name': 'vlan_id',
252                'key': store_key,
253                'store': self.store_url,
254                'type': 'output'
255                } ]
256            } ]
257
258        topo[name] = \
259                topdl.Topology(substrates=[substr], elements=segs,
260                        attribute=[
261                            topdl.Attribute(attribute="transit", value='true'),
262                            topdl.Attribute(attribute="dynamic", value='true'),
263                            topdl.Attribute(attribute="testbed", 
264                                value='dragon'),
265                            topdl.Attribute(attribute="store_keys", 
266                                value=store_key),
267                            ]
268                        )
269
270    def create_dragon_substrate(self, sub, topo, tbs, tbparams, masters, eid,
271            connInfo, expid=None):
272        """
273        Add attribiutes to the various elements indicating that they are to be
274        dragon connected and create a dragon segment in topo to be
275        instantiated.
276        """
277
278        def get_substrate_from_topo(name, t):
279            for s in t.substrates:
280                if s.name == name: return s
281            else: return None
282
283
284        seer_master = None
285        for m in masters.values():
286            for s in m:
287                if s.name == 'SEER':
288                    seer_master = m
289                    break
290            if seer_master: break
291
292        if seer_master:
293            mdomain = tbparams[seer_master].get('domain', '.example.com')
294            mproject = tbparams[seer_master].get('project', 'project')
295        else:
296            mdomain = '.example.com'
297            mproject = 'project'
298
299        # dn is the number of previously created dragon nets.  This routine
300        # creates a net numbered by dn
301        dn = len([x for x in topo.keys() if x.startswith('dragon')])
302        # Count the number of interfaces on this substrate in each testbed from
303        # the global topology
304        count = { }
305        node = { }
306        for e in [ i.element for i in sub.interfaces ]:
307            tb = e.get_attribute('testbed')
308            count[tb] = count.get(tb, 0) + 1
309            node[tb] = i.get_attribute('ip4_address')
310
311
312        # Set the attributes in the copies that will allow setup of dragon
313        # connections.
314        for tb in tbs.keys():
315            s = get_substrate_from_topo(sub.name, topo[tb])
316            if s:
317                if not connInfo.has_key(tb):
318                    connInfo[tb] = [ ]
319
320                try:
321                    aid = tbparams[tb]['allocID']['fedid']
322                except:
323                    self.log.debug("[creat_dragon_substrate] " + 
324                            "Can't get alloc id for %s?" %tb)
325                    aid = None
326
327                # This may need another look, but only a service gateway will
328                # look at the active parameter, and these are only inserted to
329                # connect to a master.
330                active = "%s" % ( tb in masters)
331                info = {
332                        'type': 'transit',
333                        'member': [ {
334                            'element': i.element.name[0], 
335                            'interface': i.name
336                            } for i in s.interfaces \
337                                    if isinstance(i.element, topdl.Computer) ],
338                        'fedAttr': [ 
339                            { 'attribute': 'masterdomain', 'value': mdomain},
340                            { 'attribute': 'masterexperiment', 'value': 
341                                "%s/%s" % (mproject, eid)},
342                            { 'attribute': 'active', 'value': active},
343                            ],
344                        'parameter': [ {
345                            'name': 'vlan_id',
346                            'key': 'fedid:%s/vlan%d' % (expid, dn),
347                            'store': self.store_url,
348                            'type': 'input',
349                            } ]
350                        }
351                if tbs.has_key(tb):
352                    info['peer'] = tbs[tb]
353                connInfo[tb].append(info)
354
355                # Give this allocation the rights to access the key of the
356                # vlan_id
357                if aid:
358                    self.auth.set_attribute(aid, 
359                            'fedid:%s/vlan%d' % (expid, dn))
360            else:
361                raise service_error(service_error.internal,
362                        "No substrate %s in testbed %s" % (sub.name, tb))
363
364        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
365
366    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
367            segment_substrate, portals, connInfo, expid):
368        # More than one testbed is on this substrate.  Insert
369        # some portals into the subtopologies.  st == source testbed,
370        # dt == destination testbed.
371        for st in tbs.keys():
372            if not segment_substrate.has_key(st):
373                segment_substrate[st] = { }
374            if not portals.has_key(st): 
375                portals[st] = { }
376            if not connInfo.has_key(st):
377                connInfo[st] = [ ]
378            for dt in [ t for t in tbs.keys() if t != st]:
379                sproject = tbparams[st].get('project', 'project')
380                dproject = tbparams[dt].get('project', 'project')
381                sdomain = tbparams[st].get('domain', ".example.com")
382                ddomain = tbparams[dt].get('domain', ".example.com")
383                aid = tbparams[dt]['allocID']['fedid']
384
385                seer_master = None
386
387                for m in masters.values():
388                    for s in m:
389                        if s.name == 'SEER':
390                            seer_master = m
391                            break
392                    if seer_master: break
393
394                if seer_master:
395                    mdomain = tbparams[seer_master].get('domain', '.example.com')
396                    mproject = tbparams[seer_master].get('project', 'project')
397                    muser = tbparams[seer_master].get('user', 'root')
398                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
399                else:
400                    mdomain = '.example.com'
401                    mproject = 'project'
402                    muser = 'root'
403                    smbshare = 'USERS'
404
405                if (st in masters  and dt not in masters) or \
406                        (st not in masters and dt in masters):
407                    active = ("%s" % (st in masters))
408                else:
409                    active = ("%s" %(st > dt))
410
411                if not segment_substrate[st].has_key(dt):
412                    # Put a substrate and a segment for the connected
413                    # testbed in there.
414                    tsubstrate, segment_element = \
415                            self.new_portal_substrate(st, dt, eid, tbparams,
416                                    expid)
417                    segment_substrate[st][dt] = tsubstrate
418                    topo[st].substrates.append(tsubstrate)
419                    topo[st].elements.append(segment_element)
420
421                new_portal = False
422                if portals[st].has_key(dt):
423                    # There's a portal set up to go to this destination.
424                    # See if there's room to multiplex this connection on
425                    # it.  If so, add an interface to the portal; if not,
426                    # set up to add a portal below.
427                    # [This little festival of braces is just a pop of the
428                    # last element in the list of portals between st and
429                    # dt.]
430                    portal = portals[st][dt][-1]
431                    mux = len([ i for i in portal.interface \
432                            if not i.get_attribute('portal')])
433                    if mux == self.muxmax:
434                        new_portal = True
435                        portal_type = "experiment"
436                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
437                        desthost = "%stunnel%d" % (st.lower(), 
438                                len(portals[st][dt]))
439                    else:
440                        new_i = topdl.Interface(
441                                substrate=sub.name,
442                                attribute=[ 
443                                    topdl.Attribute(
444                                        attribute='ip4_address', 
445                                        value=tbs[dt]
446                                    )
447                                ])
448                        portal.interface.append(new_i)
449                else:
450                    # First connection to this testbed, make an empty list
451                    # and set up to add the new portal below
452                    new_portal = True
453                    portals[st][dt] = [ ]
454                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
455                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
456
457                    if dt in masters or st in masters: portal_type = "both"
458                    else: portal_type = "experiment"
459
460                if new_portal:
461                    infs = (
462                            (segment_substrate[st][dt].name, 
463                                (('portal', 'true'),)),
464                            (sub.name, 
465                                (('ip4_address', tbs[dt]),))
466                        )
467                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
468                            masters, eid, myname, desthost, portal_type,
469                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
470
471                    topo[st].elements.append(portal)
472                    portals[st][dt].append(portal)
473                    connInfo[st].append(info)
474
475    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, 
476            connInfo, expid):
477        # Add to the master testbed
478        tsubstrate, segment_element = \
479                self.new_portal_substrate(st, dt, eid, tbparams, expid)
480        myname = "%stunnel" % dt
481        desthost = "%stunnel" % st
482
483        portal, info = self.new_portal_node(st, dt, tbparams, masters,
484                eid, myname, desthost, "control", 
485                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
486                conn_attrs=[], expid=expid)
487
488        topo[st].substrates.append(tsubstrate)
489        topo[st].elements.append(segment_element)
490        topo[st].elements.append(portal)
491        if not connInfo.has_key(st):
492            connInfo[st] = [ ]
493        connInfo[st].append(info)
494
495    def new_dragon_portal(self, st, dt, masters, eid, myip, dip, idx, 
496            substrate, tbparams, expid):
497        # Add to the master testbed
498        myname = "%stunnel" % dt
499        desthost = "%s" % ip_addr(dip)
500
501        portal, info = self.new_portal_node(st, dt, tbparams, masters,
502                eid, myname, desthost, "control", 
503                ((substrate.name,(
504                    ('portal','true'),
505                    ('ip4_address', "%s" % ip_addr(myip)),)),),
506                conn_type="transit", conn_attrs=[], expid=expid)
507
508        return portal
509
510    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
511            connInfo, expid):
512        """
513        For each substrate in the main topology, find those that
514        have nodes on more than one testbed.  Insert portal nodes
515        into the copies of those substrates on the sub topologies.
516        """
517        segment_substrate = { }
518        portals = { }
519        for s in top.substrates:
520            # tbs will contain an ip address on this subsrate that is in
521            # each testbed.
522            tbs = { }
523            for i in s.interfaces:
524                e = i.element
525                tb = e.get_attribute('testbed')
526                if tb and not tbs.has_key(tb):
527                    for i in e.interface:
528                        if s in i.subs:
529                            tbs[tb]= i.get_attribute('ip4_address')
530            if len(tbs) < 2:
531                continue
532
533            # DRAGON will not create multi-site vlans yet
534            if len(tbs) == 2 and \
535                    all([tbparams[x].has_key('dragon') for x in tbs]):
536                self.create_dragon_substrate(s, topo, tbs, tbparams, 
537                        masters, eid, connInfo, expid)
538            else:
539                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
540                        eid, segment_substrate, portals, connInfo, expid)
541
542        # Make sure that all the service importers have a control portal back
543        # to the master for each service.
544        for mtb in [ t for t in tbparams if t in masters ]:
545            importers = set([])
546            for m in masters[mtb]:
547                importers |= set(m.importers)
548            for tb in importers:
549                if len([e for e in topo[tb].elements \
550                        if isinstance(e, topdl.Computer) and \
551                        e.get_attribute('destination_testbed') == mtb and \
552                        e.get_attribute('portal') and \
553                        e.get_attribute('portal_type') == 'both']) == 0:
554
555                    if tbparams[mtb].has_key('dragon') \
556                            and tbparams[tb].has_key('dragon'):
557
558                        idx = len([x for x in topo.keys() \
559                                if x.startswith('dragon')])
560                        dip, leng = ip_allocator.allocate(4)
561                        dip += 1
562                        mip = dip+1
563                        csub = topdl.Substrate(
564                                name="dragon-control-%s" % tb,
565                                capacity=topdl.Capacity(100000.0, 'max'),
566                                attribute=[
567                                    topdl.Attribute(
568                                        attribute='portal',
569                                        value='true'
570                                        )
571                                    ]
572                                )
573                        seg = topdl.Segment(
574                                id= tbparams[mtb]['allocID'],
575                                type='emulab',
576                                uri = self.tbmap.get(master, None),
577                                interface=[ 
578                                    topdl.Interface(
579                                        substrate=csub.name),
580                                    ],
581                                attribute = [
582                                    topdl.Attribute(attribute=n, value=v)
583                                        for n, v in (\
584                                            ('domain', 
585                                                tbparams[mtb].get('domain',
586                                                    ".example.com")),
587                                            ('experiment', "%s/%s" % \
588                                                    (tbparams[mtb].get(
589                                                        'project', 
590                                                        'project'), 
591                                                        eid)),)
592                                    ],
593                                )
594                        portal = self.new_dragon_portal(tb, mtb,
595                                masters, eid, dip, mip, idx, csub,
596                                tbparams, expid)
597                        topo[tb].substrates.append(csub)
598                        topo[tb].elements.append(portal)
599                        topo[tb].elements.append(seg)
600
601                        mcsub = csub.clone()
602                        seg = topdl.Segment(
603                                id= tbparams[tb]['allocID'],
604                                type='emulab',
605                                uri = self.tbmap.get(tb, None),
606                                interface=[ 
607                                    topdl.Interface(
608                                        substrate=csub.name),
609                                    ],
610                                attribute = [
611                                    topdl.Attribute(attribute=n, value=v)
612                                        for n, v in (\
613                                            ('domain', 
614                                                tbparams[tb].get('domain',
615                                                    ".example.com")),
616                                            ('experiment', "%s/%s" % \
617                                                    (tbparams[tb].get('project', 
618                                                        'project'), 
619                                                        eid)),)
620                                    ],
621                                )
622                        portal = self.new_dragon_portal(mtb, tb, masters,
623                                eid, mip, dip, idx, mcsub, tbparams, expid)
624                        topo[mtb].substrates.append(mcsub)
625                        topo[mtb].elements.append(portal)
626                        topo[mtb].elements.append(seg)
627                        for t in (master, tb):
628                            topo[t].incorporate_elements()
629
630                        self.create_dragon_substrate(csub, topo, 
631                                {tb: ip_addr(mip), mtb: ip_addr(dip)}, 
632                                tbparams, masters, eid, connInfo,
633                                expid)
634                    else:
635                        self.add_control_portal(mtb, tb, masters, eid, topo, 
636                                tbparams, connInfo, expid)
637                        self.add_control_portal(tb, mtb, masters, eid, topo, 
638                                tbparams, connInfo, expid)
639
640        # Connect the portal nodes into the topologies and clear out
641        # substrates that are not in the topologies
642        for tb in tbparams:
643            topo[tb].incorporate_elements()
644            topo[tb].substrates = \
645                    [s for s in topo[tb].substrates \
646                        if len(s.interfaces) >0]
647
Note: See TracBrowser for help on using the repository browser.