Ignore:
Timestamp:
Aug 28, 2009 6:07:42 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
6c57fe9
Parents:
4c8a0b7
Message:

checkpoint

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    r4c8a0b7 rcc8d8e9  
    196196    call_RequestAccess = service_caller('RequestAccess')
    197197    call_ReleaseAccess = service_caller('ReleaseAccess')
     198    call_StartSegment = service_caller('StartSegment')
    198199    call_Ns2Split = service_caller('Ns2Split')
    199200
     
    230231        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
    231232                or config.get("globals", "trusted_certs")
     233
     234        # XXX:
     235        self.repodir = '/usr/local/etc/fedd/repo'
    232236
    233237        self.exp_stem = "fed-stem"
     
    403407        State format is a simple pickling of the state dictionary.
    404408        """
     409       
     410        def get_experiment_id(state):
     411            """
     412            Pull the fedid experimentID out of the saved state.  This is kind
     413            of a gross walk through the dict.
     414            """
     415
     416            if state.has_key('experimentID'):
     417                for e in state['experimentID']:
     418                    if e.has_key('fedid'):
     419                        return e['fedid']
     420                else:
     421                    return None
     422            else:
     423                return None
     424
     425        def get_alloc_ids(state):
     426            """
     427            Pull the fedids of the identifiers of each allocation from the
     428            state.  Again, a dict dive that's best isolated.
     429            """
     430
     431            return [ f['allocID']['fedid']
     432                    for f in state.get('federant',[]) \
     433                        if f.has_key('allocID') and \
     434                            f['allocID'].has_key('fedid')]
     435
     436
    405437        try:
    406438            f = open(self.state_filename, "r")
     
    415447                    "Unpickling failed: %s") % e)
    416448       
    417         for k in self.state.keys():
     449        for s in self.state.values():
    418450            try:
    419                 # This list should only have one element in it, but phrasing it
    420                 # as a for loop doesn't cost much, really.  We have to find the
    421                 # fedid elements anyway.
    422                 for eid in [ f['fedid'] \
    423                         for f in self.state[k]['experimentID']\
    424                             if f.has_key('fedid') ]:
    425                     self.auth.set_attribute(self.state[k]['owner'], eid)
     451
     452                eid = get_experiment_id(s)
     453                if eid :
     454                    # Give the owner rights to the experiment
     455                    self.auth.set_attribute(s['owner'], eid)
     456                    # And holders of the eid as well
     457                    self.auth.set_attribute(eid, eid)
    426458                    # allow overrides to control experiments as well
    427459                    for o in self.overrides:
    428460                        self.auth.set_attribute(o, eid)
     461                    # Set permissions to allow reading of the software repo, if
     462                    # any, as well.
     463                    for a in get_alloc_ids(s):
     464                        self.auth.set_attribute(a, 'repo/%s' % eid)
     465                else:
     466                    raise KeyError("No experiment id")
    429467            except KeyError, e:
    430468                self.log.warning("[read_state]: State ownership or identity " +\
     
    9701008            nodes = [ n['vname'] for n in topo['node'] ]
    9711009            topo_lans = topo['lan']
    972         except KeyError:
    973             raise service_error(service_error.internal, "Bad topology")
     1010        except KeyError, e:
     1011            raise service_error(service_error.internal, "Bad topology: %s" %e)
    9741012
    9751013        lans = { }
     
    21032141
    21042142        return rv
     2143
     2144    class new_start_segment:
     2145        def __init__(self, debug=False, log=None, cert_file=None,
     2146                cert_pwd=None, trusted_certs=None, caller=None):
     2147            self.log = log
     2148            self.debug = debug
     2149            self.cert_file = cert_file
     2150            self.cert_pwd = cert_pwd
     2151            self.trusted_certs = None
     2152            self.caller = caller
     2153
     2154        def __call__(self, uri, aid, topo):
     2155            req = {
     2156                    'allocID': { 'fedid' : aid },
     2157                    'segmentdescription': {
     2158                        'topdldescription': topo.to_dict(),
     2159                    },
     2160                }
     2161
     2162            print "calling %s"  % uri
     2163            r = self.caller(uri, req, self.cert_file, self.cert_pwd,
     2164                    self.trusted_certs)
     2165            print r
     2166            return True
     2167
     2168
    21052169   
     2170
     2171    def new_allocate_resources(self, allocated, master, eid, expid, expcert,
     2172            tbparams, topo, tmpdir, alloc_log=None):
     2173        started = { }           # Testbeds where a sub-experiment started
     2174                                # successfully
     2175
     2176        # XXX
     2177        fail_soft = False
     2178
     2179        log = alloc_log or self.log
     2180
     2181        thread_pool = self.thread_pool(self.nthreads)
     2182        threads = [ ]
     2183
     2184        for tb in [ k for k in allocated.keys() if k != master]:
     2185            # Create and start a thread to start the segment, and save it to
     2186            # get the return value later
     2187            thread_pool.wait_for_slot()
     2188            uri = self.tbmap.get(tb, None)
     2189            if not uri:
     2190                raise service_error(service_error.internal,
     2191                        "Unknown testbed %s !?" % tb)
     2192
     2193            if tbparams[tb].has_key('allocID') and \
     2194                    tbparams[tb]['allocID'].has_key('fedid'):
     2195                aid = tbparams[tb]['allocID']['fedid']
     2196            else:
     2197                raise service_error(service_error.internal,
     2198                        "No alloc id for testbed %s !?" % tb)
     2199
     2200            t  = self.pooled_thread(\
     2201                    target=self.new_start_segment(log=log, debug=self.debug,
     2202                        cert_file=self.cert_file, cert_pwd=self.cert_pwd,
     2203                        trusted_certs=self.trusted_certs,
     2204                        caller=self.call_StartSegment),
     2205                    args=(uri, aid, topo[tb]), name=tb,
     2206                    pdata=thread_pool, trace_file=self.trace_file)
     2207            threads.append(t)
     2208            t.start()
     2209
     2210        # Wait until all finish
     2211        thread_pool.wait_for_all_done()
     2212
     2213        # If none failed, start the master
     2214        failed = [ t.getName() for t in threads if not t.rv ]
     2215
     2216        if len(failed) == 0:
     2217            uri = self.tbmap.get(master, None)
     2218            if not uri:
     2219                raise service_error(service_error.internal,
     2220                        "Unknown testbed %s !?" % master)
     2221
     2222            if tbparams[master].has_key('allocID') and \
     2223                    tbparams[master]['allocID'].has_key('fedid'):
     2224                aid = tbparams[master]['allocID']['fedid']
     2225            else:
     2226                raise service_error(service_error.internal,
     2227                    "No alloc id for testbed %s !?" % master)
     2228            starter = self.new_start_segment(log=log, debug=self.debug,
     2229                    cert_file=self.cert_file, cert_pwd=self.cert_pwd,
     2230                    trusted_certs=self.trusted_certs,
     2231                    caller=self.call_StartSegment)
     2232            if not starter(uri, aid, topo[master]):
     2233                failed.append(master)
     2234
     2235        succeeded = [tb for tb in allocated.keys() if tb not in failed]
     2236        # If one failed clean up, unless fail_soft is set
     2237        if failed and False:
     2238            if not fail_soft:
     2239                thread_pool.clear()
     2240                for tb in succeeded:
     2241                    # Create and start a thread to stop the segment
     2242                    thread_pool.wait_for_slot()
     2243                    t  = self.pooled_thread(\
     2244                            target=self.stop_segment(log=log,
     2245                                keyfile=self.ssh_privkey_file,
     2246                                debug=self.debug),
     2247                            args=(tb, eid, tbparams), name=tb,
     2248                            pdata=thread_pool, trace_file=self.trace_file)
     2249                    t.start()
     2250                # Wait until all finish
     2251                thread_pool.wait_for_all_done()
     2252
     2253                # release the allocations
     2254                for tb in tbparams.keys():
     2255                    self.release_access(tb, tbparams[tb]['allocID'])
     2256                # Remove the placeholder
     2257                self.state_lock.acquire()
     2258                self.state[eid]['experimentStatus'] = 'failed'
     2259                if self.state_filename: self.write_state()
     2260                self.state_lock.release()
     2261
     2262                log.error("Swap in failed on %s" % ",".join(failed))
     2263                return
     2264        else:
     2265            log.info("[start_segment]: Experiment %s active" % eid)
     2266
     2267        log.debug("[start_experiment]: removing %s" % tmpdir)
     2268
     2269        # Walk up tmpdir, deleting as we go
     2270        for path, dirs, files in os.walk(tmpdir, topdown=False):
     2271            for f in files:
     2272                os.remove(os.path.join(path, f))
     2273            for d in dirs:
     2274                os.rmdir(os.path.join(path, d))
     2275        os.rmdir(tmpdir)
     2276
     2277        # Insert the experiment into our state and update the disk copy
     2278        self.state_lock.acquire()
     2279        self.state[expid]['experimentStatus'] = 'active'
     2280        self.state[eid] = self.state[expid]
     2281        if self.state_filename: self.write_state()
     2282        self.state_lock.release()
     2283        return
     2284
    21062285
    21072286    def new_create_experiment(self, req, fid):
     
    22742453
    22752454            allocated = { }         # Testbeds we can access
    2276 # XXX here's where we're working
    2277             def out_topo(filename, t):
    2278                 try:
    2279                     f = open("/tmp/%s" % filename, "w")
    2280                     print >> f, "%s" % \
    2281                             topdl.topology_to_xml(t, top="experiment")
    2282                     f.close()
    2283                 except IOError, e:
    2284                     raise service_error(service_error.internal, "Can't open file")
    2285 
    2286             try:
    2287 
    2288                 top = topdl.topology_from_xml(file=split_data, top="experiment")
    2289                 subs = sorted(top.substrates,
    2290                         cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
    2291                         reverse=True)
    2292                 ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
    2293                 for s in subs:
    2294                     a = ips.allocate(len(s.interfaces)+2)
    2295                     if a :
    2296                         base, num = a
    2297                         if num < len(s.interfaces) +2 :
    2298                             raise service_error(service_error.internal,
    2299                                     "Allocator returned wrong number of IPs??")
    2300                     else:
    2301                         raise service_error(service_error.req,
    2302                                 "Cannot allocate IP addresses")
    2303 
    2304                     base += 1
    2305                     for i in s.interfaces:
    2306                         i.attribute.append(
    2307                                 topdl.Attribute('ip4_address',
    2308                                     "%s" % ip_addr(base)))
    2309                         base += 1
    2310 
    2311                 testbeds = set([ a.value for e in top.elements \
    2312                         for a in e.attribute \
    2313                             if a.attribute == 'testbed'] )
    2314                 topo ={ }
    2315                 for tb in testbeds:
    2316                     self.get_access(tb, None, user, tbparams, master,
    2317                             export_project, access_user)
    2318                     topo[tb] = top.clone()
    2319                     to_delete = [ ]
    2320                     for e in topo[tb].elements:
    2321                         etb = e.get_attribute('testbed')
    2322                         if etb and etb != tb:
    2323                             for i in e.interface:
    2324                                 for s in i.subs:
    2325                                     try:
    2326                                         s.interfaces.remove(i)
    2327                                     except ValueError:
    2328                                         raise service_error(service_error.internal,
    2329                                                 "Can't remove interface??")
    2330                             to_delete.append(e)
    2331                     for e in to_delete:
    2332                         topo[tb].elements.remove(e)
    2333                     topo[tb].make_indices()
    2334 
    2335 
    2336 
    2337                 for s in top.substrates:
    2338                     tests = { }
    2339                     for i in s.interfaces:
    2340                         e = i.element
    2341                         tb = e.get_attribute('testbed')
    2342                         if tb and not tests.has_key(tb):
    2343                             for i in e.interface:
    2344                                 if s in i.subs:
    2345                                     tests[tb]= \
    2346                                             i.get_attribute('ip4_address')
    2347                     if len(tests) < 2:
    2348                         continue
    2349 
    2350                     # More than one testbed is on this substrate.  Insert
    2351                     # some gateways into the subtopologies.
    2352 
    2353                     for st in tests.keys():
    2354                         for dt in [ t for t in tests.keys() if t != st]:
    2355                             myname =  "%stunnel" % dt
    2356                             desthost  =  "%stunnel" % st
    2357                             sproject = tbparams[st].get('project', 'project')
    2358                             dproject = tbparams[dt].get('project', 'project')
    2359                             sdomain = ".%s.%s%s" % (eid, sproject,
    2360                                     tbparams[st].get('domain', ".example.com"))
    2361                             ddomain = ".%s.%s%s" % (eid, dproject,
    2362                                     tbparams[dt].get('domain', ".example.com"))
    2363                             boss = tbparams[master].get('boss', "boss")
    2364                             fs = tbparams[master].get('fs', "fs")
    2365                             event_server = "%s%s" % \
    2366                                     (tbparams[st].get('eventserver', "event_server"),
    2367                                             tbparams[dt].get('domain', "example.com"))
    2368                             remote_event_server = "%s%s" % \
    2369                                     (tbparams[dt].get('eventserver', "event_server"),
    2370                                             tbparams[dt].get('domain', "example.com"))
    2371                             seer_control = "%s%s" % \
    2372                                     (tbparams[st].get('control', "control"), sdomain)
    2373                             local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
    2374                             remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
    2375                             conf_file = "%s%s.gw.conf" % (myname, sdomain)
    2376                             remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
    2377                             # translate to lower case so the `hostname` hack for specifying
    2378                             # configuration files works.
    2379                             conf_file = conf_file.lower();
    2380                             remote_conf_file = remote_conf_file.lower();
    2381                             active = ("%s" % (st == master))
    2382                             portal = topdl.Computer(**{
    2383                                     'name': "%stunnel" % dt,
    2384                                     'attribute' : [{
    2385                                         'attribute': n,
    2386                                         'value': v,
    2387                                         } for n, v in (\
    2388                                                 ('gateway', 'true'),
    2389                                                 ('boss', boss),
    2390                                                 ('fs', fs),
    2391                                                 ('event_server', event_server),
    2392                                                 ('remote_event_server', remote_event_server),
    2393                                                 ('seer_control', seer_control),
    2394                                                 ('local_key_dir', local_key_dir),
    2395                                                 ('remote_conf_dir', remote_conf_dir),
    2396                                                 ('conf_file', conf_file),
    2397                                                 ('remote_conf_file', remote_conf_file),
    2398                                                 ('remote_script_dir', "/usr/local/federation/bin"),
    2399                                                 ('local_script_dir', "/usr/local/federation/bin"),
    2400                                                 )],
    2401                                     'interface': [{
    2402                                         'substrate': s.name,
    2403                                         'attribute': [ {
    2404                                             'attribute': 'ip4_addreess',
    2405                                             'value': tests[dt],
    2406                                             }, ],
    2407                                         }, ],
    2408                                     })
    2409                             topo[st].elements.append(portal)
    2410                 # Connect the gateway nodes into the topologies and clear out
    2411                 # substrates that are not in the topologies
    2412                 for tb in testbeds:
    2413                     topo[tb].incorporate_elements()
    2414                     topo[tb].substrates = \
    2415                             [s for s in topo[tb].substrates \
    2416                                 if len(s.interfaces) >0]
    2417 
    2418                 softdir ="%s/software" % tmpdir
    2419                 softmap = { }
    2420                 os.mkdir(softdir)
    2421                 pkgs = set([fedkit, gatewaykit])
    2422                 pkgs.update([x.location for e in top.elements \
    2423                         for x in e.software])
    2424                 for pkg in pkgs:
    2425                     loc = pkg
    2426 
    2427                     scheme, host, path = urlparse(loc)[0:3]
    2428                     dest = os.path.basename(path)
    2429                     if not scheme:
    2430                         if not loc.startswith('/'):
    2431                             loc = "/%s" % loc
    2432                         loc = "file://%s" %loc
    2433                     try:
    2434                         u = urlopen(loc)
    2435                     except Exception, e:
    2436                         raise service_error(service_error.req,
    2437                                 "Cannot open %s: %s" % (loc, e))
    2438                     try:
    2439                         f = open("%s/%s" % (softdir, dest) , "w")
    2440                         data = u.read(4096)
    2441                         while data:
    2442                             f.write(data)
    2443                             data = u.read(4096)
    2444                         f.close()
    2445                         u.close()
    2446                     except Exception, e:
    2447                         raise service_error(service_error.internal,
    2448                                 "Could not copy %s: %s" % (loc, e))
    2449                     path = re.sub("/tmp", "", softdir)
    2450                     # XXX
    2451                     softmap[pkg] = \
    2452                             "https://users.isi.deterlab.net:23232/%s/%s" %\
    2453                             ( path, dest)
    2454 
    2455                 # Convert the software locations in the segments into the local
    2456                 # copies on this host
    2457                 for soft in [ s for tb in topo.values() \
    2458                         for e in tb.elements \
    2459                             for s in e.software ]:
    2460                     if softmap.has_key(soft.location):
    2461                         soft.location = softmap[soft.location]
    2462                 for tb in testbeds:
    2463                     out_topo("%s.xml" %tb, topo[tb])
    2464 
    2465                 vtopo = topdl.topology_to_vtopo(top)
    2466                 vis = self.genviz(vtopo)
    2467 
    2468             except Exception, e:
    2469                 traceback.print_exc()
    2470                 raise service_error(service_error.internal, "%s"  % e)
    2471 
    2472 
    2473 
    2474             # Build the testbed topologies:
    2475 
    2476 
    2477             if True:
    2478                 raise service_error(service_error.internal, "Developing")
    2479 
    2480 # XXX old code
    2481             # Objects to parse the splitter output (defined above)
    2482             parse_current_testbed = self.current_testbed(eid, tmpdir,
    2483                     self.fedkit, self.gatewaykit)
    2484             parse_allbeds = self.allbeds(self.get_access)
    2485             parse_gateways = self.gateways(eid, master, tmpdir,
    2486                     gw_pubkey_base, gw_secretkey_base, self.copy_file,
    2487                     self.fedkit)
    2488             parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
    2489                         "^#\s+End\s+Vtopo")
    2490             parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
    2491                         "^#\s+End\s+hostnames", tmpdir + "/hosts")
    2492             parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
    2493                     "^#\s+End\s+tarfiles")
    2494             parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
    2495                     "^#\s+End\s+rpms")
    2496 
    2497             # Working on the split data
    2498             for line in split_data:
    2499                 line = line.rstrip()
    2500                 if parse_current_testbed(line, master, allocated, tbparams):
    2501                     continue
    2502                 elif parse_allbeds(line, user, tbparams, master, export_project,
    2503                         access_user):
    2504                     continue
    2505                 elif parse_gateways(line, allocated, tbparams):
    2506                     continue
    2507                 elif parse_vtopo(line):
    2508                     continue
    2509                 elif parse_hostnames(line):
    2510                     continue
    2511                 elif parse_tarfiles(line):
    2512                     continue
    2513                 elif parse_rpms(line):
    2514                     continue
    2515                 else:
    2516                     raise service_error(service_error.internal,
    2517                             "Bad tcl parse? %s" % line)
    2518             # Virtual topology and visualization
    2519             vtopo = self.gentopo(parse_vtopo.str)
    2520             if not vtopo:
    2521                 raise service_error(service_error.internal,
    2522                         "Failed to generate virtual topology")
    2523 
    2524             vis = self.genviz(vtopo)
    2525             if not vis:
    2526                 raise service_error(service_error.internal,
    2527                         "Failed to generate visualization")
    2528 
    2529            
     2455            # Allocate IP addresses: The allocator is a buddy system memory
     2456            # allocator.  Allocate from the largest substrate to the
     2457            # smallest to make the packing more likely to work - i.e.
     2458            # avoiding internal fragmentation.
     2459            top = topdl.topology_from_xml(file=split_data, top="experiment")
     2460            subs = sorted(top.substrates,
     2461                    cmp=lambda x,y: cmp(len(x.interfaces),
     2462                        len(y.interfaces)),
     2463                    reverse=True)
     2464            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
     2465            for s in subs:
     2466                a = ips.allocate(len(s.interfaces)+2)
     2467                if a :
     2468                    base, num = a
     2469                    if num < len(s.interfaces) +2 :
     2470                        raise service_error(service_error.internal,
     2471                                "Allocator returned wrong number of IPs??")
     2472                else:
     2473                    raise service_error(service_error.req,
     2474                            "Cannot allocate IP addresses")
     2475
     2476                base += 1
     2477                for i in s.interfaces:
     2478                    i.attribute.append(
     2479                            topdl.Attribute('ip4_address',
     2480                                "%s" % ip_addr(base)))
     2481                    base += 1
     2482
     2483            # Find the testbeds to look up
     2484            testbeds = set([ a.value for e in top.elements \
     2485                    for a in e.attribute \
     2486                        if a.attribute == 'testbed'] )
     2487
     2488            # Make per testbed topologies.  Copy the main topo and remove
     2489            # interfaces and nodes that don't live in the testbed.
     2490            topo ={ }
     2491            for tb in testbeds:
     2492                self.get_access(tb, None, user, tbparams, master,
     2493                        export_project, access_user)
     2494                allocated[tb] = 1
     2495                topo[tb] = top.clone()
     2496                to_delete = [ ]
     2497                for e in topo[tb].elements:
     2498                    etb = e.get_attribute('testbed')
     2499                    if etb and etb != tb:
     2500                        for i in e.interface:
     2501                            for s in i.subs:
     2502                                try:
     2503                                    s.interfaces.remove(i)
     2504                                except ValueError:
     2505                                    raise service_error(service_error.internal,
     2506                                            "Can't remove interface??")
     2507                        to_delete.append(e)
     2508                for e in to_delete:
     2509                    topo[tb].elements.remove(e)
     2510                topo[tb].make_indices()
     2511
     2512
     2513
     2514            # Now, for each substrate in the main topology, find those that
     2515            # have nodes on more than one testbed.  Insert portal nodes
     2516            # into the copies of those substrates on the sub topologies.
     2517            for s in top.substrates:
     2518                tests = { }
     2519                for i in s.interfaces:
     2520                    e = i.element
     2521                    tb = e.get_attribute('testbed')
     2522                    if tb and not tests.has_key(tb):
     2523                        for i in e.interface:
     2524                            if s in i.subs:
     2525                                tests[tb]= \
     2526                                        i.get_attribute('ip4_address')
     2527                if len(tests) < 2:
     2528                    continue
     2529
     2530                # More than one testbed is on this substrate.  Insert
     2531                # some portals into the subtopologies.
     2532
     2533                for st in tests.keys():
     2534                    for dt in [ t for t in tests.keys() if t != st]:
     2535                        myname =  "%stunnel" % dt
     2536                        desthost  =  "%stunnel" % st
     2537                        sproject = tbparams[st].get('project', 'project')
     2538                        dproject = tbparams[dt].get('project', 'project')
     2539                        sdomain = ".%s.%s%s" % (eid, sproject,
     2540                                tbparams[st].get('domain', ".example.com"))
     2541                        ddomain = ".%s.%s%s" % (eid, dproject,
     2542                                tbparams[dt].get('domain', ".example.com"))
     2543                        boss = tbparams[master].get('boss', "boss")
     2544                        fs = tbparams[master].get('fs', "fs")
     2545                        event_server = "%s%s" % \
     2546                                (tbparams[st].get('eventserver', "event_server"),
     2547                                        tbparams[dt].get('domain', "example.com"))
     2548                        remote_event_server = "%s%s" % \
     2549                                (tbparams[dt].get('eventserver', "event_server"),
     2550                                        tbparams[dt].get('domain', "example.com"))
     2551                        seer_control = "%s%s" % \
     2552                                (tbparams[st].get('control', "control"), sdomain)
     2553                        local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
     2554                        remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
     2555                        conf_file = "%s%s.gw.conf" % (myname, sdomain)
     2556                        remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
     2557                        # translate to lower case so the `hostname` hack for specifying
     2558                        # configuration files works.
     2559                        conf_file = conf_file.lower();
     2560                        remote_conf_file = remote_conf_file.lower();
     2561                        active = ("%s" % (st == master))
     2562                        portal = topdl.Computer(**{
     2563                                'name': "%stunnel" % dt,
     2564                                'attribute' : [{
     2565                                    'attribute': n,
     2566                                    'value': v,
     2567                                    } for n, v in (\
     2568                                            ('gateway', 'true'),
     2569                                            ('boss', boss),
     2570                                            ('fs', fs),
     2571                                            ('event_server', event_server),
     2572                                            ('remote_event_server', remote_event_server),
     2573                                            ('seer_control', seer_control),
     2574                                            ('local_key_dir', local_key_dir),
     2575                                            ('remote_conf_dir', remote_conf_dir),
     2576                                            ('conf_file', conf_file),
     2577                                            ('remote_conf_file', remote_conf_file),
     2578                                            ('remote_script_dir', "/usr/local/federation/bin"),
     2579                                            ('local_script_dir', "/usr/local/federation/bin"),
     2580                                            )],
     2581                                'interface': [{
     2582                                    'substrate': s.name,
     2583                                    'attribute': [ {
     2584                                        'attribute': 'ip4_addreess',
     2585                                        'value': tests[dt],
     2586                                        }, ],
     2587                                    }, ],
     2588                                })
     2589                        topo[st].elements.append(portal)
     2590            # Connect the gateway nodes into the topologies and clear out
     2591            # substrates that are not in the topologies
     2592            for tb in testbeds:
     2593                topo[tb].incorporate_elements()
     2594                topo[tb].substrates = \
     2595                        [s for s in topo[tb].substrates \
     2596                            if len(s.interfaces) >0]
     2597
     2598            # Copy the rpms and tarfiles to a distribution directory from
     2599            # which the federants can retrieve them
     2600            linkpath = "%s/software" %  expid
     2601            softdir ="%s/%s" % ( self.repodir, linkpath)
     2602            softmap = { }
     2603            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
     2604                    for p, t in l ])
     2605            pkgs.update([x.location for e in top.elements \
     2606                    for x in e.software])
     2607            try:
     2608                os.makedirs(softdir)
     2609            except IOError, e:
     2610                raise service_error(
     2611                        "Cannot create software directory: %s" % e)
     2612            for pkg in pkgs:
     2613                loc = pkg
     2614
     2615                scheme, host, path = urlparse(loc)[0:3]
     2616                dest = os.path.basename(path)
     2617                if not scheme:
     2618                    if not loc.startswith('/'):
     2619                        loc = "/%s" % loc
     2620                    loc = "file://%s" %loc
     2621                try:
     2622                    u = urlopen(loc)
     2623                except Exception, e:
     2624                    raise service_error(service_error.req,
     2625                            "Cannot open %s: %s" % (loc, e))
     2626                try:
     2627                    f = open("%s/%s" % (softdir, dest) , "w")
     2628                    data = u.read(4096)
     2629                    while data:
     2630                        f.write(data)
     2631                        data = u.read(4096)
     2632                    f.close()
     2633                    u.close()
     2634                except Exception, e:
     2635                    raise service_error(service_error.internal,
     2636                            "Could not copy %s: %s" % (loc, e))
     2637                path = re.sub("/tmp", "", linkpath)
     2638                # XXX
     2639                softmap[pkg] = \
     2640                        "https://users.isi.deterlab.net:23232/%s/%s" %\
     2641                        ( path, dest)
     2642
     2643                # Allow the individual testbeds to access the software.
     2644                for tb in tbparams.keys():
     2645                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'],
     2646                            "%s/%s" % ( path, dest))
     2647
     2648            # Convert the software locations in the segments into the local
     2649            # copies on this host
     2650            for soft in [ s for tb in topo.values() \
     2651                    for e in tb.elements \
     2652                        for s in e.software ]:
     2653                if softmap.has_key(soft.location):
     2654                    soft.location = softmap[soft.location]
     2655
     2656            vtopo = topdl.topology_to_vtopo(top)
     2657            vis = self.genviz(vtopo)
     2658
    25302659            # save federant information
    25312660            for k in allocated.keys():
     
    25432672                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
    25442673                        if tbparams[tb].has_key('federant') ]
    2545             if self.state_filename: self.write_state()
     2674            if self.state_filename:
     2675                self.write_state()
    25462676            self.state_lock.release()
    2547 
    2548             # Copy tarfiles and rpms needed at remote sites into a staging area
    2549             try:
    2550                 if self.fedkit:
    2551                     for t in self.fedkit:
    2552                         parse_tarfiles.list.append(t[1])
    2553                 if self.gatewaykit:
    2554                     for t in self.gatewaykit:
    2555                         parse_tarfiles.list.append(t[1])
    2556                 for t in parse_tarfiles.list:
    2557                     if not os.path.exists("%s/tarfiles" % tmpdir):
    2558                         os.mkdir("%s/tarfiles" % tmpdir)
    2559                     self.copy_file(t, "%s/tarfiles/%s" % \
    2560                             (tmpdir, os.path.basename(t)))
    2561                 for r in parse_rpms.list:
    2562                     if not os.path.exists("%s/rpms" % tmpdir):
    2563                         os.mkdir("%s/rpms" % tmpdir)
    2564                     self.copy_file(r, "%s/rpms/%s" % \
    2565                             (tmpdir, os.path.basename(r)))
    2566                 # A null experiment file in case we need to create a remote
    2567                 # experiment from scratch
    2568                 f = open("%s/null.tcl" % tmpdir, "w")
    2569                 print >>f, """
    2570 set ns [new Simulator]
    2571 source tb_compat.tcl
    2572 
    2573 set a [$ns node]
    2574 
    2575 $ns rtproto Session
    2576 $ns run
    2577 """
    2578                 f.close()
    2579 
    2580             except IOError, e:
    2581                 raise service_error(service_error.internal,
    2582                         "Cannot stage tarfile/rpm: %s" % e.strerror)
    2583 
    25842677        except service_error, e:
    25852678            # If something goes wrong in the parse (usually an access error)
     
    25872680            # exceptions.  Failing at this point returns a fault to the remote
    25882681            # caller.
     2682
    25892683            self.state_lock.acquire()
    25902684            del self.state[eid]
     
    26162710       
    26172711        # Start a thread to do the resource allocation
    2618         t  = Thread(target=self.allocate_resources,
     2712        t  = Thread(target=self.new_allocate_resources,
    26192713                args=(allocated, master, eid, expid, expcert, tbparams,
    2620                     tmpdir, alloc_log),
     2714                    topo, tmpdir, alloc_log),
    26212715                name=eid)
    26222716        t.start()
Note: See TracChangeset for help on using the changeset viewer.