Ignore:
Timestamp:
Sep 2, 2009 10:36:18 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
f9ef40b
Parents:
cc8d8e9
Message:

checkpoint

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    rcc8d8e9 r6c57fe9  
    1818import xml.parsers.expat
    1919
    20 from threading import *
    21 from subprocess import *
     20from threading import Lock, Thread, Condition
     21from subprocess import call, Popen, PIPE
    2222
    2323from urlparse import urlparse
     
    232232                or config.get("globals", "trusted_certs")
    233233
    234         # XXX:
    235         self.repodir = '/usr/local/etc/fedd/repo'
     234        self.repodir = config.get("experiment_control", "repodir")
    236235
    237236        self.exp_stem = "fed-stem"
     
    21522151            self.caller = caller
    21532152
    2154         def __call__(self, uri, aid, topo):
     2153        def __call__(self, uri, aid, topo, attrs=None):
    21552154            req = {
    21562155                    'allocID': { 'fedid' : aid },
     
    21592158                    },
    21602159                }
    2161 
    2162             print "calling %s"  % uri
     2160            if attrs:
     2161                req['fedAttr'] = attrs
     2162
    21632163            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
    21642164                    self.trusted_certs)
     
    21702170
    21712171    def new_allocate_resources(self, allocated, master, eid, expid, expcert,
    2172             tbparams, topo, tmpdir, alloc_log=None):
     2172            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
    21732173        started = { }           # Testbeds where a sub-experiment started
    21742174                                # successfully
     
    22032203                        trusted_certs=self.trusted_certs,
    22042204                        caller=self.call_StartSegment),
    2205                     args=(uri, aid, topo[tb]), name=tb,
     2205                    args=(uri, aid, topo[tb], attrs), name=tb,
    22062206                    pdata=thread_pool, trace_file=self.trace_file)
    22072207            threads.append(t)
     
    22942294        to instantiate them and start it all up.
    22952295        """
     2296
     2297        def add_kit(e, kit):
     2298            """
     2299            Add a Software object created from the list of (install, location)
     2300            tuples passed as kit  to the software attribute of an object e.  We
     2301            do this enough to break out the code, but it's kind of a hack to
     2302            avoid changing the old tuple rep.
     2303            """
     2304
     2305            s = [ topdl.Software(install=i, location=l) for i, l in kit]
     2306
     2307            if isinstance(e.software, list): e.software.extend(s)
     2308            else: e.software = s
     2309
    22962310
    22972311        if not self.auth.check_attribute(fid, 'create'):
     
    24632477                    reverse=True)
    24642478            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
    2465             for s in subs:
     2479            ifs = { }
     2480            hosts = [ ]
     2481            # The config urlpath
     2482            configpath = "/%s/config" % expid
     2483            # The config file system location
     2484            configdir ="%s%s" % ( self.repodir, configpath)
     2485
     2486            for idx, s in enumerate(subs):
    24662487                a = ips.allocate(len(s.interfaces)+2)
    24672488                if a :
     
    24792500                            topdl.Attribute('ip4_address',
    24802501                                "%s" % ip_addr(base)))
     2502                    hname = i.element.name[0]
     2503                    if ifs.has_key(hname):
     2504                        hosts.append("%s\t%s-%s %s-%d" % \
     2505                                (ip_addr(base), hname, s.name, hname,
     2506                                    ifs[hname]))
     2507                    else:
     2508                        ifs[hname] = 0
     2509                        hosts.append("%s\t%s-%s %s-%d %s" % \
     2510                                (ip_addr(base), hname, s.name, hname,
     2511                                    ifs[hname], hname))
     2512
     2513                    ifs[hname] += 1
    24812514                    base += 1
    2482 
     2515            # save config files
     2516            try:
     2517                os.makedirs(configdir)
     2518            except IOError, e:
     2519                raise service_error(
     2520                        "Cannot create config directory: %s" % e)
    24832521            # Find the testbeds to look up
    24842522            testbeds = set([ a.value for e in top.elements \
    24852523                    for a in e.attribute \
    24862524                        if a.attribute == 'testbed'] )
     2525
    24872526
    24882527            # Make per testbed topologies.  Copy the main topo and remove
     
    25102549                topo[tb].make_indices()
    25112550
    2512 
     2551                for e in topo[tb].elements:
     2552                    if tb == master:
     2553                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
     2554                    else:
     2555                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
     2556                    scmd = e.get_attribute('startup')
     2557                    if scmd:
     2558                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
     2559
     2560                    e.set_attribute('startup', cmd)
     2561                    if self.fedkit: add_kit(e, self.fedkit)
     2562
     2563            # Copy configuration files into the remote file store
     2564            try:
     2565                f = open("%s/hosts" % configdir, "w")
     2566                f.write('\n'.join(hosts))
     2567                f.close()
     2568            except IOError, e:
     2569                raise service_error(service_error.internal,
     2570                        "Cannot write hosts file: %s" % e)
     2571            try:
     2572                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
     2573                        (configdir, gw_pubkey_base))
     2574                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
     2575                        (configdir, gw_secretkey_base))
     2576            except IOError, e:
     2577                raise service_error(service_error.internal,
     2578                        "Cannot copy keyfiles: %s" % e)
     2579
     2580            # Allow the individual testbeds to access the configuration files.
     2581            for tb in tbparams.keys():
     2582                asignee = tbparams[tb]['allocID']['fedid']
     2583                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
     2584                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
     2585                    print "assigned %s/%s" % (configpath, f)
    25132586
    25142587            # Now, for each substrate in the main topology, find those that
     
    25162589            # into the copies of those substrates on the sub topologies.
    25172590            for s in top.substrates:
    2518                 tests = { }
     2591                # tbs will contain an ip address on this subsrate that is in
     2592                # each testbed.
     2593                tbs = { }
    25192594                for i in s.interfaces:
    25202595                    e = i.element
    25212596                    tb = e.get_attribute('testbed')
    2522                     if tb and not tests.has_key(tb):
     2597                    if tb and not tbs.has_key(tb):
    25232598                        for i in e.interface:
    25242599                            if s in i.subs:
    2525                                 tests[tb]= \
    2526                                         i.get_attribute('ip4_address')
    2527                 if len(tests) < 2:
     2600                                tbs[tb]= i.get_attribute('ip4_address')
     2601                if len(tbs) < 2:
    25282602                    continue
    25292603
    25302604                # More than one testbed is on this substrate.  Insert
    2531                 # some portals into the subtopologies.
    2532 
    2533                 for st in tests.keys():
    2534                     for dt in [ t for t in tests.keys() if t != st]:
     2605                # some portals into the subtopologies.  st == source testbed,
     2606                # dt == destination testbed.
     2607                segment_substrate = { }
     2608                for st in tbs.keys():
     2609                    segment_substrate[st] = { }
     2610                    for dt in [ t for t in tbs.keys() if t != st]:
    25352611                        myname =  "%stunnel" % dt
    25362612                        desthost  =  "%stunnel" % st
     
    25412617                        ddomain = ".%s.%s%s" % (eid, dproject,
    25422618                                tbparams[dt].get('domain', ".example.com"))
    2543                         boss = tbparams[master].get('boss', "boss")
    2544                         fs = tbparams[master].get('fs', "fs")
    2545                         event_server = "%s%s" % \
    2546                                 (tbparams[st].get('eventserver', "event_server"),
    2547                                         tbparams[dt].get('domain', "example.com"))
    2548                         remote_event_server = "%s%s" % \
    2549                                 (tbparams[dt].get('eventserver', "event_server"),
    2550                                         tbparams[dt].get('domain', "example.com"))
    2551                         seer_control = "%s%s" % \
    2552                                 (tbparams[st].get('control', "control"), sdomain)
    2553                         local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
    2554                         remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
    2555                         conf_file = "%s%s.gw.conf" % (myname, sdomain)
    2556                         remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
    2557                         # translate to lower case so the `hostname` hack for specifying
    2558                         # configuration files works.
    2559                         conf_file = conf_file.lower();
    2560                         remote_conf_file = remote_conf_file.lower();
     2619                        mdomain = "%s.%s%s" % (eid,
     2620                                tbparams[master].get('project', 'project'),
     2621                                tbparams[master].get('domain', '.example.com'))
     2622                        # XXX: active and type need to be unkludged
    25612623                        active = ("%s" % (st == master))
    2562                         portal = topdl.Computer(**{
    2563                                 'name': "%stunnel" % dt,
    2564                                 'attribute' : [{
    2565                                     'attribute': n,
    2566                                     'value': v,
    2567                                     } for n, v in (\
    2568                                             ('gateway', 'true'),
    2569                                             ('boss', boss),
    2570                                             ('fs', fs),
    2571                                             ('event_server', event_server),
    2572                                             ('remote_event_server', remote_event_server),
    2573                                             ('seer_control', seer_control),
    2574                                             ('local_key_dir', local_key_dir),
    2575                                             ('remote_conf_dir', remote_conf_dir),
    2576                                             ('conf_file', conf_file),
    2577                                             ('remote_conf_file', remote_conf_file),
    2578                                             ('remote_script_dir', "/usr/local/federation/bin"),
    2579                                             ('local_script_dir', "/usr/local/federation/bin"),
    2580                                             )],
    2581                                 'interface': [{
    2582                                     'substrate': s.name,
    2583                                     'attribute': [ {
    2584                                         'attribute': 'ip4_addreess',
    2585                                         'value': tests[dt],
    2586                                         }, ],
    2587                                     }, ],
    2588                                 })
     2624                        if not segment_substrate[st].has_key(dt):
     2625                            # Put a substrate and a segment for the connected
     2626                            # testbed in there.
     2627                            tsubstrate = \
     2628                                    topdl.Substrate(name='%s-%s' % (st, dt))
     2629                            segment_element = topdl.Segment(
     2630                                    id= tbparams[dt]['allocID'],
     2631                                    type='emulab',
     2632                                    uri = self.tbmap.get(dt, None),
     2633                                    interface=[
     2634                                        topdl.Interface(
     2635                                            substrate=tsubstrate.name),
     2636                                        ],
     2637                                    attribute = [
     2638                                        topdl.Attribute(attribute=n, value=v)
     2639                                            for n, v in (\
     2640                                                ('domain', ddomain),
     2641                                                ('experiment', "%s/%s" % \
     2642                                                        (dproject, eid)),)
     2643                                        ],
     2644                                    )
     2645                            segment_substrate[st][dt] = tsubstrate
     2646                            topo[st].substrates.append(tsubstrate)
     2647                            topo[st].elements.append(segment_element)
     2648                        portal = topdl.Computer(
     2649                                name="%stunnel" % dt,
     2650                                attribute=[
     2651                                    topdl.Attribute(attribute=n,value=v)
     2652                                        for n, v in (\
     2653                                            ('portal', 'true'),
     2654                                            ('masterdomain', mdomain),
     2655                                            ('experiment', "%s/%s" % \
     2656                                                    (sproject, eid)),
     2657                                            ('peer', "%s.%s" % \
     2658                                                    (desthost, ddomain)),
     2659                                            ('scriptdir',
     2660                                                "/usr/local/federation/bin"),
     2661                                            ('active', "%s" % active),
     2662                                            ('type', 'both'),
     2663                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), sdomain.lower())))
     2664                                    ],
     2665                                interface=[
     2666                                    topdl.Interface(
     2667                                        substrate=s.name,
     2668                                        attribute=[
     2669                                            topdl.Attribute(
     2670                                                attribute='ip4_addreess',
     2671                                                value=tbs[dt]
     2672                                            )
     2673                                        ]),
     2674                                    topdl.Interface(
     2675                                        substrate=\
     2676                                            segment_substrate[st][dt].name
     2677                                        ),
     2678                                    ],
     2679                                )
     2680                        if self.fedkit: add_kit(portal, self.fedkit)
     2681                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
     2682
    25892683                        topo[st].elements.append(portal)
     2684
    25902685            # Connect the gateway nodes into the topologies and clear out
    25912686            # substrates that are not in the topologies
     
    26262721                try:
    26272722                    f = open("%s/%s" % (softdir, dest) , "w")
     2723                    self.log.debug("Writing %s/%s" % (softdir,dest) )
    26282724                    data = u.read(4096)
    26292725                    while data:
     
    26442740                for tb in tbparams.keys():
    26452741                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'],
    2646                             "%s/%s" % ( path, dest))
     2742                            "/%s/%s" % ( path, dest))
    26472743
    26482744            # Convert the software locations in the segments into the local
     
    26502746            for soft in [ s for tb in topo.values() \
    26512747                    for e in tb.elements \
    2652                         for s in e.software ]:
     2748                        if getattr(e, 'software', False) \
     2749                            for s in e.software ]:
    26532750                if softmap.has_key(soft.location):
    26542751                    soft.location = softmap[soft.location]
     
    27092806        alloc_log.addHandler(h)
    27102807       
     2808        # XXX
     2809        url_base = 'https://users.isi.deterlab.net:23232'
     2810        attrs = [
     2811                {
     2812                    'attribute': 'ssh_pubkey',
     2813                    'value': '%s/%s/config/%s' % \
     2814                            (url_base, expid, gw_pubkey_base)
     2815                },
     2816                {
     2817                    'attribute': 'ssh_secretkey',
     2818                    'value': '%s/%s/config/%s' % \
     2819                            (url_base, expid, gw_secretkey_base)
     2820                },
     2821                {
     2822                    'attribute': 'hosts',
     2823                    'value': '%s/%s/config/hosts' % \
     2824                            (url_base, expid)
     2825                },
     2826            ]
     2827
    27112828        # Start a thread to do the resource allocation
    27122829        t  = Thread(target=self.new_allocate_resources,
    27132830                args=(allocated, master, eid, expid, expcert, tbparams,
    2714                     topo, tmpdir, alloc_log),
     2831                    topo, tmpdir, alloc_log, attrs),
    27152832                name=eid)
    27162833        t.start()
     
    27742891
    27752892    def get_handler(self, path, fid):
    2776         print "in get_handler %s %s" % (path, fid)
    2777         return ("/users/faber/test.html", "text/html")
     2893        print "%s" %  path
     2894        if self.auth.check_attribute(fid, path):
     2895            return ("%s/%s" % (self.repodir, path), "application/binary")
     2896        else:
     2897            return (None, None)
    27782898
    27792899    def get_vtopo(self, req, fid):
Note: See TracChangeset for help on using the changeset viewer.