Changeset 0ea11af


Ignore:
Timestamp:
Oct 9, 2008 2:08:28 PM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
0b466d1
Parents:
11a08b0
Message:

clean up and add some docs

Location:
fedd
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd.py

    r11a08b0 r0ea11af  
    2121from signal import signal, pause, SIGINT, SIGTERM
    2222from select import select
     23from time import sleep
    2324import logging
    2425
     
    3031
    3132class fedd_server(ThreadingSSLServer):
     33    """
     34    Interface the fedd services to the XMLRPC and SOAP interfaces
     35    """
    3236    def __init__(self, ME, handler, ssl_ctx, impl):
     37        """
     38        Create an SSL server that handles the transport in handler using the
     39        credentials in ssl_ctx, and interfacing to the implementation of fedd
     40        services in fedd.  ME is the host port pair on which to bind.
     41        """
    3342        ThreadingSSLServer.__init__(self, ME, handler, ssl_ctx)
    3443        self.impl = impl
     
    3746
    3847class fedd_soap_handler(BaseHTTPRequestHandler):
     48    """
     49    Standard connection between SOAP and the fedd services in impl.
     50
     51    Much of this is boilerplate from
     52    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
     53    """
    3954    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
    4055
     
    99114
    100115    def soap_dispatch(self, method, req, fid):
     116        """
     117        The connection to the implementation, using the  method maps
     118
     119        The implementation provides a mapping from SOAP method name to the
     120        method in the implementation that provides the service.
     121        """
    101122        if self.server.soap_methods.has_key(method):
    102123            try:
     
    118139
    119140class fedd_xmlrpc_handler(BaseHTTPRequestHandler):
     141    """
     142    Standard connection between XMLRPC and the fedd services in impl.
     143
     144    Much of this is boilerplate from
     145    http://www.xml.com/pub/a/ws/2004/01/20/salz.html
     146    """
    120147    server_version = "ZSI/2.0 fedd/0.1 " + BaseHTTPRequestHandler.server_version
    121148
     
    157184
    158185    def xmlrpc_dispatch(self, method, req, fid):
     186        """
     187        The connection to the implementation, using the  method maps
     188
     189        The implementation provides a mapping from XMLRPC method name to the
     190        method in the implementation that provides the service.
     191        """
    159192        if self.server.xmlrpc_methods.has_key(method):
    160193            try:
     
    193226                const=sys.stderr, help="Print SOAP exchange to stderr")
    194227
    195 servers_active = True
    196 
    197 log_params = {\
    198         'format': "%(asctime)s %(levelname)-8s %(message)s",\
    199         'datefmt': '%a, %d %b %Y %H:%M:%S'\
    200     }
     228servers_active = True       # Sub-servers run while this is True
     229services = [ ]              # Service descriptions
     230servers = [ ]               # fedd_server instances instantiated from services
     231servers_lock = Lock()       # Lock to manipulate servers from sub-server threads
    201232
    202233def shutdown(sig, frame):
     234    """
     235    On a signal, stop running sub-servers. 
     236   
     237    This is connected to signals below
     238    """
    203239    global servers_active, flog
    204240    servers_active = False
     
    206242
    207243def run_server(s):
     244    """
     245    Operate a subserver, shutting down when servers_active is false.
     246
     247    Each server (that is host/port/transport triple) has a thread running this
     248    function, so each can handle requests independently.  They all call in to
     249    the same implementation, which must manage its own synchronization.
     250    """
    208251    global servers_active   # Not strictly needed: servers_active is only read
    209 
    210     if s:
    211         while servers_active:
    212             i, o, e = select((s,), (), (), 5.0)
    213             if s in i:
    214                 s.handle_request()
    215 
    216 services = [ ]
    217 servers = [ ]
     252    global servers          # List of active servers
     253    global servers_lock     # Lock to manipulate servers
     254
     255    while servers_active:
     256        i, o, e = select((s,), (), (), 5.0)
     257        if s in i: s.handle_request()
     258
     259    # Done.  Remove us from the list
     260    servers_lock.acquire()
     261    servers.remove(s)
     262    servers_lock.release()
    218263
    219264opts, args = fedd_opts().parse_args()
    220265
     266# Logging setup
    221267flog = logging.getLogger("fedd")
    222 ffmt = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s",
    223         '%a, %d %b %Y %H:%M:%S')
     268ffmt = logging.Formatter("%(asctime)s %(name)s %(message)s",
     269        '%d %b %y %H:%M:%S')
    224270
    225271if opts.logfile: fh = logging.FileHandler(opts.logfile)
     
    236282flog.addHandler(fh)
    237283
    238 
     284# Initialize the implementation
    239285if opts.configfile != None:
    240286    try:
     
    247293    sys.exit("--configfile is required")
    248294
    249 SOAP_port = (opts.host, opts.port)
    250 
    251295if impl.cert_file == None:
    252296    sys.exit("Must supply certificate file (probably in config)")
    253297
     298# Create the SSL credentials
    254299ctx = None
    255300while ctx == None:
     
    261306            raise
    262307
     308# Walk through the service descriptions and pack them into the services list.
     309# That list has the form (transport (host, port)).
    263310if opts.services:
    264311    for s in opts.services:
     
    275322    services.append((opts.transport, (opts.host, opts.port)))
    276323
     324# Create the servers and put them into a list
    277325for s in services:
    278326    if s[0] == "soap":
     
    282330    else: flog.warning("Unknown transport: %s" % s[0])
    283331
     332#  Make sure that there are no malformed servers in the list
     333services = [ s for s in services if s ]
     334
     335# Catch signals
    284336signal(SIGINT, shutdown)
    285337signal(SIGTERM, shutdown)
    286338
     339# Start the servers
    287340for s in servers:
    288     if s:
    289         t = Thread(target=run_server, args=(s,))
    290         t.start()
    291 
    292 pause()
     341    Thread(target=run_server, args=(s,)).start()
     342
     343# Main thread waits for signals
     344while servers_active:
     345    pause()
     346
     347#Once shutdown starts wait for all the servers to terminate.
     348while True:
     349    servers_lock.acquire()
     350    if len(servers) == 0:
     351        servers_lock.release()
     352        flog.info("All servers exited.  Terminating")
     353        sys.exit(0)
     354    servers_lock.release()
     355    sleep(1)
     356
  • fedd/fedd_access.py

    r11a08b0 r0ea11af  
    2424import logging
    2525
     26
     27# Make log messages disappear if noone configures a fedd logger
    2628class nullHandler(logging.Handler):
    2729    def emit(self, record): pass
     
    9597                self.dynamic_projects_cert_pwd)
    9698
    97 
    98         if config.dynamic_projects_url == None:
    99             self.allocate_project = \
    100                 fedd_allocate_project_local(config.dynamic_projects,
    101                         config.dynamic_projects_url, proj_certs)
    102         else:
    103             self.allocate_project = \
    104                 fedd_allocate_project_remote(config.dynamic_projects,
    105                         config.dynamic_projects_url, proj_certs)
    106 
    107         self.soap_handlers = {\
     99        self.soap_services = {\
    108100            'RequestAccess': make_soap_handler(\
    109101                RequestAccessRequestMessage.typecode,\
     
    111103                "RequestAccessResponseBody")\
    112104            }
    113         self.xmlrpc_handlers =  {\
     105        self.xmlrpc_services =  {\
    114106            'RequestAccess': make_xmlrpc_handler(\
    115107                self.RequestAccess, "RequestAccessResponseBody")\
    116108            }
     109
     110
     111        if config.dynamic_projects_url == None:
     112            self.allocate_project = \
     113                fedd_allocate_project_local(config.dynamic_projects,
     114                        config.dynamic_projects_url, proj_certs)
     115        else:
     116            self.allocate_project = \
     117                fedd_allocate_project_remote(config.dynamic_projects,
     118                        config.dynamic_projects_url, proj_certs)
     119
     120        # If the project allocator exports services, put them in this object's
     121        # maps so that classes that instantiate this can call the services.
     122        self.soap_services.update(self.allocate_project.soap_services)
     123        self.xmlrpc_services.update(self.allocate_project.xmlrpc_services)
    117124
    118125    def dump_state(self):
     
    393400
    394401    def RequestAccess(self, req, fid):
    395 
     402        """
     403        Handle the access request.  Proxy if not for us.
     404
     405        Parse out the fields and make the allocations or rejections if for us,
     406        otherwise, assuming we're willing to proxy, proxy the request out.
     407        """
     408
     409        # The dance to get into the request body
    396410        if req.has_key('RequestAccessRequestBody'):
    397411            req = req['RequestAccessRequestBody']
     
    488502   
    489503    def get_soap_services(self):
    490         return self.soap_handlers
     504        return self.soap_services
    491505
    492506    def get_xmlrpc_services(self):
    493         return self.xmlrpc_handlers
    494 
     507        return self.xmlrpc_services
     508
  • fedd/fedd_allocate_project.py

    r11a08b0 r0ea11af  
    2525
    2626
     27# Configure loggers to dump to /dev/null which avoids errors if calling classes
     28# don't configure them.
    2729class nullHandler(logging.Handler):
    2830    def emit(self, record): pass
     
    3436
    3537class fedd_allocate_project_local:
     38    """
     39    Allocate projects on this machine in response to an access request.
     40    """
    3641    def __init__(self, dp=False, url=None, certs=None):
    3742        """
     
    4550        self.grantnodetype = '/usr/testbed/sbin/grantnodetype'
    4651        self.log = logging.getLogger("fedd.allocate.local")
     52
     53        # Internal services are SOAP only
     54        self.soap_services = {\
     55                "AllocateProject": make_soap_handler(\
     56                AllocateProjectRequestMessage.typecode,\
     57                self.dynamic_project, AllocateProjectResponseMessage,\
     58                "AllocateProjectResponseBody")\
     59                }
     60        self.xmlrpc_services = { }
    4761
    4862    def random_string(self, s, n=3):
     
    195209
    196210class fedd_allocate_project_remote:
     211    """
     212    Allocate projects on a remote machine using the internal SOAP interface
     213    """
    197214    def __init__(self, dp=False, url=None, certs=None):
    198215        """
     
    208225            self.cert_file, self.trusted_certs, self.cert_pwd = \
    209226                    (None, None, None)
    210 
     227        self.soap_services = { }
     228        self.xmlrpc_services = { }
     229       
    211230    def dynamic_project(self, req, fedid=None):
    212231        """
  • fedd/fedd_deter_impl.py

    r11a08b0 r0ea11af  
    2121        """
    2222        if config_path:
    23             self.soap_methods = { }
    24             self.xmlrpc_methods = { }
     23            self.soap_services = { }
     24            self.xmlrpc_services = { }
    2525            config = config_file(config_path)
    2626
     
    3232            self.experiment = fedd_experiment_control_local(config)
    3333
    34             self.soap_methods.update(self.access.get_soap_services())
    35             self.soap_methods.update(self.experiment.get_soap_services())
     34            self.soap_services.update(self.access.soap_services)
     35            self.soap_services.update(self.experiment.soap_services)
    3636
    37             self.xmlrpc_methods.update(self.access.get_xmlrpc_services())
    38             self.xmlrpc_methods.update(self.experiment.get_xmlrpc_services())
     37            self.xmlrpc_services.update(self.access.xmlrpc_services)
     38            self.xmlrpc_services.update(self.experiment.xmlrpc_services)
    3939
    4040    def get_soap_services(self):
    41         return self.soap_methods
     41        return self.soap_services
    4242
    4343    def get_xmlrpc_services(self):
    44         return self.xmlrpc_methods
     44        return self.xmlrpc_services
    4545
    4646def new_feddservice(configfile):
  • fedd/fedd_experiment_control.py

    r11a08b0 r0ea11af  
    3939
    4040class fedd_experiment_control_local:
     41    """
     42    Control of experiments that this system can directly access.
     43
     44    Includes experiment creation, termination and information dissemination.
     45    Thred safe.
     46    """
    4147    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
    4248        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
     
    4450   
    4551    class thread_pool:
     52        """
     53        A class to keep track of a set of threads all invoked for the same
     54        task.  Manages the mutual exclusion of the states.
     55        """
    4656        def __init__(self):
     57            """
     58            Start a pool.
     59            """
    4760            self.changed = Condition()
    4861            self.started = 0
     
    5063
    5164        def acquire(self):
     65            """
     66            Get the pool's lock.
     67            """
    5268            self.changed.acquire()
    5369
    5470        def release(self):
     71            """
     72            Release the pool's lock.
     73            """
    5574            self.changed.release()
    5675
    5776        def wait(self, timeout = None):
     77            """
     78            Wait for a pool thread to start or stop.
     79            """
    5880            self.changed.wait(timeout)
    5981
    6082        def start(self):
     83            """
     84            Called by a pool thread to report starting.
     85            """
    6186            self.changed.acquire()
    6287            self.started += 1
     
    6590
    6691        def terminate(self):
     92            """
     93            Called by a pool thread to report finishing.
     94            """
    6795            self.changed.acquire()
    6896            self.terminated += 1
     
    7199
    72100        def clear(self):
     101            """
     102            Clear all pool data.
     103            """
    73104            self.changed.acquire()
    74105            self.started = 0
     
    78109
    79110    class pooled_thread(Thread):
     111        """
     112        One of a set of threads dedicated to a specific task.  Uses the
     113        thread_pool class above for coordination.
     114        """
    80115        def __init__(self, group=None, target=None, name=None, args=(),
    81116                kwargs={}, pdata=None, trace_file=None):
    82117            Thread.__init__(self, group, target, name, args, kwargs)
    83             self.rv = None
    84             self.exception = None
    85             self.target=target
    86             self.args = args
    87             self.kwargs = kwargs
    88             self.pdata = pdata
    89             self.trace_file = trace_file
     118            self.rv = None          # Return value of the ops in this thread
     119            self.exception = None   # Exception that terminated this thread
     120            self.target=target      # Target function to run on start()
     121            self.args = args        # Args to pass to target
     122            self.kwargs = kwargs    # Additional kw args
     123            self.pdata = pdata      # thread_pool for this class
     124            # Logger for this thread
     125            self.log = logging.getLogger("fedd.experiment_control")
    90126       
    91127        def run(self):
     128            """
     129            Emulate Thread.run, except add pool data manipulation and error
     130            logging.
     131            """
    92132            if self.pdata:
    93133                self.pdata.start()
     
    98138                except service_error, s:
    99139                    self.exception = s
    100                     if self.trace_file:
    101                         logging.error("Thread exception: %s %s" % \
    102                                 (s.code_string(), s.desc))
     140                    self.log.error("Thread exception: %s %s" % \
     141                            (s.code_string(), s.desc))
    103142                except:
    104143                    self.exception = sys.exc_info()[1]
    105                     if self.trace_file:
    106                         logging.error(("Unexpected thread exception: %s" +\
    107                                 "Trace %s") % (self.exception,\
    108                                     traceback.format_exc()))
     144                    self.log.error(("Unexpected thread exception: %s" +\
     145                            "Trace %s") % (self.exception,\
     146                                traceback.format_exc()))
    109147            if self.pdata:
    110148                self.pdata.terminate()
    111149
    112150    def __init__(self, config=None):
     151        """
     152        Intialize the various attributes, most from the config object
     153        """
    113154        self.scripts = fedd_experiment_control_local.scripts
    114155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
     
    121162        # Walk through the various relevant certificat specifying config
    122163        # attributes until the local certificate attributes can be resolved.
    123         # The walk is from omst specific to most general specification.
     164        # The walk is from most specific to most general specification.
    124165        for p in ("create_experiment_", "proxy_", ""):
    125166            filen = "%scert_file" % p
     
    200241                        config.experiment_log)
    201242
    202         # Grab saved state
     243        # Grab saved state.  OK to do this w/o locking because it's read only
     244        # and only one thread should be in existence that can see self.state at
     245        # this point.
    203246        if self.state_filename:
    204247            self.read_state()
     
    210253                        "%s/%s not in local script dir" % (self.scripts_dir, s))
    211254
    212         self.soap_handlers = {\
     255        # Dispatch tables
     256        self.soap_services = {\
    213257                'Create': make_soap_handler(\
    214258                        CreateRequestMessage.typecode,
     
    238282        }
    239283
    240         self.xmlrpc_handlers = {\
     284        self.xmlrpc_services = {\
    241285                'Create': make_xmlrpc_handler(\
    242286                        getattr(self, "create_experiment"),
     
    257301
    258302    def get_soap_services(self):
    259         return self.soap_handlers
     303        return self.soap_services
    260304
    261305    def get_xmlrpc_services(self):
    262         return self.xmlrpc_handlers
     306        return self.xmlrpc_services
    263307
    264308    def copy_file(self, src, dest, size=1024):
     
    278322    # Call while holding self.state_lock
    279323    def write_state(self):
     324        """
     325        Write a new copy of experiment state after copying the existing state
     326        to a backup.
     327
     328        State format is a simple pickling of the state dictionary.
     329        """
    280330        if os.access(self.state_filename, os.W_OK):
    281331            self.copy_file(self.state_filename, \
     
    292342    # Call while holding self.state_lock
    293343    def read_state(self):
     344        """
     345        Read a new copy of experiment state.  Old state is overwritten.
     346
     347        State format is a simple pickling of the state dictionary.
     348        """
    294349        try:
    295350            f = open(self.state_filename, "r")
     
    303358    def scp_file(self, file, user, host, dest=""):
    304359        """
    305         scp a file to the remote host.
     360        scp a file to the remote host.  If debug is set the action is only
     361        logged.
    306362        """
    307363
     
    316372
    317373    def ssh_cmd(self, user, host, cmd, wname=None):
     374        """
     375        Run a remote command on host as user.  If debug is set, the action is
     376        only logged.
     377        """
    318378        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
    319379
     
    326386
    327387    def ship_scripts(self, host, user, dest_dir):
     388        """
     389        Copy the federation scripts (fedkit) to the a federant.
     390        """
    328391        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
    329392            for s in self.scripts:
     
    336399
    337400    def ship_configs(self, host, user, src_dir, dest_dir):
     401        """
     402        Copy federant-specific configuration files to the federant.
     403        """
    338404        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
    339405            return False
     
    353419
    354420    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
     421        """
     422        Start a sub-experiment on a federant.
     423
     424        Get the current state, modify or create as appropriate, ship data and
     425        configs and start the experiment.  There are small ordering differences
     426        based on the initial state of the sub-experiment.
     427        """
     428        # ops node in the federant
    355429        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    356         user = tbparams[tb]['user']
    357         pid = tbparams[tb]['project']
     430        user = tbparams[tb]['user']     # federant user
     431        pid = tbparams[tb]['project']   # federant project
    358432        # XXX
    359433        base_confs = ( "hosts",)
    360         tclfile = "%s.%s.tcl" % (eid, tb)
    361         expinfo_exec = "/usr/testbed/bin/expinfo"
     434        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     435        # command to test experiment state
     436        expinfo_exec = "/usr/testbed/bin/expinfo" 
     437        # Configuration directories on the remote machine
    362438        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    363439        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
    364440        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
     441        # Regular expressions to parse the expinfo response
    365442        state_re = re.compile("State:\s+(\w+)")
    366443        no_exp_re = re.compile("^No\s+such\s+experiment")
    367         state = None
     444        state = None    # Experiment state parsed from expinfo
     445        # The expinfo ssh command
    368446        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
    369447
     448        # Get status
    370449        self.log.debug("[start_segment]: %s"% " ".join(cmd))
    371450        dev_null = None
     
    383462                if m: state = "none"
    384463        rv = status.wait()
     464
    385465        # If the experiment is not present the subcommand returns a non-zero
    386466        # return value.  If we successfully parsed a "none" outcome, ignore the
     
    506586
    507587    def stop_segment(self, tb, eid, tbparams):
     588        """
     589        Stop a sub experiment by calling swapexp on the federant
     590        """
    508591        user = tbparams[tb]['user']
    509592        host = tbparams[tb]['host']
     
    541624
    542625    def gentopo(self, str):
     626        """
     627        Generate the topology dtat structure from the splitter's XML
     628        representation of it.
     629
     630        The topology XML looks like:
     631            <experiment>
     632                <nodes>
     633                    <node><vname></vname><ips>ip1:ip2</ips></node>
     634                </nodes>
     635                <lans>
     636                    <lan>
     637                        <vname></vname><vnode></vnode><ip></ip>
     638                        <bandwidth></bandwidth><member>node:port</member>
     639                    </lan>
     640                </lans>
     641        """
    543642        class topo_parse:
     643            """
     644            Parse the topology XML and create the dats structure.
     645            """
    544646            def __init__(self):
     647                # Typing of the subelements for data conversion
    545648                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
    546649                self.int_subelements = ( 'bandwidth',)
    547650                self.float_subelements = ( 'delay',)
     651                # The final data structure
    548652                self.nodes = [ ]
    549653                self.lans =  [ ]
    550                 self.element = { }
    551654                self.topo = { \
    552655                        'node': self.nodes,\
    553656                        'lan' : self.lans,\
    554657                    }
    555                 self.chars = ""
     658                self.element = { }  # Current element being created
     659                self.chars = ""     # Last text seen
    556660
    557661            def end_element(self, name):
     662                # After each sub element the contents is added to the current
     663                # element or to the appropriate list.
    558664                if name == 'node':
    559665                    self.nodes.append(self.element)
     
    782888       
    783889    class current_testbed:
     890        """
     891        Object for collecting the current testbed description.  The testbed
     892        description is saved to a file with the local testbed variables
     893        subsittuted line by line.
     894        """
    784895        def __init__(self, eid, tmpdir):
    785896            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
     
    870981
    871982    class allbeds:
     983        """
     984        Process the Allbeds section.  Get access to each federant and save the
     985        parameters in tbparams
     986        """
    872987        def __init__(self, get_access):
    873988            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
     
    10901205
    10911206    class shunt_to_file:
     1207        """
     1208        Simple class to write data between two regexps to a file.
     1209        """
    10921210        def __init__(self, begin, end, filename):
     1211            """
     1212            Begin shunting on a match of begin, stop on end, send data to
     1213            filename.
     1214            """
    10931215            self.begin = re.compile(begin)
    10941216            self.end = re.compile(end)
     
    10981220
    10991221        def __call__(self, line):
     1222            """
     1223            Call this on each line in the input that may be shunted.
     1224            """
    11001225            if not self.in_shunt:
    11011226                if self.begin.match(line):
     
    11211246
    11221247    class shunt_to_list:
     1248        """
     1249        Same interface as shunt_to_file.  Data collected in self.list, one list
     1250        element per line.
     1251        """
    11231252        def __init__(self, begin, end):
    11241253            self.begin = re.compile(begin)
     
    11421271
    11431272    class shunt_to_string:
     1273        """
     1274        Same interface as shunt_to_file.  Data collected in self.str, all in
     1275        one string.
     1276        """
    11441277        def __init__(self, begin, end):
    11451278            self.begin = re.compile(begin)
     
    11631296
    11641297    def create_experiment(self, req, fid):
     1298        """
     1299        The external interface to experiment creation called from the
     1300        dispatcher.
     1301
     1302        Creates a working directory, splits the incoming description using the
     1303        splitter script and parses out the avrious subsections using the
     1304        lcasses above.  Once each sub-experiment is created, use pooled threads
     1305        to instantiate them and start it all up.
     1306        """
    11651307        try:
    11661308            tmpdir = tempfile.mkdtemp(prefix="split-")
     
    12091351            while (self.state.has_key(eid)):
    12101352                eid += random.choice(string.ascii_letters)
     1353            # To avoid another thread picking this localname
    12111354            self.state[eid] = "placeholder"
    12121355            self.state_lock.release()
     
    12201363                for i in range(0,5):
    12211364                    eid += random.choice(string.ascii_letters)
     1365            # To avoid another thread picking this localname
    12221366            self.state[eid] = "placeholder"
    12231367            self.state_lock.release()
     
    12421386        tclparser = Popen(tclcmd, stdout=PIPE)
    12431387
    1244         allocated = { }
    1245         started = { }
    1246 
     1388        allocated = { }     # Testbeds we can access
     1389        started = { }       # Testbeds where a sub-experiment started
     1390                            # successfully
     1391
     1392        # Objects to parse the splitter output (defined above)
    12471393        parse_current_testbed = self.current_testbed(eid, tmpdir)
    12481394        parse_allbeds = self.allbeds(self.get_access)
     
    12581404                "^#\s+End\s+rpms")
    12591405
     1406        # Worling on the split data
    12601407        for line in tclparser.stdout:
    12611408            line = line.rstrip()
     
    12781425                        "Bad tcl parse? %s" % line)
    12791426
     1427        # Virtual topology and visualization
    12801428        vtopo = self.gentopo(parse_vtopo.str)
    12811429        if not vtopo:
     
    13901538                }
    13911539
     1540        # Insert the experiment into our state and update the disk copy
    13921541        self.state_lock.acquire()
    13931542        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
     
    14121561
    14131562    def get_vtopo(self, req, fid):
     1563        """
     1564        Return the stored virtual topology for this experiment
     1565        """
    14141566        rv = None
    14151567
     
    14421594
    14431595    def get_vis(self, req, fid):
     1596        """
     1597        Return the stored visualization for this experiment
     1598        """
    14441599        rv = None
    14451600
     
    14721627
    14731628    def get_info(self, req, fid):
     1629        """
     1630        Return all the stored info about this experiment
     1631        """
    14741632        rv = None
    14751633
     
    15041662
    15051663    def terminate_experiment(self, req, fid):
     1664        """
     1665        Swap this experiment out on the federants and delete the shared
     1666        information
     1667        """
    15061668        tbparams = { }
    15071669        req = req.get('TerminateRequestBody', None)
     
    15841746            self.state_lock.release()
    15851747            raise service_error(service_error.req, "No saved state")
    1586 
    1587 
    1588 
    1589 
    1590 if __name__ == '__main__':
    1591     from optparse import OptionParser
    1592    
    1593     parser = OptionParser()
    1594     parser.add_option('-d', '--debug', dest='debug', default=False,
    1595             action='store_true', help='print actions rather than take them')
    1596     parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse')
    1597     parser.add_option('-m', '--master', dest='master',
    1598             help='testbed label for matster testbd')
    1599     parser.add_option('-t', '--trace', dest='trace', default=None,
    1600             help='file to print intermediate messages to')
    1601     parser.add_option('-T', '--trace-stderr', dest='trace',
    1602             action='store_const',const=sys.stderr,
    1603             help='file to print intermediate messages to')
    1604     opts, args  = parser.parse_args()
    1605 
    1606     trace_file = None
    1607     if opts.trace:
    1608         try:
    1609             trace_file = open(opts.trace, 'w')
    1610         except IOError:
    1611             print >>sys.stderr, "Can't open trace file"
    1612 
    1613     if opts.debug:
    1614         if not trace_file:
    1615             trace_file = sys.stderr
    1616 
    1617     if opts.tcl != None:
    1618         try:
    1619             f = open(opts.tcl, 'r')
    1620             content = ''.join(f)
    1621             f.close()
    1622         except IOError, e:
    1623             sys.exit("Can't read %s: %s" % (opts.tcl, e))
    1624     else:
    1625         sys.exit("Must specify a file name")
    1626 
    1627     if not opts.master:
    1628         sys.exit("Must supply master tb label (--master)");
    1629 
    1630     obj = fedd_create_experiment_local(
    1631             debug=opts.debug,
    1632             scripts_dir="/users/faber/testbed/federation",
    1633             cert_file="./fedd_client.pem", cert_pwd="faber",
    1634             ssh_pubkey_file='/users/faber/.ssh/id_rsa.pub',
    1635             trusted_certs="./cacert.pem",
    1636             tbmap = {
    1637                 'deter':'https://users.isi.deterlab.net:23235',
    1638                 'emulab':'https://users.isi.deterlab.net:23236',
    1639                 'ucb':'https://users.isi.deterlab.net:23237',
    1640                 },
    1641             trace_file=trace_file
    1642         )
    1643     rv = obj.create_experiment( {\
    1644             'experimentdescription' : content,
    1645             'master' : opts.master,
    1646             'user': [ {'userID' : { 'localname' : 'faber' } } ],
    1647             },
    1648             None)
    1649 
    1650     print rv
Note: See TracChangeset for help on using the changeset viewer.