source: fedd/federation/experiment_partition.py @ 5f96438

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5f96438 was 5f96438, checked in by Ted Faber <faber@…>, 14 years ago

checkpoint for download - not tested

  • Property mode set to 100644
File size: 17.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31
32import topdl
33import list_log
34from ip_allocator import ip_allocator
35from ip_addr import ip_addr
36
37
38class nullHandler(logging.Handler):
39    def emit(self, record): pass
40
41fl = logging.getLogger("fedd.experiment_control")
42fl.addHandler(nullHandler())
43
44class experiment_partition: 
45    def __init__(self, auth=None, store_url=None, tbmap=None, muxmax=2):
46        """
47        Intialize the various attributes
48        """
49
50        self.log = logging.getLogger("fedd.experiment_control." + \
51                "experiment_paritition")
52        self.auth = auth
53        self.store_url = store_url
54        self.tbmap = tbmap
55        self.muxmax = muxmax
56
57
58    def new_portal_node(self, st, dt, tbparams, masters, eid, myname, desthost,
59            portal_type, iface_desc=(), conn_type="ssh", conn_attrs=[], 
60            expid=None):
61        """
62        Return a new internet portal node and a dict with the connectionInfo to
63        be attached.
64        """
65        seer_master = None
66        for k, s in masters.items():
67            if 'SEER' in s:
68                seer_master = k
69                break
70
71        if seer_master:
72            mdomain = tbparams[seer_master].get('domain', '.example.com')
73            mproject = tbparams[seer_master].get('project', 'project')
74            muser = tbparams[seer_master].get('user', 'root')
75            smbshare = tbparams[seer_master].get('smbshare', 'USERS')
76        else:
77            mdomain = '.example.com'
78            mproject = 'project'
79            muser = 'root'
80            smbshare = 'USERS'
81
82        dproject = tbparams[dt].get('project', 'project')
83        ddomain = tbparams[dt].get('domain', ".example.com")
84
85        if (st in masters and dt not in masters) or \
86                ( st in masters and dt not in masters ):
87            active = ("%s" % (st in masters))
88        else:
89            active = ("%s" % (st > dt))
90
91        ifaces = [ ]
92        for sub, attrs in iface_desc:
93            inf = topdl.Interface(
94                    name="inf%03d" % len(ifaces),
95                    substrate=sub,
96                    attribute=[
97                        topdl.Attribute(
98                            attribute=n,
99                            value = v)
100                        for n, v in attrs
101                        ]
102                    )
103            ifaces.append(inf)
104        if conn_type == "ssh":
105            try:
106                aid = tbparams[st]['allocID']['fedid']
107            except:
108                self.log.debug("[new_portal_node] Can't get alloc id for %s?" \
109                        % st)
110                aid = None
111            info = {
112                    "type" : conn_type, 
113                    "portal": myname,
114                    'fedAttr': [ 
115                            { 'attribute': 'masterdomain', 'value': mdomain},
116                            { 'attribute': 'masterexperiment', 'value': 
117                                "%s/%s" % (mproject, eid)},
118                            { 'attribute': 'active', 'value': active},
119                            # Move to SMB service description
120                            { 'attribute': 'masteruser', 'value': muser},
121                            { 'attribute': 'smbshare', 'value': smbshare},
122                        ],
123                    'parameter': [
124                        {
125                            'name': 'peer',
126                            'key': 'fedid:%s/%s' % (expid, myname),
127                            'store': self.store_url,
128                            'type': 'output',
129                        },
130                        {
131                            'name': 'ssh_port',
132                            'key': 'fedid:%s/%s-port' % (expid, myname),
133                            'store': self.store_url,
134                            'type': 'output',
135                        },
136                        {
137                            'name': 'peer',
138                            'key': 'fedid:%s/%s' % (expid, desthost),
139                            'store': self.store_url,
140                            'type': 'input',
141                        },
142                        {
143                            'name': 'ssh_port',
144                            'key': 'fedid:%s/%s-port' % (expid, desthost),
145                            'store': self.store_url,
146                            'type': 'input',
147                        },
148                        ]
149                    }
150            # Give this allocation the rights to access the key of the
151            # peers
152            if aid:
153                for h in (myname, desthost):
154                    self.auth.set_attribute(aid, 'fedid:%s/%s' % (expid, h))
155                    self.auth.set_attribute(aid, 'fedid:%s/%s-port' % \
156                            (expid, h))
157            else:
158                self.log.error("No aid for %s in new_portal_node" % st)
159        else:
160            info = None
161
162        return (topdl.Computer(
163                name=myname,
164                attribute=[ 
165                    topdl.Attribute(attribute=n,value=v)
166                        for n, v in (\
167                            ('portal', 'true'),
168                            ('portal_type', portal_type), 
169                        )
170                    ],
171                interface=ifaces,
172                ), info)
173
174    def new_portal_substrate(self, st, dt, eid, tbparams, expid):
175        ddomain = tbparams[dt].get('domain', ".example.com")
176        dproject = tbparams[dt].get('project', 'project')
177        tsubstrate = \
178                topdl.Substrate(name='%s-%s' % (st, dt),
179                        attribute= [
180                            topdl.Attribute(
181                                attribute='portal',
182                                value='true')
183                            ]
184                        )
185        segment_element = topdl.Segment(
186                id= tbparams[dt]['allocID'],
187                type='emulab',
188                uri = self.tbmap.get(dt, None),
189                interface=[ 
190                    topdl.Interface(
191                        substrate=tsubstrate.name),
192                    ],
193                attribute = [
194                    topdl.Attribute(attribute=n, value=v)
195                        for n, v in (\
196                            ('domain', ddomain),
197                            ('experiment', "%s/%s" % \
198                                    (dproject, eid)),)
199                    ],
200                )
201
202        return (tsubstrate, segment_element)
203
204    def new_dragon_topo(self, idx, sub, topo, tbs, tbparams, connInfo, expid):
205        if sub.capacity is None:
206            raise service_error(service_error.internal,
207                    "Cannot DRAGON split substrate w/o capacity")
208        segs = [ ]
209        substr = topdl.Substrate(name="dragon%d" % idx, 
210                capacity=sub.capacity.clone(),
211                attribute=[ topdl.Attribute(attribute=n, value=v)
212                    for n, v, in (\
213                            ('vlan', 'unassigned%d' % idx),)])
214        name = "dragon%d" % idx
215        store_key = 'fedid:%s/vlan%d' % (expid, idx)
216        for tb in tbs.keys():
217            seg = topdl.Segment(
218                    id = tbparams[tb]['allocID'],
219                    type='emulab',
220                    uri = self.tbmap.get(tb, None),
221                    interface=[ 
222                        topdl.Interface(
223                            substrate=substr.name),
224                        ],
225                    attribute=[ topdl.Attribute(
226                        attribute='dragon_endpoint', 
227                        value=tbparams[tb]['dragon']),
228                        ]
229                    )
230            if tbparams[tb].has_key('vlans'):
231                seg.set_attribute('vlans', tbparams[tb]['vlans'])
232            segs.append(seg)
233
234            # Give this allocation the rights to access the key of the
235            # vlan_id
236            try:
237                aid = tbparams[tb]['allocID']['fedid']
238                self.auth.set_attribute(aid, store_key)
239            except:
240                self.log.debug("[new_dragon_topo] Can't get alloc id for %s?"\
241                        % tb)
242
243        connInfo[name] = [ { 
244            'type': 'transit',
245            'parameter': [ { 
246                'name': 'vlan_id',
247                'key': store_key,
248                'store': self.store_url,
249                'type': 'output'
250                } ]
251            } ]
252
253        topo[name] = \
254                topdl.Topology(substrates=[substr], elements=segs,
255                        attribute=[
256                            topdl.Attribute(attribute="transit", value='true'),
257                            topdl.Attribute(attribute="dynamic", value='true'),
258                            topdl.Attribute(attribute="testbed", 
259                                value='dragon'),
260                            topdl.Attribute(attribute="store_keys", 
261                                value=store_key),
262                            ]
263                        )
264
265    def create_dragon_substrate(self, sub, topo, tbs, tbparams, masters, eid,
266            connInfo, expid=None):
267        """
268        Add attribiutes to the various elements indicating that they are to be
269        dragon connected and create a dragon segment in topo to be
270        instantiated.
271        """
272
273        def get_substrate_from_topo(name, t):
274            for s in t.substrates:
275                if s.name == name: return s
276            else: return None
277
278
279        seer_master = None
280        for k, s in masters.items():
281            if 'SEER' in s:
282                seer_master = k
283                break
284
285        if seer_master:
286            mdomain = tbparams[seer_master].get('domain', '.example.com')
287            mproject = tbparams[seer_master].get('project', 'project')
288        else:
289            mdomain = '.example.com'
290            mproject = 'project'
291
292        # dn is the number of previously created dragon nets.  This routine
293        # creates a net numbered by dn
294        dn = len([x for x in topo.keys() if x.startswith('dragon')])
295        # Count the number of interfaces on this substrate in each testbed from
296        # the global topology
297        count = { }
298        node = { }
299        for e in [ i.element for i in sub.interfaces ]:
300            tb = e.get_attribute('testbed')
301            count[tb] = count.get(tb, 0) + 1
302            node[tb] = i.get_attribute('ip4_address')
303
304
305        # Set the attributes in the copies that will allow setup of dragon
306        # connections.
307        for tb in tbs.keys():
308            s = get_substrate_from_topo(sub.name, topo[tb])
309            if s:
310                if not connInfo.has_key(tb):
311                    connInfo[tb] = [ ]
312
313                try:
314                    aid = tbparams[tb]['allocID']['fedid']
315                except:
316                    self.log.debug("[creat_dragon_substrate] " + 
317                            "Can't get alloc id for %s?" %tb)
318                    aid = None
319
320                # This may need another look, but only a service gateway will
321                # look at the active parameter, and these are only inserted to
322                # connect to a master.
323                active = "%s" % ( tb in masters)
324                info = {
325                        'type': 'transit',
326                        'member': [ {
327                            'element': i.element.name[0], 
328                            'interface': i.name
329                            } for i in s.interfaces \
330                                    if isinstance(i.element, topdl.Computer) ],
331                        'fedAttr': [ 
332                            { 'attribute': 'masterdomain', 'value': mdomain},
333                            { 'attribute': 'masterexperiment', 'value': 
334                                "%s/%s" % (mproject, eid)},
335                            { 'attribute': 'active', 'value': active},
336                            ],
337                        'parameter': [ {
338                            'name': 'vlan_id',
339                            'key': 'fedid:%s/vlan%d' % (expid, dn),
340                            'store': self.store_url,
341                            'type': 'input',
342                            } ]
343                        }
344                if tbs.has_key(tb):
345                    info['peer'] = tbs[tb]
346                connInfo[tb].append(info)
347
348                # Give this allocation the rights to access the key of the
349                # vlan_id
350                if aid:
351                    self.auth.set_attribute(aid, 
352                            'fedid:%s/vlan%d' % (expid, dn))
353            else:
354                raise service_error(service_error.internal,
355                        "No substrate %s in testbed %s" % (sub.name, tb))
356
357        self.new_dragon_topo(dn, sub, topo, tbs, tbparams, connInfo, expid)
358
359    def insert_internet_portals(self, sub, topo, tbs, tbparams, masters, eid,
360            segment_substrate, portals, connInfo, expid):
361        # More than one testbed is on this substrate.  Insert
362        # some portals into the subtopologies.  st == source testbed,
363        # dt == destination testbed.
364        for st in tbs.keys():
365            if not segment_substrate.has_key(st):
366                segment_substrate[st] = { }
367            if not portals.has_key(st): 
368                portals[st] = { }
369            if not connInfo.has_key(st):
370                connInfo[st] = [ ]
371            for dt in [ t for t in tbs.keys() if t != st]:
372                sproject = tbparams[st].get('project', 'project')
373                dproject = tbparams[dt].get('project', 'project')
374                sdomain = tbparams[st].get('domain', ".example.com")
375                ddomain = tbparams[dt].get('domain', ".example.com")
376                aid = tbparams[dt]['allocID']['fedid']
377
378                seer_master = None
379                for k, s in masters.items():
380                    if 'SEER' in s:
381                        seer_master = k
382                        break
383
384                if seer_master:
385                    mdomain = tbparams[seer_master].get('domain', '.example.com')
386                    mproject = tbparams[seer_master].get('project', 'project')
387                    muser = tbparams[seer_master].get('user', 'root')
388                    smbshare = tbparams[seer_master].get('smbshare', 'USERS')
389                else:
390                    mdomain = '.example.com'
391                    mproject = 'project'
392                    muser = 'root'
393                    smbshare = 'USERS'
394
395                if (st in masters  and dt not in masters) or \
396                        (st not in masters and dt in masters):
397                    active = ("%s" % (st in masters))
398                else:
399                    active = ("%s" %(st > dt))
400
401                if not segment_substrate[st].has_key(dt):
402                    # Put a substrate and a segment for the connected
403                    # testbed in there.
404                    tsubstrate, segment_element = \
405                            self.new_portal_substrate(st, dt, eid, tbparams,
406                                    expid)
407                    segment_substrate[st][dt] = tsubstrate
408                    topo[st].substrates.append(tsubstrate)
409                    topo[st].elements.append(segment_element)
410
411                new_portal = False
412                if portals[st].has_key(dt):
413                    # There's a portal set up to go to this destination.
414                    # See if there's room to multiplex this connection on
415                    # it.  If so, add an interface to the portal; if not,
416                    # set up to add a portal below.
417                    # [This little festival of braces is just a pop of the
418                    # last element in the list of portals between st and
419                    # dt.]
420                    portal = portals[st][dt][-1]
421                    mux = len([ i for i in portal.interface \
422                            if not i.get_attribute('portal')])
423                    if mux == self.muxmax:
424                        new_portal = True
425                        portal_type = "experiment"
426                        myname = "%stunnel%d" % (dt, len(portals[st][dt]))
427                        desthost = "%stunnel%d" % (st.lower(), 
428                                len(portals[st][dt]))
429                    else:
430                        new_i = topdl.Interface(
431                                substrate=sub.name,
432                                attribute=[ 
433                                    topdl.Attribute(
434                                        attribute='ip4_address', 
435                                        value=tbs[dt]
436                                    )
437                                ])
438                        portal.interface.append(new_i)
439                else:
440                    # First connection to this testbed, make an empty list
441                    # and set up to add the new portal below
442                    new_portal = True
443                    portals[st][dt] = [ ]
444                    myname = "%stunnel%d" % (dt, len(portals[st][dt]))
445                    desthost = "%stunnel%d" % (st.lower(), len(portals[st][dt]))
446
447                    if dt in masters or st in masters: portal_type = "both"
448                    else: portal_type = "experiment"
449
450                if new_portal:
451                    infs = (
452                            (segment_substrate[st][dt].name, 
453                                (('portal', 'true'),)),
454                            (sub.name, 
455                                (('ip4_address', tbs[dt]),))
456                        )
457                    portal, info  =  self.new_portal_node(st, dt, tbparams, 
458                            masters, eid, myname, desthost, portal_type,
459                            infs, conn_type="ssh", conn_attrs=[], expid=expid)
460
461                    topo[st].elements.append(portal)
462                    portals[st][dt].append(portal)
463                    connInfo[st].append(info)
464
465    def add_control_portal(self, st, dt, masters, eid, topo, tbparams, connInfo, expid):
466        # Add to the master testbed
467        tsubstrate, segment_element = \
468                self.new_portal_substrate(st, dt, eid, tbparams, expid)
469        myname = "%stunnel" % dt
470        desthost = "%stunnel" % st
471
472        portal, info = self.new_portal_node(st, dt, tbparams, masters,
473                eid, myname, desthost, "control", 
474                ((tsubstrate.name,(('portal','true'),)),), conn_type="ssh",
475                conn_attrs=[], expid=expid)
476
477        topo[st].substrates.append(tsubstrate)
478        topo[st].elements.append(segment_element)
479        topo[st].elements.append(portal)
480        if not connInfo.has_key(st):
481            connInfo[st] = [ ]
482        connInfo[st].append(info)
483
484    def new_dragon_portal(self, st, dt, masters, eid, myip, dip, idx, 
485            substrate, tbparams, expid):
486        # Add to the master testbed
487        myname = "%stunnel" % dt
488        desthost = "%s" % ip_addr(dip)
489
490        portal, info = self.new_portal_node(st, dt, tbparams, masters,
491                eid, myname, desthost, "control", 
492                ((substrate.name,(
493                    ('portal','true'),
494                    ('ip4_address', "%s" % ip_addr(myip)),)),),
495                conn_type="transit", conn_attrs=[], expid=expid)
496
497        return portal
498
499    def add_portals(self, top, topo, eid, masters, tbparams, ip_allocator, 
500            connInfo, expid):
501        """
502        For each substrate in the main topology, find those that
503        have nodes on more than one testbed.  Insert portal nodes
504        into the copies of those substrates on the sub topologies.
505        """
506        segment_substrate = { }
507        portals = { }
508        for s in top.substrates:
509            # tbs will contain an ip address on this subsrate that is in
510            # each testbed.
511            tbs = { }
512            for i in s.interfaces:
513                e = i.element
514                tb = e.get_attribute('testbed')
515                if tb and not tbs.has_key(tb):
516                    for i in e.interface:
517                        if s in i.subs:
518                            tbs[tb]= i.get_attribute('ip4_address')
519            if len(tbs) < 2:
520                continue
521
522            # DRAGON will not create multi-site vlans yet
523            if len(tbs) == 2 and \
524                    all([tbparams[x].has_key('dragon') for x in tbs]):
525                self.create_dragon_substrate(s, topo, tbs, tbparams, 
526                        masters, eid, connInfo, expid)
527            else:
528                self.insert_internet_portals(s, topo, tbs, tbparams, masters,
529                        eid, segment_substrate, portals, connInfo, expid)
530
531        # Make sure that all the service importers have a control portal back
532        # to the master for each service.
533        #XXX you're here
534        for tb in [ t for t in tbparams.keys() if t != master ]:
535            if len([e for e in topo[tb].elements \
536                    if isinstance(e, topdl.Computer) and \
537                    e.get_attribute('portal') and \
538                    e.get_attribute('portal_type') == 'both']) == 0:
539
540                if tbparams[master].has_key('dragon') \
541                        and tbparams[tb].has_key('dragon'):
542
543                    idx = len([x for x in topo.keys() \
544                            if x.startswith('dragon')])
545                    dip, leng = ip_allocator.allocate(4)
546                    dip += 1
547                    mip = dip+1
548                    csub = topdl.Substrate(
549                            name="dragon-control-%s" % tb,
550                            capacity=topdl.Capacity(100000.0, 'max'),
551                            attribute=[
552                                topdl.Attribute(
553                                    attribute='portal',
554                                    value='true'
555                                    )
556                                ]
557                            )
558                    seg = topdl.Segment(
559                            id= tbparams[master]['allocID'],
560                            type='emulab',
561                            uri = self.tbmap.get(master, None),
562                            interface=[ 
563                                topdl.Interface(
564                                    substrate=csub.name),
565                                ],
566                            attribute = [
567                                topdl.Attribute(attribute=n, value=v)
568                                    for n, v in (\
569                                        ('domain', 
570                                            tbparams[master].get('domain',
571                                                ".example.com")),
572                                        ('experiment', "%s/%s" % \
573                                                (tbparams[master].get(
574                                                    'project', 
575                                                    'project'), 
576                                                    eid)),)
577                                ],
578                            )
579                    portal = self.new_dragon_portal(tb, master,
580                            master, eid, dip, mip, idx, csub, tbparams, expid)
581                    topo[tb].substrates.append(csub)
582                    topo[tb].elements.append(portal)
583                    topo[tb].elements.append(seg)
584
585                    mcsub = csub.clone()
586                    seg = topdl.Segment(
587                            id= tbparams[tb]['allocID'],
588                            type='emulab',
589                            uri = self.tbmap.get(tb, None),
590                            interface=[ 
591                                topdl.Interface(
592                                    substrate=csub.name),
593                                ],
594                            attribute = [
595                                topdl.Attribute(attribute=n, value=v)
596                                    for n, v in (\
597                                        ('domain', 
598                                            tbparams[tb].get('domain',
599                                                ".example.com")),
600                                        ('experiment', "%s/%s" % \
601                                                (tbparams[tb].get('project', 
602                                                    'project'), 
603                                                    eid)),)
604                                ],
605                            )
606                    portal = self.new_dragon_portal(master, tb, master,
607                            eid, mip, dip, idx, mcsub, tbparams, expid)
608                    topo[master].substrates.append(mcsub)
609                    topo[master].elements.append(portal)
610                    topo[master].elements.append(seg)
611                    for t in (master, tb):
612                        topo[t].incorporate_elements()
613
614                    self.create_dragon_substrate(csub, topo, 
615                            {tb: ip_addr(mip), master: ip_addr(dip)}, 
616                            tbparams, master, eid, connInfo,
617                            expid)
618                else:
619                    self.add_control_portal(master, tb, master, eid, topo, 
620                            tbparams, connInfo, expid)
621                    self.add_control_portal(tb, master, master, eid, topo, 
622                            tbparams, connInfo, expid)
623
624        # Connect the portal nodes into the topologies and clear out
625        # substrates that are not in the topologies
626        for tb in tbparams.keys():
627            topo[tb].incorporate_elements()
628            topo[tb].substrates = \
629                    [s for s in topo[tb].substrates \
630                        if len(s.interfaces) >0]
631
Note: See TracBrowser for help on using the repository browser.