source: fedd/federation/experiment_partition.py @ 73e7f5c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 73e7f5c was 73e7f5c, checked in by Ted Faber <faber@…>, 14 years ago

Split the experiment partition routines out into a separate class

  • Property mode set to 100644
File size: 17.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2):
46        """
47        Intialize the various attributes
48        """
49
50        self.log = logging.getLogger("fedd.experiment_control." + \
51                "experiment_paritition")
52        self.auth = auth
53        self.store_url = store_url
54        self.tbmap = tbmap
55        self.muxmax = muxmax
56
57
58    def new_portal_node(self, st, dt, tbparams, master, eid, myname, desthost,
59            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
60            expid=None):
61        """
62        Return a new internet portal node and a dict with the connectionInfo to
63        be attached.
64        """
65        dproject = tbparams[dt].get('project', 'project')
66        ddomain = tbparams[dt].get('domain', ".example.com")
67        mdomain = tbparams[master].get('domain', '.example.com')
68        mproject = tbparams[master].get('project', 'project')
69        muser = tbparams[master].get('user', 'root')
70        smbshare = tbparams[master].get('smbshare', 'USERS')
71
72        if st == master or dt == master:
73            active = ("%s" % (st == master))
74        else:
75            active = ("%s" % (st > dt))
76
77        ifaces = [ ]
78        for sub, attrs in iface_desc:
79            inf = topdl.Interface(
80                    name="inf%03d" % len(ifaces),
81                    substrate=sub,
82                    attribute=[
83                        topdl.Attribute(
84                            attribute=n,
85                            value = v)
86                        for n, v in attrs
87                        ]
88                    )
89            ifaces.append(inf)
90        if conn_type == "ssh":
91            try:
92                aid = tbparams[st]['allocID']['fedid']
93            except:
94                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
95                        % st)
96                aid = None
97            info = {
98                    "type" : conn_type, 
99                    "portal": myname,
100                    'fedAttr': [ 
101                            { 'attribute': 'masterdomain', 'value': mdomain},
102                            { 'attribute': 'masterexperiment', 'value': 
103                                "%s/%s" % (mproject, eid)},
104                            { 'attribute': 'active', 'value': active},
105                            # Move to SMB service description
106                            { 'attribute': 'masteruser', 'value': muser},
107                            { 'attribute': 'smbshare', 'value': smbshare},
108                        ],
109                    'parameter': [
110                        {
111                            'name': 'peer',
112                            'key': 'fedid:%s/%s' % (expid, myname),
113                            'store': self.store_url,
114                            'type': 'output',
115                        },
116                        {
117                            'name': 'ssh_port',
118                            'key': 'fedid:%s/%s-port' % (expid, myname),
119                            'store': self.store_url,
120                            'type': 'output',
121                        },
122                        {
123                            'name': 'peer',
124                            'key': 'fedid:%s/%s' % (expid, desthost),
125                            'store': self.store_url,
126                            'type': 'input',
127                        },
128                        {
129                            'name': 'ssh_port',
130                            'key': 'fedid:%s/%s-port' % (expid, desthost),
131                            'store': self.store_url,
132                            'type': 'input',
133                        },
134                        ]
135                    }
136            # Give this allocation the rights to access the key of the
137            # peers
138            if aid:
139                for h in (myname, desthost):
140                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
141                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
142                            (expid, h))
143            else:
144                self.log.error("No aid for %s in new_portal_node" % st)
145        else:
146            info = None
147
148        return (topdl.Computer(
149                name=myname,
150                attribute=[ 
151                    topdl.Attribute(attribute=n,value=v)
152                        for n, v in (\
153                            ('portal', 'true'),
154                            ('portal_type', portal_type), 
155                        )
156                    ],
157                interface=ifaces,
158                ), info)
159
160    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
161        ddomain = tbparams[dt].get('domain', ".example.com")
162        dproject = tbparams[dt].get('project', 'project')
163        tsubstrate = \
164                topdl.Substrate(name='%s-%s' % (st, dt),
165                        attribute= [
166                            topdl.Attribute(
167                                attribute='portal',
168                                value='true')
169                            ]
170                        )
171        segment_element = topdl.Segment(
172                id= tbparams[dt]['allocID'],
173                type='emulab',
174                uri = self.tbmap.get(dt, None),
175                interface=[ 
176                    topdl.Interface(
177                        substrate=tsubstrate.name),
178                    ],
179                attribute = [
180                    topdl.Attribute(attribute=n, value=v)
181                        for n, v in (\
182                            ('domain', ddomain),
183                            ('experiment', "%s/%s" % \
184                                    (dproject, eid)),)
185                    ],
186                )
187
188        return (tsubstrate, segment_element)
189
190    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
191        if sub.capacity is None:
192            raise service_error(service_error.internal,
193                    "Cannot DRAGON split substrate w/o capacity")
194        segs = [ ]
195        substr = topdl.Substrate(name="dragon%d" % idx, 
196                capacity=sub.capacity.clone(),
197                attribute=[ topdl.Attribute(attribute=n, value=v)
198                    for n, v, in (\
199                            ('vlan', 'unassigned%d' % idx),)])
200        name = "dragon%d" % idx
201        store_key = 'fedid:%s/vlan%d' % (expid, idx)
202        for tb in tbs.keys():
203            seg = topdl.Segment(
204                    id = tbparams[tb]['allocID'],
205                    type='emulab',
206                    uri = self.tbmap.get(tb, None),
207                    interface=[ 
208                        topdl.Interface(
209                            substrate=substr.name),
210                        ],
211                    attribute=[ topdl.Attribute(
212                        attribute='dragon_endpoint', 
213                        value=tbparams[tb]['dragon']),
214                        ]
215                    )
216            if tbparams[tb].has_key('vlans'):
217                seg.set_attribute('vlans', tbparams[tb]['vlans'])
218            segs.append(seg)
219
220            # Give this allocation the rights to access the key of the
221            # vlan_id
222            try:
223                aid = tbparams[tb]['allocID']['fedid']
224                self.auth.set_attribute(aid, store_key)
225            except:
226                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
227                        % tb)
228
229        connInfo[name] = [ { 
230            'type': 'transit',
231            'parameter': [ { 
232                'name': 'vlan_id',
233                'key': store_key,
234                'store': self.store_url,
235                'type': 'output'
236                } ]
237            } ]
238
239        topo[name] = \
240                topdl.Topology(substrates=[substr], elements=segs,
241                        attribute=[
242                            topdl.Attribute(attribute="transit", value='true'),
243                            topdl.Attribute(attribute="dynamic", value='true'),
244                            topdl.Attribute(attribute="testbed", 
245                                value='dragon'),
246                            topdl.Attribute(attribute="store_keys", 
247                                value=store_key),
248                            ]
249                        )
250
251    def create_dragon_substrate(self, sub, topo, tbs, tbparams, master, eid,
252            connInfo, expid=None):
253        """
254        Add attribiutes to the various elements indicating that they are to be
255        dragon connected and create a dragon segment in topo to be
256        instantiated.
257        """
258
259        def get_substrate_from_topo(name, t):
260            for s in t.substrates:
261                if s.name == name: return s
262            else: return None
263
264
265        mdomain = tbparams[master].get('domain', '.example.com')
266        mproject = tbparams[master].get('project', 'project')
267        # dn is the number of previously created dragon nets.  This routine
268        # creates a net numbered by dn
269        dn = len([x for x in topo.keys() if x.startswith('dragon')])
270        # Count the number of interfaces on this substrate in each testbed from
271        # the global topology
272        count = { }
273        node = { }
274        for e in [ i.element for i in sub.interfaces ]:
275            tb = e.get_attribute('testbed')
276            count[tb] = count.get(tb, 0) + 1
277            node[tb] = i.get_attribute('ip4_address')
278
279
280        # Set the attributes in the copies that will allow setup of dragon
281        # connections.
282        for tb in tbs.keys():
283            s = get_substrate_from_topo(sub.name, topo[tb])
284            if s:
285                if not connInfo.has_key(tb):
286                    connInfo[tb] = [ ]
287
288                try:
289                    aid = tbparams[tb]['allocID']['fedid']
290                except:
291                    self.log.debug("[creat_dragon_substrate] " + 
292                            "Can't get alloc id for %s?" %tb)
293                    aid = None
294
295                # This may need another look, but only a service gateway will
296                # look at the active parameter, and these are only inserted to
297                # connect to the master.
298                active = "%s" % ( tb == master)
299                info = {
300                        'type': 'transit',
301                        'member': [ {
302                            'element': i.element.name[0], 
303                            'interface': i.name
304                            } for i in s.interfaces \
305                                    if isinstance(i.element, topdl.Computer) ],
306                        'fedAttr': [ 
307                            { 'attribute': 'masterdomain', 'value': mdomain},
308                            { 'attribute': 'masterexperiment', 'value': 
309                                "%s/%s" % (mproject, eid)},
310                            { 'attribute': 'active', 'value': active},
311                            ],
312                        'parameter': [ {
313                            'name': 'vlan_id',
314                            'key': 'fedid:%s/vlan%d' % (expid, dn),
315                            'store': self.store_url,
316                            'type': 'input',
317                            } ]
318                        }
319                if tbs.has_key(tb):
320                    info['peer'] = tbs[tb]
321                connInfo[tb].append(info)
322
323                # Give this allocation the rights to access the key of the
324                # vlan_id
325                if aid:
326                    self.auth.set_attribute(aid, 
327                            'fedid:%s/vlan%d' % (expid, dn))
328            else:
329                raise service_error(service_error.internal,
330                        "No substrate %s in testbed %s" % (sub.name, tb))
331
332        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
333
334    def insert_internet_portals(self, sub, topo, tbs, tbparams, master, eid,
335            segment_substrate, portals, connInfo, expid):
336        # More than one testbed is on this substrate.  Insert
337        # some portals into the subtopologies.  st == source testbed,
338        # dt == destination testbed.
339        for st in tbs.keys():
340            if not segment_substrate.has_key(st):
341                segment_substrate[st] = { }
342            if not portals.has_key(st): 
343                portals[st] = { }
344            if not connInfo.has_key(st):
345                connInfo[st] = [ ]
346            for dt in [ t for t in tbs.keys() if t != st]:
347                sproject = tbparams[st].get('project', 'project')
348                dproject = tbparams[dt].get('project', 'project')
349                mproject = tbparams[master].get('project', 'project')
350                sdomain = tbparams[st].get('domain', ".example.com")
351                ddomain = tbparams[dt].get('domain', ".example.com")
352                mdomain = tbparams[master].get('domain', '.example.com')
353                muser = tbparams[master].get('user', 'root')
354                smbshare = tbparams[master].get('smbshare', 'USERS')
355                aid = tbparams[dt]['allocID']['fedid']
356                if st == master or dt == master:
357                    active = ("%s" % (st == master))
358                else:
359                    active = ("%s" %(st > dt))
360                if not segment_substrate[st].has_key(dt):
361                    # Put a substrate and a segment for the connected
362                    # testbed in there.
363                    tsubstrate, segment_element = \
364                            self.new_portal_substrate(st, dt, eid, tbparams,
365                                    expid)
366                    segment_substrate[st][dt] = tsubstrate
367                    topo[st].substrates.append(tsubstrate)
368                    topo[st].elements.append(segment_element)
369
370                new_portal = False
371                if portals[st].has_key(dt):
372                    # There's a portal set up to go to this destination.
373                    # See if there's room to multiplex this connection on
374                    # it.  If so, add an interface to the portal; if not,
375                    # set up to add a portal below.
376                    # [This little festival of braces is just a pop of the
377                    # last element in the list of portals between st and
378                    # dt.]
379                    portal = portals[st][dt][-1]
380                    mux = len([ i for i in portal.interface \
381                            if not i.get_attribute('portal')])
382                    if mux == self.muxmax:
383                        new_portal = True
384                        portal_type = "experiment"
385                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
386                        desthost = "%stunnel%d" % (st.lower(), 
387                                len(portals[st][dt]))
388                    else:
389                        new_i = topdl.Interface(
390                                substrate=sub.name,
391                                attribute=[ 
392                                    topdl.Attribute(
393                                        attribute='ip4_address', 
394                                        value=tbs[dt]
395                                    )
396                                ])
397                        portal.interface.append(new_i)
398                else:
399                    # First connection to this testbed, make an empty list
400                    # and set up to add the new portal below
401                    new_portal = True
402                    portals[st][dt] = [ ]
403                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
404                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
405
406                    if dt == master or st == master: portal_type = "both"
407                    else: portal_type = "experiment"
408
409                if new_portal:
410                    infs = (
411                            (segment_substrate[st][dt].name, 
412                                (('portal', 'true'),)),
413                            (sub.name, 
414                                (('ip4_address', tbs[dt]),))
415                        )
416                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
417                            master, eid, myname, desthost, portal_type,
418                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
419
420                    topo[st].elements.append(portal)
421                    portals[st][dt].append(portal)
422                    connInfo[st].append(info)
423
424    def add_control_portal(self, st, dt, master, eid, topo, tbparams, connInfo, expid):
425        # Add to the master testbed
426        tsubstrate, segment_element = \
427                self.new_portal_substrate(st, dt, eid, tbparams, expid)
428        myname = "%stunnel" % dt
429        desthost = "%stunnel" % st
430
431        portal, info = self.new_portal_node(st, dt, tbparams, master,
432                eid, myname, desthost, "control", 
433                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
434                conn_attrs=[], expid=expid)
435
436        topo[st].substrates.append(tsubstrate)
437        topo[st].elements.append(segment_element)
438        topo[st].elements.append(portal)
439        if not connInfo.has_key(st):
440            connInfo[st] = [ ]
441        connInfo[st].append(info)
442
443    def new_dragon_portal(self, st, dt, master, eid, myip, dip, idx, 
444            substrate, tbparams, expid):
445        # Add to the master testbed
446        myname = "%stunnel" % dt
447        desthost = "%s" % ip_addr(dip)
448
449        portal, info = self.new_portal_node(st, dt, tbparams, master,
450                eid, myname, desthost, "control", 
451                ((substrate.name,(
452                    ('portal','true'),
453                    ('ip4_address', "%s" % ip_addr(myip)),)),),
454                conn_type="transit", conn_attrs=[], expid=expid)
455
456        return portal
457
458    def add_portals(self, top, topo, eid, master, tbparams, ip_allocator, 
459            connInfo, expid):
460        """
461        For each substrate in the main topology, find those that
462        have nodes on more than one testbed.  Insert portal nodes
463        into the copies of those substrates on the sub topologies.
464        """
465        segment_substrate = { }
466        portals = { }
467        for s in top.substrates:
468            # tbs will contain an ip address on this subsrate that is in
469            # each testbed.
470            tbs = { }
471            for i in s.interfaces:
472                e = i.element
473                tb = e.get_attribute('testbed')
474                if tb and not tbs.has_key(tb):
475                    for i in e.interface:
476                        if s in i.subs:
477                            tbs[tb]= i.get_attribute('ip4_address')
478            if len(tbs) < 2:
479                continue
480
481            # DRAGON will not create multi-site vlans yet
482            if len(tbs) == 2 and \
483                    all([tbparams[x].has_key('dragon') for x in tbs]):
484                self.create_dragon_substrate(s, topo, tbs, tbparams, 
485                        master, eid, connInfo, expid)
486            else:
487                self.insert_internet_portals(s, topo, tbs, tbparams, master,
488                        eid, segment_substrate, portals, connInfo, expid)
489
490        # Make sure that all the slaves have a control portal back to the
491        # master.
492        for tb in [ t for t in tbparams.keys() if t != master ]:
493            if len([e for e in topo[tb].elements \
494                    if isinstance(e, topdl.Computer) and \
495                    e.get_attribute('portal') and \
496                    e.get_attribute('portal_type') == 'both']) == 0:
497
498                if tbparams[master].has_key('dragon') \
499                        and tbparams[tb].has_key('dragon'):
500
501                    idx = len([x for x in topo.keys() \
502                            if x.startswith('dragon')])
503                    dip, leng = ip_allocator.allocate(4)
504                    dip += 1
505                    mip = dip+1
506                    csub = topdl.Substrate(
507                            name="dragon-control-%s" % tb,
508                            capacity=topdl.Capacity(100000.0, 'max'),
509                            attribute=[
510                                topdl.Attribute(
511                                    attribute='portal',
512                                    value='true'
513                                    )
514                                ]
515                            )
516                    seg = topdl.Segment(
517                            id= tbparams[master]['allocID'],
518                            type='emulab',
519                            uri = self.tbmap.get(master, None),
520                            interface=[ 
521                                topdl.Interface(
522                                    substrate=csub.name),
523                                ],
524                            attribute = [
525                                topdl.Attribute(attribute=n, value=v)
526                                    for n, v in (\
527                                        ('domain', 
528                                            tbparams[master].get('domain',
529                                                ".example.com")),
530                                        ('experiment', "%s/%s" % \
531                                                (tbparams[master].get(
532                                                    'project', 
533                                                    'project'), 
534                                                    eid)),)
535                                ],
536                            )
537                    portal = self.new_dragon_portal(tb, master,
538                            master, eid, dip, mip, idx, csub, tbparams, expid)
539                    topo[tb].substrates.append(csub)
540                    topo[tb].elements.append(portal)
541                    topo[tb].elements.append(seg)
542
543                    mcsub = csub.clone()
544                    seg = topdl.Segment(
545                            id= tbparams[tb]['allocID'],
546                            type='emulab',
547                            uri = self.tbmap.get(tb, None),
548                            interface=[ 
549                                topdl.Interface(
550                                    substrate=csub.name),
551                                ],
552                            attribute = [
553                                topdl.Attribute(attribute=n, value=v)
554                                    for n, v in (\
555                                        ('domain', 
556                                            tbparams[tb].get('domain',
557                                                ".example.com")),
558                                        ('experiment', "%s/%s" % \
559                                                (tbparams[tb].get('project', 
560                                                    'project'), 
561                                                    eid)),)
562                                ],
563                            )
564                    portal = self.new_dragon_portal(master, tb, master,
565                            eid, mip, dip, idx, mcsub, tbparams, expid)
566                    topo[master].substrates.append(mcsub)
567                    topo[master].elements.append(portal)
568                    topo[master].elements.append(seg)
569                    for t in (master, tb):
570                        topo[t].incorporate_elements()
571
572                    self.create_dragon_substrate(csub, topo, 
573                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
574                            tbparams, master, eid, connInfo,
575                            expid)
576                else:
577                    self.add_control_portal(master, tb, master, eid, topo, 
578                            tbparams, connInfo, expid)
579                    self.add_control_portal(tb, master, master, eid, topo, 
580                            tbparams, connInfo, expid)
581
582        # Connect the portal nodes into the topologies and clear out
583        # substrates that are not in the topologies
584        for tb in tbparams.keys():
585            topo[tb].incorporate_elements()
586            topo[tb].substrates = \
587                    [s for s in topo[tb].substrates \
588                        if len(s.interfaces) >0]
589
Note: See TracBrowser for help on using the repository browser.