Ignore:
Timestamp:
Aug 5, 2009 11:12:21 AM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
c9b3f49
Parents:
ab37086
Message:

Checkpoint (and whitespace in experiment_control)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/experiment_control.py

    rab37086 rdb6b092  
    2121from subprocess import *
    2222
     23from urlparse import urlparse
     24from urllib2 import urlopen
     25
    2326from util import *
    2427from fedid import fedid, generate_fedid
     
    2629from service_error import service_error
    2730
     31import topdl
     32from ip_allocator import ip_allocator
     33from ip_addr import ip_addr
     34
    2835
    2936class nullHandler(logging.Handler):
     
    4451
    4552    class list_log:
    46         """
    47         Provide an interface that lets logger.StreamHandler s write to a list
    48         of strings.
    49         """
    50         def __init__(self, l=[]):
    51             """
    52             Link to an existing list or just create a log
    53             """
    54             self.ll = l
    55             self.lock = Lock()
    56         def write(self, str):
    57             """
    58             Add the string to the log.  Lock for consistency.
    59             """
    60             self.lock.acquire()
    61             self.ll.append(str)
    62             self.lock.release()
    63 
    64         def flush(self):
    65             """
    66             No-op that StreamHandlers expect
    67             """
    68             pass
     53        """
     54        Provide an interface that lets logger.StreamHandler s write to a list
     55        of strings.
     56        """
     57        def __init__(self, l=[]):
     58            """
     59            Link to an existing list or just create a log
     60            """
     61            self.ll = l
     62            self.lock = Lock()
     63        def write(self, str):
     64            """
     65            Add the string to the log.  Lock for consistency.
     66            """
     67            self.lock.acquire()
     68            self.ll.append(str)
     69            self.lock.release()
     70
     71        def flush(self):
     72            """
     73            No-op that StreamHandlers expect
     74            """
     75            pass
    6976
    7077   
     
    253260        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
    254261
    255         self.overrides = set([])
    256         ovr = config.get('experiment_control', 'overrides')
    257         if ovr:
    258             for o in ovr.split(","):
    259                 o = o.strip()
    260                 if o.startswith('fedid:'): o = o[len('fedid:'):]
    261                 self.overrides.add(fedid(hexstr=o))
     262        self.overrides = set([])
     263        ovr = config.get('experiment_control', 'overrides')
     264        if ovr:
     265            for o in ovr.split(","):
     266                o = o.strip()
     267                if o.startswith('fedid:'): o = o[len('fedid:'):]
     268                self.overrides.add(fedid(hexstr=o))
    262269
    263270        self.state = { }
     
    334341        # Dispatch tables
    335342        self.soap_services = {\
    336                 'Create': soap_handler('Create', self.create_experiment),
     343                'Create': soap_handler('Create', self.new_create_experiment),
    337344                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
    338345                'Vis': soap_handler('Vis', self.get_vis),
     
    344351
    345352        self.xmlrpc_services = {\
    346                 'Create': xmlrpc_handler('Create', self.create_experiment),
     353                'Create': xmlrpc_handler('Create', self.new_create_experiment),
    347354                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
    348355                'Vis': xmlrpc_handler('Vis', self.get_vis),
     
    417424                            if f.has_key('fedid') ]:
    418425                    self.auth.set_attribute(self.state[k]['owner'], eid)
    419                     # allow overrides to control experiments as well
    420                     for o in self.overrides:
    421                         self.auth.set_attribute(o, eid)
     426                    # allow overrides to control experiments as well
     427                    for o in self.overrides:
     428                        self.auth.set_attribute(o, eid)
    422429            except KeyError, e:
    423430                self.log.warning("[read_state]: State ownership or identity " +\
     
    508515
    509516    class emulab_segment:
    510         def __init__(self, log=None, keyfile=None, debug=False):
    511             self.log = log or logging.getLogger(\
    512                     'fedd.experiment_control.emulab_segment')
    513             self.ssh_privkey_file = keyfile
    514             self.debug = debug
    515             self.ssh_exec="/usr/bin/ssh"
    516             self.scp_exec = "/usr/bin/scp"
    517             self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
    518 
    519         def scp_file(self, file, user, host, dest=""):
    520             """
    521             scp a file to the remote host.  If debug is set the action is only
    522             logged.
    523             """
    524 
    525             scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
    526                     '-o', 'StrictHostKeyChecking yes', '-i',
    527                     self.ssh_privkey_file, file,
    528                     "%s@%s:%s" % (user, host, dest)]
    529             rv = 0
    530 
    531             try:
    532                 dnull = open("/dev/null", "w")
    533             except IOError:
    534                 self.log.debug("[ssh_file]: failed to open " + \
    535                         "/dev/null for redirect")
    536                 dnull = Null
    537 
    538             self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
    539             if not self.debug:
    540                 rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
    541                         close_fds=True)
    542 
    543             return rv == 0
    544 
    545         def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
    546             """
    547             Run a remote command on host as user.  If debug is set, the action
    548             is only logged.  Commands are run without stdin, to avoid stray
    549             SIGTTINs.
    550             """
    551             sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
    552                     "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
    553                     (self.ssh_exec, self.ssh_privkey_file,
    554                             user, host, cmd)
    555 
    556             try:
    557                 dnull = open("/dev/null", "w")
    558             except IOError:
    559                 self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
    560                         "for redirect")
    561                 dnull = Null
    562 
    563             self.log.debug("[ssh_cmd]: %s" % sh_str)
    564             if not self.debug:
    565                 if dnull:
    566                     sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
    567                             close_fds=True)
    568                 else:
    569                     sub = Popen(sh_str, shell=True,
    570                             close_fds=True)
    571                 if timeout:
    572                     i = 0
    573                     rv = sub.poll()
    574                     while i < timeout:
    575                         if rv is not None: break
    576                         else:
    577                             time.sleep(1)
    578                             rv = sub.poll()
    579                             i += 1
    580                     else:
    581                         self.log.debug("Process exceeded runtime: %s" % sh_str)
    582                         os.kill(sub.pid, signal.SIGKILL)
    583                         raise self.ssh_cmd_timeout();
    584                     return rv == 0
    585                 else:
    586                     return sub.wait() == 0
    587             else:
    588                 if timeout == 0:
    589                     self.log.debug("debug timeout raised on %s " % sh_str)
    590                     raise self.ssh_cmd_timeout()
    591                 else:
    592                     return True
     517        def __init__(self, log=None, keyfile=None, debug=False):
     518            self.log = log or logging.getLogger(\
     519                    'fedd.experiment_control.emulab_segment')
     520            self.ssh_privkey_file = keyfile
     521            self.debug = debug
     522            self.ssh_exec="/usr/bin/ssh"
     523            self.scp_exec = "/usr/bin/scp"
     524            self.ssh_cmd_timeout = experiment_control_local.ssh_cmd_timeout
     525
     526        def scp_file(self, file, user, host, dest=""):
     527            """
     528            scp a file to the remote host.  If debug is set the action is only
     529            logged.
     530            """
     531
     532            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
     533                    '-o', 'StrictHostKeyChecking yes', '-i',
     534                    self.ssh_privkey_file, file,
     535                    "%s@%s:%s" % (user, host, dest)]
     536            rv = 0
     537
     538            try:
     539                dnull = open("/dev/null", "w")
     540            except IOError:
     541                self.log.debug("[ssh_file]: failed to open " + \
     542                        "/dev/null for redirect")
     543                dnull = Null
     544
     545            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
     546            if not self.debug:
     547                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
     548                        close_fds=True)
     549
     550            return rv == 0
     551
     552        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
     553            """
     554            Run a remote command on host as user.  If debug is set, the action
     555            is only logged.  Commands are run without stdin, to avoid stray
     556            SIGTTINs.
     557            """
     558            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
     559                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
     560                    (self.ssh_exec, self.ssh_privkey_file,
     561                            user, host, cmd)
     562
     563            try:
     564                dnull = open("/dev/null", "w")
     565            except IOError:
     566                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
     567                        "for redirect")
     568                dnull = Null
     569
     570            self.log.debug("[ssh_cmd]: %s" % sh_str)
     571            if not self.debug:
     572                if dnull:
     573                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
     574                            close_fds=True)
     575                else:
     576                    sub = Popen(sh_str, shell=True,
     577                            close_fds=True)
     578                if timeout:
     579                    i = 0
     580                    rv = sub.poll()
     581                    while i < timeout:
     582                        if rv is not None: break
     583                        else:
     584                            time.sleep(1)
     585                            rv = sub.poll()
     586                            i += 1
     587                    else:
     588                        self.log.debug("Process exceeded runtime: %s" % sh_str)
     589                        os.kill(sub.pid, signal.SIGKILL)
     590                        raise self.ssh_cmd_timeout();
     591                    return rv == 0
     592                else:
     593                    return sub.wait() == 0
     594            else:
     595                if timeout == 0:
     596                    self.log.debug("debug timeout raised on %s " % sh_str)
     597                    raise self.ssh_cmd_timeout()
     598                else:
     599                    return True
    593600
    594601    class start_segment(emulab_segment):
    595         def __init__(self, log=None, keyfile=None, debug=False):
    596             experiment_control_local.emulab_segment.__init__(self,
    597                     log=log, keyfile=keyfile, debug=debug)
    598 
    599         def create_config_tree(self, src_dir, dest_dir, script):
    600             """
    601             Append commands to script that will create the directory hierarchy
    602             on the remote federant.
    603             """
    604 
    605             if os.path.isdir(src_dir):
    606                 print >>script, "mkdir -p %s" % dest_dir
    607                 print >>script, "chmod 770 %s" % dest_dir
    608 
    609                 for f in os.listdir(src_dir):
    610                     if os.path.isdir(f):
    611                         self.create_config_tree("%s/%s" % (src_dir, f),
    612                                 "%s/%s" % (dest_dir, f), script)
    613             else:
    614                 self.log.debug("[create_config_tree]: Not a directory: %s" \
    615                         % src_dir)
    616 
    617         def ship_configs(self, host, user, src_dir, dest_dir):
    618             """
    619             Copy federant-specific configuration files to the federant.
    620             """
    621             for f in os.listdir(src_dir):
    622                 if os.path.isdir(f):
    623                     if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
    624                             "%s/%s" % (dest_dir, f)):
    625                         return False
    626                 else:
    627                     if not self.scp_file("%s/%s" % (src_dir, f),
    628                             user, host, dest_dir):
    629                         return False
    630             return True
    631 
    632         def get_state(self, user, host, tb, pid, eid):
    633             # command to test experiment state
    634             expinfo_exec = "/usr/testbed/bin/expinfo" 
    635             # Regular expressions to parse the expinfo response
    636             state_re = re.compile("State:\s+(\w+)")
    637             no_exp_re = re.compile("^No\s+such\s+experiment")
    638             swapping_re = re.compile("^No\s+information\s+available.")
    639             state = None    # Experiment state parsed from expinfo
    640             # The expinfo ssh command.  Note the identity restriction to use
    641             # only the identity provided in the pubkey given.
    642             cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
    643                     'StrictHostKeyChecking yes', '-i',
    644                     self.ssh_privkey_file, "%s@%s" % (user, host),
    645                     expinfo_exec, pid, eid]
    646 
    647             dev_null = None
    648             try:
    649                 dev_null = open("/dev/null", "a")
    650             except IOError, e:
    651                 self.log.error("[get_state]: can't open /dev/null: %s" %e)
    652 
    653             if self.debug:
    654                 state = 'swapped'
    655                 rv = 0
    656             else:
    657                 status = Popen(cmd, stdout=PIPE, stderr=dev_null,
    658                         close_fds=True)
    659                 for line in status.stdout:
    660                     m = state_re.match(line)
    661                     if m: state = m.group(1)
    662                     else:
    663                         for reg, st in ((no_exp_re, "none"),
    664                                 (swapping_re, "swapping")):
    665                             m = reg.match(line)
    666                             if m: state = st
    667                 rv = status.wait()
    668 
    669             # If the experiment is not present the subcommand returns a
    670             # non-zero return value.  If we successfully parsed a "none"
    671             # outcome, ignore the return code.
    672             if rv != 0 and state != 'none':
    673                 raise service_error(service_error.internal,
    674                         "Cannot get status of segment %s:%s/%s" % \
    675                                 (tb, pid, eid))
    676             elif state not in ('active', 'swapped', 'swapping', 'none'):
    677                 raise service_error(service_error.internal,
    678                         "Cannot get status of segment %s:%s/%s" % \
    679                                 (tb, pid, eid))
    680             else: return state
    681 
    682 
    683         def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
    684             """
    685             Start a sub-experiment on a federant.
    686 
    687             Get the current state, modify or create as appropriate, ship data
    688             and configs and start the experiment.  There are small ordering
    689             differences based on the initial state of the sub-experiment.
    690             """
    691             # ops node in the federant
    692             host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    693             user = tbparams[tb]['user']     # federant user
    694             pid = tbparams[tb]['project']   # federant project
    695             # XXX
    696             base_confs = ( "hosts",)
    697             tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
    698             # Configuration directories on the remote machine
    699             proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
    700             tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
    701             rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
    702 
    703             state = self.get_state(user, host, tb, pid, eid)
    704 
    705             self.log.debug("[start_segment]: %s: %s" % (tb, state))
    706             self.log.info("[start_segment]:transferring experiment to %s" % tb)
    707 
    708             if not self.scp_file("%s/%s/%s" % \
    709                     (tmpdir, tb, tclfile), user, host):
    710                 return False
    711            
    712             if state == 'none':
    713                 # Create a null copy of the experiment so that we capture any
    714                 # logs there if the modify fails.  Emulab software discards the
    715                 # logs from a failed startexp
    716                 if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
    717                     return False
    718                 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
    719                 timedout = False
    720                 try:
    721                     if not self.ssh_cmd(user, host,
    722                             ("/usr/testbed/bin/startexp -i -f -w -p %s " +
    723                             "-e %s null.tcl") % (pid, eid), "startexp",
    724                             timeout=60 * 10):
    725                         return False
    726                 except self.ssh_cmd_timeout:
    727                     timedout = True
    728 
    729                 if timedout:
    730                     state = self.get_state(user, host, tb, pid, eid)
    731                     if state != "swapped":
    732                         return False
    733 
    734            
    735             # Open up a temporary file to contain a script for setting up the
    736             # filespace for the new experiment.
    737             self.log.info("[start_segment]: creating script file")
    738             try:
    739                 sf, scriptname = tempfile.mkstemp()
    740                 scriptfile = os.fdopen(sf, 'w')
    741             except IOError:
    742                 return False
    743 
    744             scriptbase = os.path.basename(scriptname)
    745 
    746             # Script the filesystem changes
    747             print >>scriptfile, "/bin/rm -rf %s" % proj_dir
    748             # Clear and create the tarfiles and rpm directories
    749             for d in (tarfiles_dir, rpms_dir):
    750                 print >>scriptfile, "/bin/rm -rf %s/*" % d
    751                 print >>scriptfile, "mkdir -p %s" % d
    752             print >>scriptfile, 'mkdir -p %s' % proj_dir
    753             self.create_config_tree("%s/%s" % (tmpdir, tb),
    754                     proj_dir, scriptfile)
    755             if os.path.isdir("%s/tarfiles" % tmpdir):
    756                 self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
    757                         scriptfile)
    758             if os.path.isdir("%s/rpms" % tmpdir):
    759                 self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
    760                         scriptfile)
    761             print >>scriptfile, "rm -f %s" % scriptbase
    762             scriptfile.close()
    763 
    764             # Move the script to the remote machine
    765             # XXX: could collide tempfile names on the remote host
    766             if self.scp_file(scriptname, user, host, scriptbase):
    767                 os.remove(scriptname)
    768             else:
    769                 return False
    770 
    771             # Execute the script (and the script's last line deletes it)
    772             if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
    773                 return False
    774 
    775             for f in base_confs:
    776                 if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
    777                         "%s/%s" % (proj_dir, f)):
    778                     return False
    779             if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
    780                     proj_dir):
    781                 return False
    782             if os.path.isdir("%s/tarfiles" % tmpdir):
    783                 if not self.ship_configs(host, user,
    784                         "%s/tarfiles" % tmpdir, tarfiles_dir):
    785                     return False
    786             if os.path.isdir("%s/rpms" % tmpdir):
    787                 if not self.ship_configs(host, user,
    788                         "%s/rpms" % tmpdir, tarfiles_dir):
    789                     return False
    790             # Stage the new configuration (active experiments will stay swapped
    791             # in now)
    792             self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
    793             try:
    794                 if not self.ssh_cmd(user, host,
    795                         "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
    796                                 (pid, eid, tclfile),
    797                         "modexp", timeout= 60 * 10):
    798                     return False
    799             except self.ssh_cmd_timeout:
    800                 self.log.error("Modify command failed to complete in time")
    801                 # There's really no way to see if this succeeded or failed, so
    802                 # if it hangs, assume the worst.
    803                 return False
    804             # Active experiments are still swapped, this swaps the others in.
    805             if state != 'active':
    806                 self.log.info("[start_segment]: Swapping %s in on %s" % \
    807                         (eid, tb))
    808                 timedout = False
    809                 try:
    810                     if not self.ssh_cmd(user, host,
    811                             "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    812                             "swapexp", timeout=10*60):
    813                         return False
    814                 except self.ssh_cmd_timeout:
    815                     timedout = True
    816                
    817                 # If the command was terminated, but completed successfully,
    818                 # report success.
    819                 if timedout:
    820                     self.log.debug("[start_segment]: swapin timed out " +\
    821                             "checking state")
    822                     state = self.get_state(user, host, tb, pid, eid)
    823                     self.log.debug("[start_segment]: state is %s" % state)
    824                     return state == 'active'
    825             # Everything has gone OK.
    826             return True
     602        def __init__(self, log=None, keyfile=None, debug=False):
     603            experiment_control_local.emulab_segment.__init__(self,
     604                    log=log, keyfile=keyfile, debug=debug)
     605
     606        def create_config_tree(self, src_dir, dest_dir, script):
     607            """
     608            Append commands to script that will create the directory hierarchy
     609            on the remote federant.
     610            """
     611
     612            if os.path.isdir(src_dir):
     613                print >>script, "mkdir -p %s" % dest_dir
     614                print >>script, "chmod 770 %s" % dest_dir
     615
     616                for f in os.listdir(src_dir):
     617                    if os.path.isdir(f):
     618                        self.create_config_tree("%s/%s" % (src_dir, f),
     619                                "%s/%s" % (dest_dir, f), script)
     620            else:
     621                self.log.debug("[create_config_tree]: Not a directory: %s" \
     622                        % src_dir)
     623
     624        def ship_configs(self, host, user, src_dir, dest_dir):
     625            """
     626            Copy federant-specific configuration files to the federant.
     627            """
     628            for f in os.listdir(src_dir):
     629                if os.path.isdir(f):
     630                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
     631                            "%s/%s" % (dest_dir, f)):
     632                        return False
     633                else:
     634                    if not self.scp_file("%s/%s" % (src_dir, f),
     635                            user, host, dest_dir):
     636                        return False
     637            return True
     638
     639        def get_state(self, user, host, tb, pid, eid):
     640            # command to test experiment state
     641            expinfo_exec = "/usr/testbed/bin/expinfo" 
     642            # Regular expressions to parse the expinfo response
     643            state_re = re.compile("State:\s+(\w+)")
     644            no_exp_re = re.compile("^No\s+such\s+experiment")
     645            swapping_re = re.compile("^No\s+information\s+available.")
     646            state = None    # Experiment state parsed from expinfo
     647            # The expinfo ssh command.  Note the identity restriction to use
     648            # only the identity provided in the pubkey given.
     649            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
     650                    'StrictHostKeyChecking yes', '-i',
     651                    self.ssh_privkey_file, "%s@%s" % (user, host),
     652                    expinfo_exec, pid, eid]
     653
     654            dev_null = None
     655            try:
     656                dev_null = open("/dev/null", "a")
     657            except IOError, e:
     658                self.log.error("[get_state]: can't open /dev/null: %s" %e)
     659
     660            if self.debug:
     661                state = 'swapped'
     662                rv = 0
     663            else:
     664                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
     665                        close_fds=True)
     666                for line in status.stdout:
     667                    m = state_re.match(line)
     668                    if m: state = m.group(1)
     669                    else:
     670                        for reg, st in ((no_exp_re, "none"),
     671                                (swapping_re, "swapping")):
     672                            m = reg.match(line)
     673                            if m: state = st
     674                rv = status.wait()
     675
     676            # If the experiment is not present the subcommand returns a
     677            # non-zero return value.  If we successfully parsed a "none"
     678            # outcome, ignore the return code.
     679            if rv != 0 and state != 'none':
     680                raise service_error(service_error.internal,
     681                        "Cannot get status of segment %s:%s/%s" % \
     682                                (tb, pid, eid))
     683            elif state not in ('active', 'swapped', 'swapping', 'none'):
     684                raise service_error(service_error.internal,
     685                        "Cannot get status of segment %s:%s/%s" % \
     686                                (tb, pid, eid))
     687            else: return state
     688
     689
     690        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
     691            """
     692            Start a sub-experiment on a federant.
     693
     694            Get the current state, modify or create as appropriate, ship data
     695            and configs and start the experiment.  There are small ordering
     696            differences based on the initial state of the sub-experiment.
     697            """
     698            # ops node in the federant
     699            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     700            user = tbparams[tb]['user']     # federant user
     701            pid = tbparams[tb]['project']   # federant project
     702            # XXX
     703            base_confs = ( "hosts",)
     704            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     705            # Configuration directories on the remote machine
     706            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
     707            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
     708            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
     709
     710            state = self.get_state(user, host, tb, pid, eid)
     711
     712            self.log.debug("[start_segment]: %s: %s" % (tb, state))
     713            self.log.info("[start_segment]:transferring experiment to %s" % tb)
     714
     715            if not self.scp_file("%s/%s/%s" % \
     716                    (tmpdir, tb, tclfile), user, host):
     717                return False
     718           
     719            if state == 'none':
     720                # Create a null copy of the experiment so that we capture any
     721                # logs there if the modify fails.  Emulab software discards the
     722                # logs from a failed startexp
     723                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
     724                    return False
     725                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
     726                timedout = False
     727                try:
     728                    if not self.ssh_cmd(user, host,
     729                            ("/usr/testbed/bin/startexp -i -f -w -p %s " +
     730                            "-e %s null.tcl") % (pid, eid), "startexp",
     731                            timeout=60 * 10):
     732                        return False
     733                except self.ssh_cmd_timeout:
     734                    timedout = True
     735
     736                if timedout:
     737                    state = self.get_state(user, host, tb, pid, eid)
     738                    if state != "swapped":
     739                        return False
     740
     741           
     742            # Open up a temporary file to contain a script for setting up the
     743            # filespace for the new experiment.
     744            self.log.info("[start_segment]: creating script file")
     745            try:
     746                sf, scriptname = tempfile.mkstemp()
     747                scriptfile = os.fdopen(sf, 'w')
     748            except IOError:
     749                return False
     750
     751            scriptbase = os.path.basename(scriptname)
     752
     753            # Script the filesystem changes
     754            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
     755            # Clear and create the tarfiles and rpm directories
     756            for d in (tarfiles_dir, rpms_dir):
     757                print >>scriptfile, "/bin/rm -rf %s/*" % d
     758                print >>scriptfile, "mkdir -p %s" % d
     759            print >>scriptfile, 'mkdir -p %s' % proj_dir
     760            self.create_config_tree("%s/%s" % (tmpdir, tb),
     761                    proj_dir, scriptfile)
     762            if os.path.isdir("%s/tarfiles" % tmpdir):
     763                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
     764                        scriptfile)
     765            if os.path.isdir("%s/rpms" % tmpdir):
     766                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
     767                        scriptfile)
     768            print >>scriptfile, "rm -f %s" % scriptbase
     769            scriptfile.close()
     770
     771            # Move the script to the remote machine
     772            # XXX: could collide tempfile names on the remote host
     773            if self.scp_file(scriptname, user, host, scriptbase):
     774                os.remove(scriptname)
     775            else:
     776                return False
     777
     778            # Execute the script (and the script's last line deletes it)
     779            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
     780                return False
     781
     782            for f in base_confs:
     783                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
     784                        "%s/%s" % (proj_dir, f)):
     785                    return False
     786            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
     787                    proj_dir):
     788                return False
     789            if os.path.isdir("%s/tarfiles" % tmpdir):
     790                if not self.ship_configs(host, user,
     791                        "%s/tarfiles" % tmpdir, tarfiles_dir):
     792                    return False
     793            if os.path.isdir("%s/rpms" % tmpdir):
     794                if not self.ship_configs(host, user,
     795                        "%s/rpms" % tmpdir, tarfiles_dir):
     796                    return False
     797            # Stage the new configuration (active experiments will stay swapped
     798            # in now)
     799            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
     800            try:
     801                if not self.ssh_cmd(user, host,
     802                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
     803                                (pid, eid, tclfile),
     804                        "modexp", timeout= 60 * 10):
     805                    return False
     806            except self.ssh_cmd_timeout:
     807                self.log.error("Modify command failed to complete in time")
     808                # There's really no way to see if this succeeded or failed, so
     809                # if it hangs, assume the worst.
     810                return False
     811            # Active experiments are still swapped, this swaps the others in.
     812            if state != 'active':
     813                self.log.info("[start_segment]: Swapping %s in on %s" % \
     814                        (eid, tb))
     815                timedout = False
     816                try:
     817                    if not self.ssh_cmd(user, host,
     818                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
     819                            "swapexp", timeout=10*60):
     820                        return False
     821                except self.ssh_cmd_timeout:
     822                    timedout = True
     823               
     824                # If the command was terminated, but completed successfully,
     825                # report success.
     826                if timedout:
     827                    self.log.debug("[start_segment]: swapin timed out " +\
     828                            "checking state")
     829                    state = self.get_state(user, host, tb, pid, eid)
     830                    self.log.debug("[start_segment]: state is %s" % state)
     831                    return state == 'active'
     832            # Everything has gone OK.
     833            return True
    827834
    828835    class stop_segment(emulab_segment):
    829         def __init__(self, log=None, keyfile=None, debug=False):
    830             experiment_control_local.emulab_segment.__init__(self,
    831                     log=log, keyfile=keyfile, debug=debug)
    832 
    833         def __call__(self, tb, eid, tbparams):
    834             """
    835             Stop a sub experiment by calling swapexp on the federant
    836             """
    837             user = tbparams[tb]['user']
    838             host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
    839             pid = tbparams[tb]['project']
    840 
    841             self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
    842             rv = False
    843             try:
    844                 # Clean out tar files: we've gone over quota in the past
    845                 self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
    846                 self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
    847                         (pid, eid))
    848                 rv = self.ssh_cmd(user, host,
    849                         "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
    850             except self.ssh_cmd_timeout:
    851                 rv = False
    852             return rv
     836        def __init__(self, log=None, keyfile=None, debug=False):
     837            experiment_control_local.emulab_segment.__init__(self,
     838                    log=log, keyfile=keyfile, debug=debug)
     839
     840        def __call__(self, tb, eid, tbparams):
     841            """
     842            Stop a sub experiment by calling swapexp on the federant
     843            """
     844            user = tbparams[tb]['user']
     845            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     846            pid = tbparams[tb]['project']
     847
     848            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
     849            rv = False
     850            try:
     851                # Clean out tar files: we've gone over quota in the past
     852                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
     853                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
     854                        (pid, eid))
     855                rv = self.ssh_cmd(user, host,
     856                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
     857            except self.ssh_cmd_timeout:
     858                rv = False
     859            return rv
    853860
    854861       
     
    9961003                    "Failed to open file in genviz")
    9971004
    998         try:
    999             dnull = open('/dev/null', 'w')
    1000         except IOError:
    1001             service_error(service_error.internal,
     1005        try:
     1006            dnull = open('/dev/null', 'w')
     1007        except IOError:
     1008            service_error(service_error.internal,
    10021009                    "Failed to open /dev/null in genviz")
    10031010
     
    10231030        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
    10241031                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
    1025                 close_fds=True)
    1026         dnull.close()
     1032                close_fds=True)
     1033        dnull.close()
    10271034
    10281035        # Translate dot to vis format
     
    16871694
    16881695    def allocate_resources(self, allocated, master, eid, expid, expcert,
    1689             tbparams, tmpdir, alloc_log=None):
    1690         started = { }           # Testbeds where a sub-experiment started
    1691                                 # successfully
     1696            tbparams, tmpdir, alloc_log=None):
     1697        started = { }           # Testbeds where a sub-experiment started
     1698                                # successfully
    16921699
    16931700        # XXX
    16941701        fail_soft = False
    16951702
    1696         log = alloc_log or self.log
     1703        log = alloc_log or self.log
    16971704
    16981705        thread_pool = self.thread_pool(self.nthreads)
     
    17041711            thread_pool.wait_for_slot()
    17051712            t  = self.pooled_thread(\
    1706                     target=self.start_segment(log=log,
    1707                         keyfile=self.ssh_privkey_file, debug=self.debug),
     1713                    target=self.start_segment(log=log,
     1714                        keyfile=self.ssh_privkey_file, debug=self.debug),
    17081715                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    17091716                    pdata=thread_pool, trace_file=self.trace_file)
     
    17181725
    17191726        if len(failed) == 0:
    1720             starter = self.start_segment(log=log,
    1721                     keyfile=self.ssh_privkey_file, debug=self.debug)
     1727            starter = self.start_segment(log=log,
     1728                    keyfile=self.ssh_privkey_file, debug=self.debug)
    17221729            if not starter(master, eid, tbparams, tmpdir):
    17231730                failed.append(master)
     
    17321739                    thread_pool.wait_for_slot()
    17331740                    t  = self.pooled_thread(\
    1734                             target=self.stop_segment(log=log,
    1735                                 keyfile=self.ssh_privkey_file,
    1736                                 debug=self.debug),
     1741                            target=self.stop_segment(log=log,
     1742                                keyfile=self.ssh_privkey_file,
     1743                                debug=self.debug),
    17371744                            args=(tb, eid, tbparams), name=tb,
    17381745                            pdata=thread_pool, trace_file=self.trace_file)
     
    17471754                self.state_lock.acquire()
    17481755                self.state[eid]['experimentStatus'] = 'failed'
    1749                 if self.state_filename: self.write_state()
     1756                if self.state_filename: self.write_state()
    17501757                self.state_lock.release()
    17511758
     
    17531760                #    "Swap in failed on %s" % ",".join(failed))
    17541761                log.error("Swap in failed on %s" % ",".join(failed))
    1755                 return
     1762                return
    17561763        else:
    17571764            log.info("[start_segment]: Experiment %s active" % eid)
     
    17691776        # Insert the experiment into our state and update the disk copy
    17701777        self.state_lock.acquire()
    1771         self.state[expid]['experimentStatus'] = 'active'
     1778        self.state[expid]['experimentStatus'] = 'active'
    17721779        self.state[eid] = self.state[expid]
    17731780        if self.state_filename: self.write_state()
    17741781        self.state_lock.release()
    1775         return
     1782        return
    17761783
    17771784    def create_experiment(self, req, fid):
     
    18441851        if req.has_key('experimentID') and \
    18451852                req['experimentID'].has_key('localname'):
    1846             overwrite = False
     1853            overwrite = False
    18471854            eid = req['experimentID']['localname']
    1848             # If there's an old failed experiment here with the same local name
    1849             # and accessible by this user, we'll overwrite it, otherwise we'll
    1850             # fall through and do the collision avoidance.
    1851             old_expid = self.get_experiment_fedid(eid)
    1852             if old_expid and self.check_experiment_access(fid, old_expid):
    1853                 self.state_lock.acquire()
    1854                 status = self.state[eid].get('experimentStatus', None)
    1855                 if status and status == 'failed':
    1856                     # remove the old access attribute
    1857                     self.auth.unset_attribute(fid, old_expid)
    1858                     overwrite = True
    1859                     del self.state[eid]
    1860                     del self.state[old_expid]
    1861                 self.state_lock.release()
     1855            # If there's an old failed experiment here with the same local name
     1856            # and accessible by this user, we'll overwrite it, otherwise we'll
     1857            # fall through and do the collision avoidance.
     1858            old_expid = self.get_experiment_fedid(eid)
     1859            if old_expid and self.check_experiment_access(fid, old_expid):
     1860                self.state_lock.acquire()
     1861                status = self.state[eid].get('experimentStatus', None)
     1862                if status and status == 'failed':
     1863                    # remove the old access attribute
     1864                    self.auth.unset_attribute(fid, old_expid)
     1865                    overwrite = True
     1866                    del self.state[eid]
     1867                    del self.state[old_expid]
     1868                self.state_lock.release()
    18621869            self.state_lock.acquire()
    18631870            while (self.state.has_key(eid) and not overwrite):
    18641871                eid += random.choice(string.ascii_letters)
    1865             # Initial state
     1872            # Initial state
    18661873            self.state[eid] = {
    1867                     'experimentID' : \
    1868                             [ { 'localname' : eid }, {'fedid': expid } ],
    1869                     'experimentStatus': 'starting',
    1870                     'experimentAccess': { 'X509' : expcert },
    1871                     'owner': fid,
    1872                     'log' : [],
    1873                 }
    1874             self.state[expid] = self.state[eid]
     1874                    'experimentID' : \
     1875                            [ { 'localname' : eid }, {'fedid': expid } ],
     1876                    'experimentStatus': 'starting',
     1877                    'experimentAccess': { 'X509' : expcert },
     1878                    'owner': fid,
     1879                    'log' : [],
     1880                }
     1881            self.state[expid] = self.state[eid]
    18751882            if self.state_filename: self.write_state()
    18761883            self.state_lock.release()
     
    18841891                for i in range(0,5):
    18851892                    eid += random.choice(string.ascii_letters)
    1886             # Initial state
     1893            # Initial state
    18871894            self.state[eid] = {
    1888                     'experimentID' : \
    1889                             [ { 'localname' : eid }, {'fedid': expid } ],
    1890                     'experimentStatus': 'starting',
    1891                     'experimentAccess': { 'X509' : expcert },
    1892                     'owner': fid,
    1893                     'log' : [],
    1894                 }
    1895             self.state[expid] = self.state[eid]
     1895                    'experimentID' : \
     1896                            [ { 'localname' : eid }, {'fedid': expid } ],
     1897                    'experimentStatus': 'starting',
     1898                    'experimentAccess': { 'X509' : expcert },
     1899                    'owner': fid,
     1900                    'log' : [],
     1901                }
     1902            self.state[expid] = self.state[eid]
    18961903            if self.state_filename: self.write_state()
    18971904            self.state_lock.release()
     
    19351942
    19361943                self.log.debug("running local splitter %s", " ".join(tclcmd))
    1937                 # This is just fantastic.  As a side effect the parser copies
    1938                 # tb_compat.tcl into the current directory, so that directory
    1939                 # must be writable by the fedd user.  Doing this in the
    1940                 # temporary subdir ensures this is the case.
     1944                # This is just fantastic.  As a side effect the parser copies
     1945                # tb_compat.tcl into the current directory, so that directory
     1946                # must be writable by the fedd user.  Doing this in the
     1947                # temporary subdir ensures this is the case.
    19411948                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True,
    1942                         cwd=tmpdir)
     1949                        cwd=tmpdir)
    19431950                split_data = tclparser.stdout
    19441951
     
    20022009                    }
    20032010
    2004             self.state_lock.acquire()
    2005             self.state[eid]['vtopo'] = vtopo
    2006             self.state[eid]['vis'] = vis
    2007             self.state[expid]['federant'] = \
    2008                     [ tbparams[tb]['federant'] for tb in tbparams.keys() \
    2009                         if tbparams[tb].has_key('federant') ]
     2011            self.state_lock.acquire()
     2012            self.state[eid]['vtopo'] = vtopo
     2013            self.state[eid]['vis'] = vis
     2014            self.state[expid]['federant'] = \
     2015                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
     2016                        if tbparams[tb].has_key('federant') ]
    20102017            if self.state_filename: self.write_state()
    2011             self.state_lock.release()
     2018            self.state_lock.release()
    20122019
    20132020            # Copy tarfiles and rpms needed at remote sites into a staging area
     
    20502057            # If something goes wrong in the parse (usually an access error)
    20512058            # clear the placeholder state.  From here on out the code delays
    2052             # exceptions.  Failing at this point returns a fault to the remote
    2053             # caller.
     2059            # exceptions.  Failing at this point returns a fault to the remote
     2060            # caller.
    20542061            self.state_lock.acquire()
    20552062            del self.state[eid]
     
    20602067
    20612068
    2062         # Start the background swapper and return the starting state.  From
    2063         # here on out, the state will stick around a while.
    2064 
    2065         # Let users touch the state
     2069        # Start the background swapper and return the starting state.  From
     2070        # here on out, the state will stick around a while.
     2071
     2072        # Let users touch the state
    20662073        self.auth.set_attribute(fid, expid)
    20672074        self.auth.set_attribute(expid, expid)
    2068         # Override fedids can manipulate state as well
    2069         for o in self.overrides:
    2070             self.auth.set_attribute(o, expid)
    2071 
    2072         # Create a logger that logs to the experiment's state object as well as
    2073         # to the main log file.
    2074         alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
    2075         h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
    2076         # XXX: there should be a global one of these rather than repeating the
    2077         # code.
    2078         h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
    2079                     '%d %b %y %H:%M:%S'))
    2080         alloc_log.addHandler(h)
    2081        
    2082         # Start a thread to do the resource allocation
    2083         t  = Thread(target=self.allocate_resources,
    2084                 args=(allocated, master, eid, expid, expcert, tbparams,
    2085                     tmpdir, alloc_log),
    2086                 name=eid)
    2087         t.start()
    2088 
    2089         rv = {
    2090                 'experimentID': [
    2091                     {'localname' : eid }, { 'fedid': copy.copy(expid) }
    2092                 ],
    2093                 'experimentStatus': 'starting',
    2094                 'experimentAccess': { 'X509' : expcert }
    2095             }
    2096 
    2097         return rv
     2075        # Override fedids can manipulate state as well
     2076        for o in self.overrides:
     2077            self.auth.set_attribute(o, expid)
     2078
     2079        # Create a logger that logs to the experiment's state object as well as
     2080        # to the main log file.
     2081        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
     2082        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     2083        # XXX: there should be a global one of these rather than repeating the
     2084        # code.
     2085        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2086                    '%d %b %y %H:%M:%S'))
     2087        alloc_log.addHandler(h)
     2088       
     2089        # Start a thread to do the resource allocation
     2090        t  = Thread(target=self.allocate_resources,
     2091                args=(allocated, master, eid, expid, expcert, tbparams,
     2092                    tmpdir, alloc_log),
     2093                name=eid)
     2094        t.start()
     2095
     2096        rv = {
     2097                'experimentID': [
     2098                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
     2099                ],
     2100                'experimentStatus': 'starting',
     2101                'experimentAccess': { 'X509' : expcert }
     2102            }
     2103
     2104        return rv
     2105   
     2106
     2107    def new_create_experiment(self, req, fid):
     2108        """
     2109        The external interface to experiment creation called from the
     2110        dispatcher.
     2111
     2112        Creates a working directory, splits the incoming description using the
     2113        splitter script and parses out the avrious subsections using the
     2114        lcasses above.  Once each sub-experiment is created, use pooled threads
     2115        to instantiate them and start it all up.
     2116        """
     2117
     2118        if not self.auth.check_attribute(fid, 'create'):
     2119            raise service_error(service_error.access, "Create access denied")
     2120
     2121        try:
     2122            tmpdir = tempfile.mkdtemp(prefix="split-")
     2123        except IOError:
     2124            raise service_error(service_error.internal, "Cannot create tmp dir")
     2125
     2126        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
     2127        gw_secretkey_base = "fed.%s" % self.ssh_type
     2128        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
     2129        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
     2130        tclfile = tmpdir + "/experiment.tcl"
     2131        tbparams = { }
     2132        try:
     2133            access_user = self.accessdb[fid]
     2134        except KeyError:
     2135            raise service_error(service_error.internal,
     2136                    "Access map and authorizer out of sync in " + \
     2137                            "create_experiment for fedid %s"  % fid)
     2138
     2139        pid = "dummy"
     2140        gid = "dummy"
     2141        try:
     2142            os.mkdir(tmpdir+"/keys")
     2143        except OSError:
     2144            raise service_error(service_error.internal,
     2145                    "Can't make temporary dir")
     2146
     2147        req = req.get('CreateRequestBody', None)
     2148        if not req:
     2149            raise service_error(service_error.req,
     2150                    "Bad request format (no CreateRequestBody)")
     2151        # The tcl parser needs to read a file so put the content into that file
     2152        descr=req.get('experimentdescription', None)
     2153        if descr:
     2154            file_content=descr.get('ns2description', None)
     2155            if file_content:
     2156                try:
     2157                    f = open(tclfile, 'w')
     2158                    f.write(file_content)
     2159                    f.close()
     2160                except IOError:
     2161                    raise service_error(service_error.internal,
     2162                            "Cannot write temp experiment description")
     2163            else:
     2164                raise service_error(service_error.req,
     2165                        "Only ns2descriptions supported")
     2166        else:
     2167            raise service_error(service_error.req, "No experiment description")
     2168
     2169        # Generate an ID for the experiment (slice) and a certificate that the
     2170        # allocator can use to prove they own it.  We'll ship it back through
     2171        # the encrypted connection.
     2172        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
     2173
     2174        if req.has_key('experimentID') and \
     2175                req['experimentID'].has_key('localname'):
     2176            overwrite = False
     2177            eid = req['experimentID']['localname']
     2178            # If there's an old failed experiment here with the same local name
     2179            # and accessible by this user, we'll overwrite it, otherwise we'll
     2180            # fall through and do the collision avoidance.
     2181            old_expid = self.get_experiment_fedid(eid)
     2182            if old_expid and self.check_experiment_access(fid, old_expid):
     2183                self.state_lock.acquire()
     2184                status = self.state[eid].get('experimentStatus', None)
     2185                if status and status == 'failed':
     2186                    # remove the old access attribute
     2187                    self.auth.unset_attribute(fid, old_expid)
     2188                    overwrite = True
     2189                    del self.state[eid]
     2190                    del self.state[old_expid]
     2191                self.state_lock.release()
     2192            self.state_lock.acquire()
     2193            while (self.state.has_key(eid) and not overwrite):
     2194                eid += random.choice(string.ascii_letters)
     2195            # Initial state
     2196            self.state[eid] = {
     2197                    'experimentID' : \
     2198                            [ { 'localname' : eid }, {'fedid': expid } ],
     2199                    'experimentStatus': 'starting',
     2200                    'experimentAccess': { 'X509' : expcert },
     2201                    'owner': fid,
     2202                    'log' : [],
     2203                }
     2204            self.state[expid] = self.state[eid]
     2205            if self.state_filename: self.write_state()
     2206            self.state_lock.release()
     2207        else:
     2208            eid = self.exp_stem
     2209            for i in range(0,5):
     2210                eid += random.choice(string.ascii_letters)
     2211            self.state_lock.acquire()
     2212            while (self.state.has_key(eid)):
     2213                eid = self.exp_stem
     2214                for i in range(0,5):
     2215                    eid += random.choice(string.ascii_letters)
     2216            # Initial state
     2217            self.state[eid] = {
     2218                    'experimentID' : \
     2219                            [ { 'localname' : eid }, {'fedid': expid } ],
     2220                    'experimentStatus': 'starting',
     2221                    'experimentAccess': { 'X509' : expcert },
     2222                    'owner': fid,
     2223                    'log' : [],
     2224                }
     2225            self.state[expid] = self.state[eid]
     2226            if self.state_filename: self.write_state()
     2227            self.state_lock.release()
     2228
     2229        try:
     2230            # This catches exceptions to clear the placeholder if necessary
     2231            try:
     2232                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
     2233            except ValueError:
     2234                raise service_error(service_error.server_config,
     2235                        "Bad key type (%s)" % self.ssh_type)
     2236
     2237            user = req.get('user', None)
     2238            if user == None:
     2239                raise service_error(service_error.req, "No user")
     2240
     2241            master = req.get('master', None)
     2242            if not master:
     2243                raise service_error(service_error.req,
     2244                        "No master testbed label")
     2245            export_project = req.get('exportProject', None)
     2246            if not export_project:
     2247                raise service_error(service_error.req, "No export project")
     2248           
     2249            if self.splitter_url:
     2250                self.log.debug("Calling remote splitter at %s" % \
     2251                        self.splitter_url)
     2252                split_data = self.remote_splitter(self.splitter_url,
     2253                        file_content, master)
     2254            else:
     2255                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x',
     2256                    str(self.muxmax), '-m', master]
     2257
     2258                if self.fedkit:
     2259                    tclcmd.append('-k')
     2260
     2261                if self.gatewaykit:
     2262                    tclcmd.append('-K')
     2263
     2264                tclcmd.extend([pid, gid, eid, tclfile])
     2265
     2266                self.log.debug("running local splitter %s", " ".join(tclcmd))
     2267                # This is just fantastic.  As a side effect the parser copies
     2268                # tb_compat.tcl into the current directory, so that directory
     2269                # must be writable by the fedd user.  Doing this in the
     2270                # temporary subdir ensures this is the case.
     2271                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True,
     2272                        cwd=tmpdir)
     2273                split_data = tclparser.stdout
     2274
     2275            allocated = { }         # Testbeds we can access
     2276# XXX here's where we're working
     2277            def out_topo(filename, t):
     2278                try:
     2279                    f = open("/tmp/%s" % filename, "w")
     2280                    print >> f, "%s" % \
     2281                            topdl.topology_to_xml(t, top="experiment")
     2282                    f.close()
     2283                except IOError, e:
     2284                    raise service_error(service_error.internal, "Can't open file")
     2285
     2286            try:
     2287
     2288                top = topdl.topology_from_xml(file=split_data, top="experiment")
     2289                subs = sorted(top.substrates,
     2290                        cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
     2291                        reverse=True)
     2292                ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
     2293                for s in subs:
     2294                    a = ips.allocate(len(s.interfaces)+2)
     2295                    if a :
     2296                        base, num = a
     2297                        if num < len(s.interfaces) +2 :
     2298                            raise service_error(service_error.internal,
     2299                                    "Allocator returned wrong number of IPs??")
     2300                    else:
     2301                        raise service_error(service_error.req,
     2302                                "Cannot allocate IP addresses")
     2303
     2304                    base += 1
     2305                    for i in s.interfaces:
     2306                        i.attribute.append(
     2307                                topdl.Attribute('ip4_address',
     2308                                    "%s" % ip_addr(base)))
     2309                        base += 1
     2310
     2311                testbeds = set([ a.value for e in top.elements \
     2312                        for a in e.attribute \
     2313                            if a.attribute == 'testbed'] )
     2314                topo ={ }
     2315                for tb in testbeds:
     2316                    self.get_access(tb, None, user, tbparams, master,
     2317                            export_project, access_user)
     2318                    topo[tb] = top.clone()
     2319                    to_delete = [ ]
     2320                    for e in topo[tb].elements:
     2321                        etb = e.get_attribute('testbed')
     2322                        if etb and etb != tb:
     2323                            for i in e.interface:
     2324                                for s in i.subs:
     2325                                    try:
     2326                                        s.interfaces.remove(i)
     2327                                    except ValueError:
     2328                                        raise service_error(service_error.internal,
     2329                                                "Can't remove interface??")
     2330                            to_delete.append(e)
     2331                    for e in to_delete:
     2332                        topo[tb].elements.remove(e)
     2333                    topo[tb].make_indices()
     2334
     2335
     2336
     2337                for s in top.substrates:
     2338                    tests = { }
     2339                    for i in s.interfaces:
     2340                        e = i.element
     2341                        tb = e.get_attribute('testbed')
     2342                        if tb and not tests.has_key(tb):
     2343                            for i in e.interface:
     2344                                if s in i.subs:
     2345                                    tests[tb]= \
     2346                                            i.get_attribute('ip4_address')
     2347                    if len(tests) < 2:
     2348                        continue
     2349
     2350                    # More than one testbed is on this substrate.  Insert
     2351                    # some gateways into the subtopologies.
     2352
     2353                    for st in tests.keys():
     2354                        for dt in [ t for t in tests.keys() if t != st]:
     2355                            myname =  "%stunnel" % dt
     2356                            desthost  =  "%stunnel" % st
     2357                            sproject = tbparams[st].get('project', 'project')
     2358                            dproject = tbparams[dt].get('project', 'project')
     2359                            sdomain = ".%s.%s%s" % (eid, sproject,
     2360                                    tbparams[st].get('domain', ".example.com"))
     2361                            ddomain = ".%s.%s%s" % (eid, dproject,
     2362                                    tbparams[dt].get('domain', ".example.com"))
     2363                            boss = tbparams[master].get('boss', "boss")
     2364                            fs = tbparams[master].get('fs', "fs")
     2365                            event_server = "%s%s" % \
     2366                                    (tbparams[st].get('eventserver', "event_server"),
     2367                                            tbparams[dt].get('domain', "example.com"))
     2368                            remote_event_server = "%s%s" % \
     2369                                    (tbparams[dt].get('eventserver', "event_server"),
     2370                                            tbparams[dt].get('domain', "example.com"))
     2371                            seer_control = "%s%s" % \
     2372                                    (tbparams[st].get('control', "control"), sdomain)
     2373                            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
     2374                            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
     2375                            conf_file = "%s%s.gw.conf" % (myname, sdomain)
     2376                            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
     2377                            # translate to lower case so the `hostname` hack for specifying
     2378                            # configuration files works.
     2379                            conf_file = conf_file.lower();
     2380                            remote_conf_file = remote_conf_file.lower();
     2381                            active = ("%s" % (st == master))
     2382                            portal = topdl.Computer(**{
     2383                                    'name': "%stunnel" % dt,
     2384                                    'attribute' : [{
     2385                                        'attribute': n,
     2386                                        'value': v,
     2387                                        } for n, v in (\
     2388                                                ('gateway', 'true'),
     2389                                                ('boss', boss),
     2390                                                ('fs', fs),
     2391                                                ('event_server', event_server),
     2392                                                ('remote_event_server', remote_event_server),
     2393                                                ('seer_control', seer_control),
     2394                                                ('local_key_dir', local_key_dir),
     2395                                                ('remote_conf_dir', remote_conf_dir),
     2396                                                ('conf_file', conf_file),
     2397                                                ('remote_conf_file', remote_conf_file),
     2398                                                ('remote_script_dir', "/usr/local/federation/bin"),
     2399                                                ('local_script_dir', "/usr/local/federation/bin"),
     2400                                                )],
     2401                                    'interface': [{
     2402                                        'substrate': s.name,
     2403                                        'attribute': [ {
     2404                                            'attribute': 'ip4_addreess',
     2405                                            'value': tests[dt],
     2406                                            }, ],
     2407                                        }, ],
     2408                                    })
     2409                            topo[st].elements.append(portal)
     2410                # Connect the gateway nodes into the topologies and clear out
     2411                # substrates that are not in the topologies
     2412                for tb in testbeds:
     2413                    topo[tb].incorporate_elements()
     2414                    topo[tb].substrates = \
     2415                            [s for s in topo[tb].substrates \
     2416                                if len(s.interfaces) >0]
     2417
     2418                softdir ="%s/software" % tmpdir
     2419                softmap = { }
     2420                os.mkdir(softdir)
     2421                pkgs = set([fedkit, gatewaykit])
     2422                pkgs.update([x.location for e in top.elements \
     2423                        for x in e.software])
     2424                for pkg in pkgs:
     2425                    loc = pkg
     2426
     2427                    scheme, host, path = urlparse(loc)[0:3]
     2428                    dest = os.path.basename(path)
     2429                    if not scheme:
     2430                        if not loc.startswith('/'):
     2431                            loc = "/%s" % loc
     2432                        loc = "file://%s" %loc
     2433                    try:
     2434                        u = urlopen(loc)
     2435                    except Exception, e:
     2436                        raise service_error(service_error.req,
     2437                                "Cannot open %s: %s" % (loc, e))
     2438                    try:
     2439                        f = open("%s/%s" % (softdir, dest) , "w")
     2440                        data = u.read(4096)
     2441                        while data:
     2442                            f.write(data)
     2443                            data = u.read(4096)
     2444                        f.close()
     2445                        u.close()
     2446                    except Exception, e:
     2447                        raise service_error(service_error.internal,
     2448                                "Could not copy %s: %s" % (loc, e))
     2449                    path = re.sub("/tmp", "", softdir)
     2450                    # XXX
     2451                    softmap[pkg] = \
     2452                            "https://users.isi.deterlab.net:23232/%s/%s" %\
     2453                            ( path, dest)
     2454
     2455                # Convert the software locations in the segments into the local
     2456                # copies on this host
     2457                for soft in [ s for tb in topo.values() \
     2458                        for e in tb.elements \
     2459                            for s in e.software ]:
     2460                    if softmap.has_key(soft.location):
     2461                        soft.location = softmap[soft.location]
     2462                for tb in testbeds:
     2463                    out_topo("%s.xml" %tb, topo[tb])
     2464
     2465                vtopo = topdl.topology_to_vtopo(top)
     2466                vis = self.genviz(vtopo)
     2467
     2468            except Exception, e:
     2469                traceback.print_exc()
     2470                raise service_error(service_error.internal, "%s"  % e)
     2471
     2472
     2473
     2474            # Build the testbed topologies:
     2475
     2476
     2477            if True:
     2478                raise service_error(service_error.internal, "Developing")
     2479
     2480# XXX old code
     2481            # Objects to parse the splitter output (defined above)
     2482            parse_current_testbed = self.current_testbed(eid, tmpdir,
     2483                    self.fedkit, self.gatewaykit)
     2484            parse_allbeds = self.allbeds(self.get_access)
     2485            parse_gateways = self.gateways(eid, master, tmpdir,
     2486                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
     2487                    self.fedkit)
     2488            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
     2489                        "^#\s+End\s+Vtopo")
     2490            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
     2491                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
     2492            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
     2493                    "^#\s+End\s+tarfiles")
     2494            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
     2495                    "^#\s+End\s+rpms")
     2496
     2497            # Working on the split data
     2498            for line in split_data:
     2499                line = line.rstrip()
     2500                if parse_current_testbed(line, master, allocated, tbparams):
     2501                    continue
     2502                elif parse_allbeds(line, user, tbparams, master, export_project,
     2503                        access_user):
     2504                    continue
     2505                elif parse_gateways(line, allocated, tbparams):
     2506                    continue
     2507                elif parse_vtopo(line):
     2508                    continue
     2509                elif parse_hostnames(line):
     2510                    continue
     2511                elif parse_tarfiles(line):
     2512                    continue
     2513                elif parse_rpms(line):
     2514                    continue
     2515                else:
     2516                    raise service_error(service_error.internal,
     2517                            "Bad tcl parse? %s" % line)
     2518            # Virtual topology and visualization
     2519            vtopo = self.gentopo(parse_vtopo.str)
     2520            if not vtopo:
     2521                raise service_error(service_error.internal,
     2522                        "Failed to generate virtual topology")
     2523
     2524            vis = self.genviz(vtopo)
     2525            if not vis:
     2526                raise service_error(service_error.internal,
     2527                        "Failed to generate visualization")
     2528
     2529           
     2530            # save federant information
     2531            for k in allocated.keys():
     2532                tbparams[k]['federant'] = {\
     2533                        'name': [ { 'localname' : eid} ],\
     2534                        'emulab': tbparams[k]['emulab'],\
     2535                        'allocID' : tbparams[k]['allocID'],\
     2536                        'master' : k == master,\
     2537                    }
     2538
     2539            self.state_lock.acquire()
     2540            self.state[eid]['vtopo'] = vtopo
     2541            self.state[eid]['vis'] = vis
     2542            self.state[expid]['federant'] = \
     2543                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
     2544                        if tbparams[tb].has_key('federant') ]
     2545            if self.state_filename: self.write_state()
     2546            self.state_lock.release()
     2547
     2548            # Copy tarfiles and rpms needed at remote sites into a staging area
     2549            try:
     2550                if self.fedkit:
     2551                    for t in self.fedkit:
     2552                        parse_tarfiles.list.append(t[1])
     2553                if self.gatewaykit:
     2554                    for t in self.gatewaykit:
     2555                        parse_tarfiles.list.append(t[1])
     2556                for t in parse_tarfiles.list:
     2557                    if not os.path.exists("%s/tarfiles" % tmpdir):
     2558                        os.mkdir("%s/tarfiles" % tmpdir)
     2559                    self.copy_file(t, "%s/tarfiles/%s" % \
     2560                            (tmpdir, os.path.basename(t)))
     2561                for r in parse_rpms.list:
     2562                    if not os.path.exists("%s/rpms" % tmpdir):
     2563                        os.mkdir("%s/rpms" % tmpdir)
     2564                    self.copy_file(r, "%s/rpms/%s" % \
     2565                            (tmpdir, os.path.basename(r)))
     2566                # A null experiment file in case we need to create a remote
     2567                # experiment from scratch
     2568                f = open("%s/null.tcl" % tmpdir, "w")
     2569                print >>f, """
     2570set ns [new Simulator]
     2571source tb_compat.tcl
     2572
     2573set a [$ns node]
     2574
     2575$ns rtproto Session
     2576$ns run
     2577"""
     2578                f.close()
     2579
     2580            except IOError, e:
     2581                raise service_error(service_error.internal,
     2582                        "Cannot stage tarfile/rpm: %s" % e.strerror)
     2583
     2584        except service_error, e:
     2585            # If something goes wrong in the parse (usually an access error)
     2586            # clear the placeholder state.  From here on out the code delays
     2587            # exceptions.  Failing at this point returns a fault to the remote
     2588            # caller.
     2589            self.state_lock.acquire()
     2590            del self.state[eid]
     2591            del self.state[expid]
     2592            if self.state_filename: self.write_state()
     2593            self.state_lock.release()
     2594            raise e
     2595
     2596
     2597        # Start the background swapper and return the starting state.  From
     2598        # here on out, the state will stick around a while.
     2599
     2600        # Let users touch the state
     2601        self.auth.set_attribute(fid, expid)
     2602        self.auth.set_attribute(expid, expid)
     2603        # Override fedids can manipulate state as well
     2604        for o in self.overrides:
     2605            self.auth.set_attribute(o, expid)
     2606
     2607        # Create a logger that logs to the experiment's state object as well as
     2608        # to the main log file.
     2609        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
     2610        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
     2611        # XXX: there should be a global one of these rather than repeating the
     2612        # code.
     2613        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2614                    '%d %b %y %H:%M:%S'))
     2615        alloc_log.addHandler(h)
     2616       
     2617        # Start a thread to do the resource allocation
     2618        t  = Thread(target=self.allocate_resources,
     2619                args=(allocated, master, eid, expid, expcert, tbparams,
     2620                    tmpdir, alloc_log),
     2621                name=eid)
     2622        t.start()
     2623
     2624        rv = {
     2625                'experimentID': [
     2626                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
     2627                ],
     2628                'experimentStatus': 'starting',
     2629                'experimentAccess': { 'X509' : expcert }
     2630            }
     2631
     2632        return rv
    20982633   
    20992634    def get_experiment_fedid(self, key):
    21002635        """
    2101         find the fedid associated with the localname key in the state database.
    2102         """
    2103 
    2104         rv = None
    2105         self.state_lock.acquire()
    2106         if self.state.has_key(key):
    2107             if isinstance(self.state[key], dict):
    2108                 try:
    2109                     kl = [ f['fedid'] for f in \
    2110                             self.state[key]['experimentID']\
    2111                                 if f.has_key('fedid') ]
    2112                 except KeyError:
    2113                     self.state_lock.release()
    2114                     raise service_error(service_error.internal,
    2115                             "No fedid for experiment %s when getting "+\
    2116                                     "fedid(!?)" % key)
    2117                 if len(kl) == 1:
    2118                     rv = kl[0]
    2119                 else:
    2120                     self.state_lock.release()
    2121                     raise service_error(service_error.internal,
    2122                             "multiple fedids for experiment %s when " +\
    2123                                     "getting fedid(!?)" % key)
    2124             else:
    2125                 self.state_lock.release()
    2126                 raise service_error(service_error.internal,
    2127                         "Unexpected state for %s" % key)
    2128         self.state_lock.release()
    2129         return rv
     2636        find the fedid associated with the localname key in the state database.
     2637        """
     2638
     2639        rv = None
     2640        self.state_lock.acquire()
     2641        if self.state.has_key(key):
     2642            if isinstance(self.state[key], dict):
     2643                try:
     2644                    kl = [ f['fedid'] for f in \
     2645                            self.state[key]['experimentID']\
     2646                                if f.has_key('fedid') ]
     2647                except KeyError:
     2648                    self.state_lock.release()
     2649                    raise service_error(service_error.internal,
     2650                            "No fedid for experiment %s when getting "+\
     2651                                    "fedid(!?)" % key)
     2652                if len(kl) == 1:
     2653                    rv = kl[0]
     2654                else:
     2655                    self.state_lock.release()
     2656                    raise service_error(service_error.internal,
     2657                            "multiple fedids for experiment %s when " +\
     2658                                    "getting fedid(!?)" % key)
     2659            else:
     2660                self.state_lock.release()
     2661                raise service_error(service_error.internal,
     2662                        "Unexpected state for %s" % key)
     2663        self.state_lock.release()
     2664        return rv
    21302665
    21312666    def check_experiment_access(self, fid, key):
     
    21362671        """
    21372672        if not isinstance(key, fedid):
    2138             key = self.get_experiment_fedid(key)
     2673            key = self.get_experiment_fedid(key)
    21392674
    21402675        if self.auth.check_attribute(fid, key):
     
    21442679
    21452680
     2681    def get_handler(self, path, fid):
     2682        print "in get_handler %s %s" % (path, fid)
     2683        return ("/users/faber/test.html", "text/html")
    21462684
    21472685    def get_vtopo(self, req, fid):
     
    21502688        """
    21512689        rv = None
    2152         state = None
     2690        state = None
    21532691
    21542692        req = req.get('VtopoRequestBody', None)
     
    21732711        self.state_lock.acquire()
    21742712        if self.state.has_key(key):
    2175             if self.state[key].has_key('vtopo'):
    2176                 rv = { 'experiment' : {keytype: key },\
    2177                         'vtopo': self.state[key]['vtopo'],\
    2178                     }
    2179             else:
    2180                 state = self.state[key]['experimentStatus']
     2713            if self.state[key].has_key('vtopo'):
     2714                rv = { 'experiment' : {keytype: key },\
     2715                        'vtopo': self.state[key]['vtopo'],\
     2716                    }
     2717            else:
     2718                state = self.state[key]['experimentStatus']
    21812719        self.state_lock.release()
    21822720
    21832721        if rv: return rv
    21842722        else:
    2185             if state:
    2186                 raise service_error(service_error.partial,
    2187                         "Not ready: %s" % state)
    2188             else:
    2189                 raise service_error(service_error.req, "No such experiment")
     2723            if state:
     2724                raise service_error(service_error.partial,
     2725                        "Not ready: %s" % state)
     2726            else:
     2727                raise service_error(service_error.req, "No such experiment")
    21902728
    21912729    def get_vis(self, req, fid):
     
    21942732        """
    21952733        rv = None
    2196         state = None
     2734        state = None
    21972735
    21982736        req = req.get('VisRequestBody', None)
     
    22172755        self.state_lock.acquire()
    22182756        if self.state.has_key(key):
    2219             if self.state[key].has_key('vis'):
    2220                 rv =  { 'experiment' : {keytype: key },\
    2221                         'vis': self.state[key]['vis'],\
    2222                         }
    2223             else:
    2224                 state = self.state[key]['experimentStatus']
     2757            if self.state[key].has_key('vis'):
     2758                rv =  { 'experiment' : {keytype: key },\
     2759                        'vis': self.state[key]['vis'],\
     2760                        }
     2761            else:
     2762                state = self.state[key]['experimentStatus']
    22252763        self.state_lock.release()
    22262764
    22272765        if rv: return rv
    22282766        else:
    2229             if state:
    2230                 raise service_error(service_error.partial,
    2231                         "Not ready: %s" % state)
    2232             else:
    2233                 raise service_error(service_error.req, "No such experiment")
     2767            if state:
     2768                raise service_error(service_error.partial,
     2769                        "Not ready: %s" % state)
     2770            else:
     2771                raise service_error(service_error.req, "No such experiment")
    22342772
    22352773    def clean_info_response(self, rv):
    2236         """
    2237         Remove the information in the experiment's state object that is not in
    2238         the info response.
    2239         """
    2240         # Remove the owner info (should always be there, but...)
    2241         if rv.has_key('owner'): del rv['owner']
    2242 
    2243         # Convert the log into the allocationLog parameter and remove the
    2244         # log entry (with defensive programming)
    2245         if rv.has_key('log'):
    2246             rv['allocationLog'] = "".join(rv['log'])
    2247             del rv['log']
    2248         else:
    2249             rv['allocationLog'] = ""
    2250 
    2251         if rv['experimentStatus'] != 'active':
    2252             if rv.has_key('federant'): del rv['federant']
    2253         else:
    2254             # remove the allocationID info from each federant
    2255             for f in rv.get('federant', []):
    2256                 if f.has_key('allocID'): del f['allocID']
    2257         return rv
     2774        """
     2775        Remove the information in the experiment's state object that is not in
     2776        the info response.
     2777        """
     2778        # Remove the owner info (should always be there, but...)
     2779        if rv.has_key('owner'): del rv['owner']
     2780
     2781        # Convert the log into the allocationLog parameter and remove the
     2782        # log entry (with defensive programming)
     2783        if rv.has_key('log'):
     2784            rv['allocationLog'] = "".join(rv['log'])
     2785            del rv['log']
     2786        else:
     2787            rv['allocationLog'] = ""
     2788
     2789        if rv['experimentStatus'] != 'active':
     2790            if rv.has_key('federant'): del rv['federant']
     2791        else:
     2792            # remove the allocationID info from each federant
     2793            for f in rv.get('federant', []):
     2794                if f.has_key('allocID'): del f['allocID']
     2795        return rv
    22582796
    22592797    def get_info(self, req, fid):
     
    22902828        self.state_lock.release()
    22912829
    2292         if rv:
    2293             return self.clean_info_response(rv)
     2830        if rv:
     2831            return self.clean_info_response(rv)
    22942832        else:
    2295             raise service_error(service_error.req, "No such experiment")
     2833            raise service_error(service_error.req, "No such experiment")
    22962834
    22972835    def get_multi_info(self, req, fid):
     
    22992837        Return all the stored info that this fedid can access
    23002838        """
    2301         rv = { 'info': [ ] }
    2302 
    2303         self.state_lock.acquire()
    2304         for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
    2305             self.check_experiment_access(fid, key)
    2306 
    2307             if self.state.has_key(key):
    2308                 e = copy.deepcopy(self.state[key])
    2309                 e = self.clean_info_response(e)
    2310                 rv['info'].append(e)
     2839        rv = { 'info': [ ] }
     2840
     2841        self.state_lock.acquire()
     2842        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
     2843            self.check_experiment_access(fid, key)
     2844
     2845            if self.state.has_key(key):
     2846                e = copy.deepcopy(self.state[key])
     2847                e = self.clean_info_response(e)
     2848                rv['info'].append(e)
    23112849        self.state_lock.release()
    2312         return rv
     2850        return rv
    23132851
    23142852
     
    23232861            raise service_error(service_error.req,
    23242862                    "Bad request format (no TerminateRequestBody)")
    2325         force = req.get('force', False)
     2863        force = req.get('force', False)
    23262864        exp = req.get('experiment', None)
    23272865        if exp:
     
    23392877        self.check_experiment_access(fid, key)
    23402878
    2341         dealloc_list = [ ]
    2342 
    2343 
    2344         # Create a logger that logs to the dealloc_list as well as to the main
    2345         # log file.
    2346         dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
    2347         h = logging.StreamHandler(self.list_log(dealloc_list))
    2348         # XXX: there should be a global one of these rather than repeating the
    2349         # code.
    2350         h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
    2351                     '%d %b %y %H:%M:%S'))
    2352         dealloc_log.addHandler(h)
     2879        dealloc_list = [ ]
     2880
     2881
     2882        # Create a logger that logs to the dealloc_list as well as to the main
     2883        # log file.
     2884        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
     2885        h = logging.StreamHandler(self.list_log(dealloc_list))
     2886        # XXX: there should be a global one of these rather than repeating the
     2887        # code.
     2888        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
     2889                    '%d %b %y %H:%M:%S'))
     2890        dealloc_log.addHandler(h)
    23532891
    23542892        self.state_lock.acquire()
     
    23612899            # remove the experiment state when the termination is complete.
    23622900
    2363             # First make sure that the experiment creation is complete.
    2364             status = fed_exp.get('experimentStatus', None)
    2365 
    2366             if status:
    2367                 if status in ('starting', 'terminating'):
    2368                     if not force:
    2369                         self.state_lock.release()
    2370                         raise service_error(service_error.partial,
    2371                                 'Experiment still being created or destroyed')
    2372                     else:
    2373                         self.log.warning('Experiment in %s state ' % status + \
    2374                                 'being terminated by force.')
    2375             else:
    2376                 # No status??? trouble
    2377                 self.state_lock.release()
    2378                 raise service_error(service_error.internal,
    2379                         "Experiment has no status!?")
     2901            # First make sure that the experiment creation is complete.
     2902            status = fed_exp.get('experimentStatus', None)
     2903
     2904            if status:
     2905                if status in ('starting', 'terminating'):
     2906                    if not force:
     2907                        self.state_lock.release()
     2908                        raise service_error(service_error.partial,
     2909                                'Experiment still being created or destroyed')
     2910                    else:
     2911                        self.log.warning('Experiment in %s state ' % status + \
     2912                                'being terminated by force.')
     2913            else:
     2914                # No status??? trouble
     2915                self.state_lock.release()
     2916                raise service_error(service_error.internal,
     2917                        "Experiment has no status!?")
    23802918
    23812919            ids = []
     
    24162954                        'aid': aid,\
    24172955                    }
    2418             fed_exp['experimentStatus'] = 'terminating'
     2956            fed_exp['experimentStatus'] = 'terminating'
    24192957            if self.state_filename: self.write_state()
    24202958            self.state_lock.release()
    24212959
    2422             # Stop everyone.  NB, wait_for_all waits until a thread starts and
    2423             # then completes, so we can't wait if nothing starts.  So, no
    2424             # tbparams, no start.
    2425             if len(tbparams) > 0:
    2426                 thread_pool = self.thread_pool(self.nthreads)
    2427                 for tb in tbparams.keys():
    2428                     # Create and start a thread to stop the segment
    2429                     thread_pool.wait_for_slot()
    2430                     t  = self.pooled_thread(\
    2431                             target=self.stop_segment(log=dealloc_log,
    2432                                 keyfile=self.ssh_privkey_file, debug=self.debug),
    2433                             args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
    2434                             pdata=thread_pool, trace_file=self.trace_file)
    2435                     t.start()
    2436                 # Wait for completions
    2437                 thread_pool.wait_for_all_done()
    2438 
    2439             # release the allocations (failed experiments have done this
    2440             # already, and starting experiments may be in odd states, so we
    2441             # ignore errors releasing those allocations
    2442             try:
    2443                 for tb in tbparams.keys():
    2444                     self.release_access(tb, tbparams[tb]['aid'])
    2445             except service_error, e:
    2446                 if status != 'failed' and not force:
    2447                     raise e
     2960            # Stop everyone.  NB, wait_for_all waits until a thread starts and
     2961            # then completes, so we can't wait if nothing starts.  So, no
     2962            # tbparams, no start.
     2963            if len(tbparams) > 0:
     2964                thread_pool = self.thread_pool(self.nthreads)
     2965                for tb in tbparams.keys():
     2966                    # Create and start a thread to stop the segment
     2967                    thread_pool.wait_for_slot()
     2968                    t  = self.pooled_thread(\
     2969                            target=self.stop_segment(log=dealloc_log,
     2970                                keyfile=self.ssh_privkey_file, debug=self.debug),
     2971                            args=(tb, tbparams[tb]['eid'], tbparams), name=tb,
     2972                            pdata=thread_pool, trace_file=self.trace_file)
     2973                    t.start()
     2974                # Wait for completions
     2975                thread_pool.wait_for_all_done()
     2976
     2977            # release the allocations (failed experiments have done this
     2978            # already, and starting experiments may be in odd states, so we
     2979            # ignore errors releasing those allocations
     2980            try:
     2981                for tb in tbparams.keys():
     2982                    self.release_access(tb, tbparams[tb]['aid'])
     2983            except service_error, e:
     2984                if status != 'failed' and not force:
     2985                    raise e
    24482986
    24492987            # Remove the terminated experiment
     
    24552993            self.state_lock.release()
    24562994
    2457             return {
    2458                     'experiment': exp ,
    2459                     'deallocationLog': "".join(dealloc_list),
    2460                     }
     2995            return {
     2996                    'experiment': exp ,
     2997                    'deallocationLog': "".join(dealloc_list),
     2998                    }
    24612999        else:
    24623000            # Don't forget to release the lock
Note: See TracChangeset for help on using the changeset viewer.