Ignore:
Timestamp:
Aug 28, 2009 6:07:42 PM (15 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-2.00, version-3.01, version-3.02
Children:
6c57fe9
Parents:
4c8a0b7
Message:

checkpoint

File:
1 edited

Legend:

Unmodified
Added
Removed
  • fedd/federation/access.py

    r4c8a0b7 rcc8d8e9  
    105105            'RequestAccess': soap_handler("RequestAccess", self.RequestAccess),
    106106            'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess),
     107            'StartSegment': soap_handler("StartSegment", self.StartSegment),
    107108            }
    108109        self.xmlrpc_services =  {\
     
    111112            'ReleaseAccess': xmlrpc_handler('ReleaseAccess',
    112113                self.ReleaseAccess),
     114            'StartSegment': xmlrpc_handler('StartSegment',
     115                self.StartSegment),
    113116            }
    114117
     
    772775
    773776
     777
     778    class emulab_segment:
     779        class ssh_cmd_timeout(RuntimeError): pass
     780
     781        def __init__(self, log=None, keyfile=None, debug=False):
     782            self.log = log or logging.getLogger(\
     783                    'fedd.experiment_control.emulab_segment')
     784            self.ssh_privkey_file = keyfile
     785            self.debug = debug
     786            self.ssh_exec="/usr/bin/ssh"
     787            self.scp_exec = "/usr/bin/scp"
     788            self.ssh_cmd_timeout = emulab_segment.ssh_cmd_timeout
     789
     790        def scp_file(self, file, user, host, dest=""):
     791            """
     792            scp a file to the remote host.  If debug is set the action is only
     793            logged.
     794            """
     795
     796            scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes',
     797                    '-o', 'StrictHostKeyChecking yes', '-i',
     798                    self.ssh_privkey_file, file,
     799                    "%s@%s:%s" % (user, host, dest)]
     800            rv = 0
     801
     802            try:
     803                dnull = open("/dev/null", "w")
     804            except IOError:
     805                self.log.debug("[ssh_file]: failed to open " + \
     806                        "/dev/null for redirect")
     807                dnull = Null
     808
     809            self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
     810            if not self.debug:
     811                rv = call(scp_cmd, stdout=dnull, stderr=dnull, close_fds=True,
     812                        close_fds=True)
     813
     814            return rv == 0
     815
     816        def ssh_cmd(self, user, host, cmd, wname=None, timeout=None):
     817            """
     818            Run a remote command on host as user.  If debug is set, the action
     819            is only logged.  Commands are run without stdin, to avoid stray
     820            SIGTTINs.
     821            """
     822            sh_str = ("%s -n -o 'IdentitiesOnly yes' -o " + \
     823                    "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \
     824                    (self.ssh_exec, self.ssh_privkey_file,
     825                            user, host, cmd)
     826
     827            try:
     828                dnull = open("/dev/null", "w")
     829            except IOError:
     830                self.log.debug("[ssh_cmd]: failed to open /dev/null " + \
     831                        "for redirect")
     832                dnull = Null
     833
     834            self.log.debug("[ssh_cmd]: %s" % sh_str)
     835            if not self.debug:
     836                if dnull:
     837                    sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull,
     838                            close_fds=True)
     839                else:
     840                    sub = Popen(sh_str, shell=True,
     841                            close_fds=True)
     842                if timeout:
     843                    i = 0
     844                    rv = sub.poll()
     845                    while i < timeout:
     846                        if rv is not None: break
     847                        else:
     848                            time.sleep(1)
     849                            rv = sub.poll()
     850                            i += 1
     851                    else:
     852                        self.log.debug("Process exceeded runtime: %s" % sh_str)
     853                        os.kill(sub.pid, signal.SIGKILL)
     854                        raise self.ssh_cmd_timeout();
     855                    return rv == 0
     856                else:
     857                    return sub.wait() == 0
     858            else:
     859                if timeout == 0:
     860                    self.log.debug("debug timeout raised on %s " % sh_str)
     861                    raise self.ssh_cmd_timeout()
     862                else:
     863                    return True
     864
     865    class start_segment(emulab_segment):
     866        def __init__(self, log=None, keyfile=None, debug=False):
     867            experiment_control_local.emulab_segment.__init__(self,
     868                    log=log, keyfile=keyfile, debug=debug)
     869
     870        def create_config_tree(self, src_dir, dest_dir, script):
     871            """
     872            Append commands to script that will create the directory hierarchy
     873            on the remote federant.
     874            """
     875
     876            if os.path.isdir(src_dir):
     877                print >>script, "mkdir -p %s" % dest_dir
     878                print >>script, "chmod 770 %s" % dest_dir
     879
     880                for f in os.listdir(src_dir):
     881                    if os.path.isdir(f):
     882                        self.create_config_tree("%s/%s" % (src_dir, f),
     883                                "%s/%s" % (dest_dir, f), script)
     884            else:
     885                self.log.debug("[create_config_tree]: Not a directory: %s" \
     886                        % src_dir)
     887
     888        def ship_configs(self, host, user, src_dir, dest_dir):
     889            """
     890            Copy federant-specific configuration files to the federant.
     891            """
     892            for f in os.listdir(src_dir):
     893                if os.path.isdir(f):
     894                    if not self.ship_configs(host, user, "%s/%s" % (src_dir, f),
     895                            "%s/%s" % (dest_dir, f)):
     896                        return False
     897                else:
     898                    if not self.scp_file("%s/%s" % (src_dir, f),
     899                            user, host, dest_dir):
     900                        return False
     901            return True
     902
     903        def get_state(self, user, host, tb, pid, eid):
     904            # command to test experiment state
     905            expinfo_exec = "/usr/testbed/bin/expinfo" 
     906            # Regular expressions to parse the expinfo response
     907            state_re = re.compile("State:\s+(\w+)")
     908            no_exp_re = re.compile("^No\s+such\s+experiment")
     909            swapping_re = re.compile("^No\s+information\s+available.")
     910            state = None    # Experiment state parsed from expinfo
     911            # The expinfo ssh command.  Note the identity restriction to use
     912            # only the identity provided in the pubkey given.
     913            cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o',
     914                    'StrictHostKeyChecking yes', '-i',
     915                    self.ssh_privkey_file, "%s@%s" % (user, host),
     916                    expinfo_exec, pid, eid]
     917
     918            dev_null = None
     919            try:
     920                dev_null = open("/dev/null", "a")
     921            except IOError, e:
     922                self.log.error("[get_state]: can't open /dev/null: %s" %e)
     923
     924            if self.debug:
     925                state = 'swapped'
     926                rv = 0
     927            else:
     928                status = Popen(cmd, stdout=PIPE, stderr=dev_null,
     929                        close_fds=True)
     930                for line in status.stdout:
     931                    m = state_re.match(line)
     932                    if m: state = m.group(1)
     933                    else:
     934                        for reg, st in ((no_exp_re, "none"),
     935                                (swapping_re, "swapping")):
     936                            m = reg.match(line)
     937                            if m: state = st
     938                rv = status.wait()
     939
     940            # If the experiment is not present the subcommand returns a
     941            # non-zero return value.  If we successfully parsed a "none"
     942            # outcome, ignore the return code.
     943            if rv != 0 and state != 'none':
     944                raise service_error(service_error.internal,
     945                        "Cannot get status of segment %s:%s/%s" % \
     946                                (tb, pid, eid))
     947            elif state not in ('active', 'swapped', 'swapping', 'none'):
     948                raise service_error(service_error.internal,
     949                        "Cannot get status of segment %s:%s/%s" % \
     950                                (tb, pid, eid))
     951            else: return state
     952
     953
     954        def __call__(self, tb, eid, tbparams, tmpdir, timeout=0):
     955            """
     956            Start a sub-experiment on a federant.
     957
     958            Get the current state, modify or create as appropriate, ship data
     959            and configs and start the experiment.  There are small ordering
     960            differences based on the initial state of the sub-experiment.
     961            """
     962            # ops node in the federant
     963            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     964            user = tbparams[tb]['user']     # federant user
     965            pid = tbparams[tb]['project']   # federant project
     966            # XXX
     967            base_confs = ( "hosts",)
     968            tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
     969            # Configuration directories on the remote machine
     970            proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
     971            tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
     972            rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
     973
     974            state = self.get_state(user, host, tb, pid, eid)
     975
     976            self.log.debug("[start_segment]: %s: %s" % (tb, state))
     977            self.log.info("[start_segment]:transferring experiment to %s" % tb)
     978
     979            if not self.scp_file("%s/%s/%s" % \
     980                    (tmpdir, tb, tclfile), user, host):
     981                return False
     982           
     983            if state == 'none':
     984                # Create a null copy of the experiment so that we capture any
     985                # logs there if the modify fails.  Emulab software discards the
     986                # logs from a failed startexp
     987                if not self.scp_file("%s/null.tcl" % tmpdir, user, host):
     988                    return False
     989                self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
     990                timedout = False
     991                try:
     992                    if not self.ssh_cmd(user, host,
     993                            ("/usr/testbed/bin/startexp -i -f -w -p %s " +
     994                            "-e %s null.tcl") % (pid, eid), "startexp",
     995                            timeout=60 * 10):
     996                        return False
     997                except self.ssh_cmd_timeout:
     998                    timedout = True
     999
     1000                if timedout:
     1001                    state = self.get_state(user, host, tb, pid, eid)
     1002                    if state != "swapped":
     1003                        return False
     1004
     1005           
     1006            # Open up a temporary file to contain a script for setting up the
     1007            # filespace for the new experiment.
     1008            self.log.info("[start_segment]: creating script file")
     1009            try:
     1010                sf, scriptname = tempfile.mkstemp()
     1011                scriptfile = os.fdopen(sf, 'w')
     1012            except IOError:
     1013                return False
     1014
     1015            scriptbase = os.path.basename(scriptname)
     1016
     1017            # Script the filesystem changes
     1018            print >>scriptfile, "/bin/rm -rf %s" % proj_dir
     1019            # Clear and create the tarfiles and rpm directories
     1020            for d in (tarfiles_dir, rpms_dir):
     1021                print >>scriptfile, "/bin/rm -rf %s/*" % d
     1022                print >>scriptfile, "mkdir -p %s" % d
     1023            print >>scriptfile, 'mkdir -p %s' % proj_dir
     1024            self.create_config_tree("%s/%s" % (tmpdir, tb),
     1025                    proj_dir, scriptfile)
     1026            if os.path.isdir("%s/tarfiles" % tmpdir):
     1027                self.create_config_tree("%s/tarfiles" % tmpdir, tarfiles_dir,
     1028                        scriptfile)
     1029            if os.path.isdir("%s/rpms" % tmpdir):
     1030                self.create_config_tree("%s/rpms" % tmpdir, rpms_dir,
     1031                        scriptfile)
     1032            print >>scriptfile, "rm -f %s" % scriptbase
     1033            scriptfile.close()
     1034
     1035            # Move the script to the remote machine
     1036            # XXX: could collide tempfile names on the remote host
     1037            if self.scp_file(scriptname, user, host, scriptbase):
     1038                os.remove(scriptname)
     1039            else:
     1040                return False
     1041
     1042            # Execute the script (and the script's last line deletes it)
     1043            if not self.ssh_cmd(user, host, "sh -x %s" % scriptbase):
     1044                return False
     1045
     1046            for f in base_confs:
     1047                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
     1048                        "%s/%s" % (proj_dir, f)):
     1049                    return False
     1050            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
     1051                    proj_dir):
     1052                return False
     1053            if os.path.isdir("%s/tarfiles" % tmpdir):
     1054                if not self.ship_configs(host, user,
     1055                        "%s/tarfiles" % tmpdir, tarfiles_dir):
     1056                    return False
     1057            if os.path.isdir("%s/rpms" % tmpdir):
     1058                if not self.ship_configs(host, user,
     1059                        "%s/rpms" % tmpdir, tarfiles_dir):
     1060                    return False
     1061            # Stage the new configuration (active experiments will stay swapped
     1062            # in now)
     1063            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
     1064            try:
     1065                if not self.ssh_cmd(user, host,
     1066                        "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
     1067                                (pid, eid, tclfile),
     1068                        "modexp", timeout= 60 * 10):
     1069                    return False
     1070            except self.ssh_cmd_timeout:
     1071                self.log.error("Modify command failed to complete in time")
     1072                # There's really no way to see if this succeeded or failed, so
     1073                # if it hangs, assume the worst.
     1074                return False
     1075            # Active experiments are still swapped, this swaps the others in.
     1076            if state != 'active':
     1077                self.log.info("[start_segment]: Swapping %s in on %s" % \
     1078                        (eid, tb))
     1079                timedout = False
     1080                try:
     1081                    if not self.ssh_cmd(user, host,
     1082                            "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
     1083                            "swapexp", timeout=10*60):
     1084                        return False
     1085                except self.ssh_cmd_timeout:
     1086                    timedout = True
     1087               
     1088                # If the command was terminated, but completed successfully,
     1089                # report success.
     1090                if timedout:
     1091                    self.log.debug("[start_segment]: swapin timed out " +\
     1092                            "checking state")
     1093                    state = self.get_state(user, host, tb, pid, eid)
     1094                    self.log.debug("[start_segment]: state is %s" % state)
     1095                    return state == 'active'
     1096            # Everything has gone OK.
     1097            return True
     1098
     1099    class stop_segment(emulab_segment):
     1100        def __init__(self, log=None, keyfile=None, debug=False):
     1101            experiment_control_local.emulab_segment.__init__(self,
     1102                    log=log, keyfile=keyfile, debug=debug)
     1103
     1104        def __call__(self, tb, eid, tbparams):
     1105            """
     1106            Stop a sub experiment by calling swapexp on the federant
     1107            """
     1108            user = tbparams[tb]['user']
     1109            host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
     1110            pid = tbparams[tb]['project']
     1111
     1112            self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
     1113            rv = False
     1114            try:
     1115                # Clean out tar files: we've gone over quota in the past
     1116                self.ssh_cmd(user, host, "rm -rf /proj/%s/rpms/%s" % (pid, eid))
     1117                self.ssh_cmd(user, host, "rm -rf /proj/%s/tarfiles/%s" % \
     1118                        (pid, eid))
     1119                rv = self.ssh_cmd(user, host,
     1120                        "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
     1121            except self.ssh_cmd_timeout:
     1122                rv = False
     1123            return rv
     1124
     1125    def StartSegment(self, req, fid):
     1126        try:
     1127            req = req['StartSegmentRequestBody']
     1128        except KeyError:
     1129            raise service_error(server_error.req, "Badly formed request")
     1130        auth_attr = req['allocID']['fedid']
     1131        if self.auth.check_attribute(fid, auth_attr):
     1132            print "OK"
     1133        else:
     1134            print "Fail"
     1135        return { 'allocID': req['allocID'] }
Note: See TracChangeset for help on using the changeset viewer.