Changeset 3441fe3


Ignore:
Timestamp:
Aug 27, 2008 3:25:01 PM (16 years ago)
Author:
Ted Faber <faber@…>
Branches:
axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
Children:
0d830de
Parents:
1af38d6
Message:

closer to fedd integration

Location:
fedd
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • fedd/fedd_create_experiment.py

    r1af38d6 r3441fe3  
    1616import tempfile
    1717import copy
     18
     19import traceback
    1820
    1921from threading import *
     
    3537            cert_file=None,
    3638            cert_pwd=None,
     39            exp_stem="faber-splitter",
    3740            debug=False,
    3841            muxmax=2,
    3942            nthreads=2,
    40             project_user = "faber",
     43            randomize_experiments=False,
    4144            scp_exec="/usr/bin/scp",
    4245            scripts_dir="./",
     
    5053            tclsh="/usr/local/bin/otclsh",
    5154            tcl_splitter="/usr/testbed/lib/ns2ir/parse.tcl",
     55            trace_file=None,
    5256            trusted_certs=None,
    5357            ):
     
    5862        self.cert_file = cert_file
    5963        self.cert_pwd = cert_pwd
     64        self.exp_stem = exp_stem
    6065        self.debug = debug
    6166        self.muxmax = muxmax
    6267        self.nthreads = nthreads
    63         self.project_user = project_user
     68        self.randomize_experiments = randomize_experiments
    6469        self.scp_exec = scp_exec
    6570        self.scripts_dir = scripts_dir
     
    7277        self.tcl_splitter = tcl_splitter
    7378        self.tbmap = tbmap
     79        self.trace_file = trace_file
    7480        self.trusted_certs=trusted_certs
    7581
     
    137143    class pooled_thread(Thread):
    138144        def __init__(self, group=None, target=None, name=None, args=(),
    139                 kwargs={}, pdata=None):
     145                kwargs={}, pdata=None, trace_file=None):
    140146            Thread.__init__(self, group, target, name, args, kwargs)
    141147            self.rv = None
     
    145151            self.kwargs = kwargs
    146152            self.pdata = pdata
     153            self.trace_file = trace_file
    147154       
    148155        def run(self):
     
    153160                try:
    154161                    self.rv = self.target(*self.args, **self.kwargs)
    155                 except e:
    156                     self.exception = e
    157 
    158             print "%s done: %s" % (self.getName(), self.rv)
     162                except service_error, s:
     163                    self.exception = s
     164                    if self.trace_file:
     165                        print >>self.trace_file, "Thread exception: %s %s" % \
     166                                (s.code_string(), s.desc)
     167                   
     168                except:
     169                    self.exception = sys.exc_info()[1]
     170                    if self.trace_file:
     171                        print >>self.trace_file, \
     172                                "Unexpected thread exception: %s" % \
     173                                self.exception
     174                        print >>self.trace_file, "Trace: %s" % \
     175                                traceback.format_exc()
    159176            if self.pdata:
    160177                self.pdata.terminate()
     
    180197
    181198        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
     199
     200        trace = self.trace_file
     201        if not trace:
     202            try:
     203                trace = open("/dev/null", "w")
     204            except IOError:
     205                raise service_error(service_error.internal,
     206                        "Cannot open /dev/null??");
     207
    182208        if not self.debug:
    183             rv = call(scp_cmd)
     209            rv = call(scp_cmd, stdout=trace, stderr=trace)
    184210        else:
    185             print "debug: %s" % " ".join(scp_cmd)
     211            if self.trace_file:
     212                print >>self.trace_file, "debug [scp_file]: %s" % \
     213                        " ".join(scp_cmd)
    186214            rv = 0
    187215
    188216        return rv == 0
    189217
    190     def ssh_cmd(self, user, host, cmd, wname=None, timeout=0):
     218    def ssh_cmd(self, user, host, cmd, wname=None):
    191219        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
    192220
     221        trace = self.trace_file
     222        if not trace:
     223            try:
     224                trace = open("/dev/null", "w")
     225            except IOError:
     226                raise service_error(service_error.internal,
     227                        "Cannot open /dev/null??");
     228
    193229        if not self.debug:
    194             # This should be done more carefully
    195             sub = Popen(sh_str, shell=True)
     230            sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace)
    196231            return sub.wait() == 0
    197232        else:
    198             print "debug: %s" % sh_str
     233            if self.trace_file:
     234                print >>self.trace_file,"debug [ssh_cmd]: %s" % sh_str
    199235            return True
    200236
     
    238274        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
    239275        state_re = re.compile("State:\s+(\w+)")
    240         state = "none"
    241 
    242         status = Popen([self.ssh_exec, "%s@%s" % (user, host),
    243                 expinfo_exec, pid, eid], stdout=PIPE)
     276        no_exp_re = re.compile("^No\s+such\s+experiment")
     277        state = None
     278        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
     279
     280
     281        if self.trace_file:
     282            print >>self.trace_file, "status request: %s" % " ".join(cmd)
     283       
     284        if not self.trace_file:
     285            try:
     286                st_file = open("/dev/null", "w")
     287            except IOError:
     288                raise service_error(service_error.internal,
     289                        "Cannot open /dev/null!?")
     290        else:
     291            st_file = self.trace_file
     292
     293        status = Popen(cmd, stdout=PIPE, stderr=st_file)
    244294        for line in status.stdout:
    245295            m = state_re.match(line)
    246296            if m: state = m.group(1)
     297            else:
     298                m = no_exp_re.match(line)
     299                if m: state = "none"
    247300        rv = status.wait()
    248         if rv != 0:
     301        # No experiment returns a non-zero rv.  If we successfully parsed a
     302        # "none" outcome, ignore teh return code.
     303        if rv != 0 and state != "none":
    249304            raise service_error(service_error.internal,
    250305                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
    251         # XXX
    252         print "%s: %s" % (tb, state)
    253         print "transferring experiment to %s" % tb
     306        if self.trace_file:
     307            print >>self.trace_file, "%s: %s" % (tb, state)
     308            print >>self.trace_file, "transferring experiment to %s" % tb
    254309
    255310        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
     
    283338                        "%s/rpms" % tmpdir, tarfiles_dir):
    284339                    return False
    285             print "Modifying %s on %s" % (eid, tb)
     340            if self.trace_file:
     341                print >>self.trace_file, "Modifying %s on %s" % (eid, tb)
    286342            if not self.ssh_cmd(user, host,
    287343                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
     
    308364                        "%s/rpms" % tmpdir, tarfiles_dir):
    309365                    return False
    310             print "Modifying %s on %s" % (eid, tb)
     366            if self.trace_file:
     367                print >>self.trace_file, "Modifying %s on %s" % (eid, tb)
    311368            if not self.ssh_cmd(user, host,
    312369                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
    313370                    "modexp"):
    314371                return False
    315             print "Swapping %s in on %s" % (eid, tb)
     372            if self.trace_file:
     373                print >>self.trace_file, "Swapping %s in on %s" % (eid, tb)
    316374            if not self.ssh_cmd(user, host,
    317375                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    318                     "swapexp", timeout):
     376                    "swapexp"):
    319377                return False
    320378            return True
     
    333391                        "%s/rpms" % tmpdir, tarfiles_dir):
    334392                    return False
    335             print "Creating %s on %s" % (eid, tb)
     393            if self.trace_file:
     394                print >>self.trace_file, "Creating %s on %s" % (eid, tb)
    336395            if not self.ssh_cmd(user, host,
    337396                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
    338                             (pid, eid, tclfile), "startexp", timeout):
     397                            (pid, eid, tclfile), "startexp"):
    339398                return False
    340399            # After startexp the per-experiment directories exist
     
    348407                    proj_dir):
    349408                return False
    350             print "Swapping %s in on %s" % (eid, tb)
     409            if self.trace_file:
     410                print >>self.trace_file, "Swapping %s in on %s" % (eid, tb)
    351411            if not self.ssh_cmd(user, host,
    352412                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
    353                     "swapexp", timeout):
     413                    "swapexp"):
    354414                return False
    355415            return True
    356416        else:
    357             # XXX
    358             print "unknown state %s" % state
     417            if self.trace_file:
     418                print >>self.trace_file, "unknown state %s" % state
    359419            return False
    360420
     
    364424        pid = tbparams[tb]['project']
    365425
    366         # XXX:
    367         print "Stopping %s on %s" % (eid, tb)
     426        if self.trace_file:
     427            print >>self.trace_file, "Stopping %s on %s" % (eid, tb)
    368428        return self.ssh_cmd(user, host,
    369                 "/usr/testbed/bin/swapexp -w %d %d out" % (pid, eid))
     429                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
    370430
    371431       
     
    379439        t = type.lower();
    380440        if t not in valid_types: raise ValueError
     441
     442        trace = self.trace_file
     443        if not trace:
     444            try:
     445                trace = open("/dev/null", "w")
     446            except IOError:
     447                raise service_error(service_error.internal,
     448                        "Cannot open /dev/null??");
     449
    381450        # May raise CalledProcessError
    382         rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest])
     451        rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest],
     452                stdout=trace, stderr=trace)
    383453        if rv != 0:
    384454            raise service_error(service_error.internal,
     
    535605            'connectortype': 'gwtype',
    536606            'tunnelcfg': 'tun',
     607            'smbshare': 'smbshare',
    537608        }
    538609
     
    605676                "fs": e['fileServer'],
    606677                "eventserver": e['eventServer'],
    607                 "project": unpack_id(p['name'])
     678                "project": unpack_id(p['name']),
     679                "emulab" : e
    608680                }
    609681
     
    730802
    731803    class gateways:
    732         def __init__(self, eid, smbshare, master, tmpdir, gw_pubkey,
     804        def __init__(self, eid, master, tmpdir, gw_pubkey,
    733805                gw_secretkey, copy_file):
    734806            self.begin_gateways = \
     
    740812
    741813            self.eid = eid
    742             self.smbshare = smbshare
    743814            self.master = master
    744815            self.tmpdir = tmpdir
     
    850921                            print >>cc, "ControlGateway: %s" % \
    851922                                    self.control_gateway
    852                             print >>cc, "SMBSHare: %s" % self.smbshare
     923                            if tbparams[self.master].has_key('smbshare'):
     924                                print >>cc, "SMBSHare: %s" % \
     925                                        tbparams[self.master]['smbshare']
    853926                            print >>cc, "ProjectUser: %s" % \
    854                                     tbparams[self.current_gateways]['user']
     927                                    tbparams[self.master]['user']
    855928                            print >>cc, "ProjectName: %s" % \
    856929                                    tbparams[self.master]['project']
     
    860933                                    "Error creating client config")
    861934                    else:
    862                         # XXX
    863                         print >>sys.stderr, "No control gateway for %s" %\
    864                                 self.current_gateways
     935                        if self.trace_file:
     936                            print >>sys.stderr, "No control gateway for %s" %\
     937                                    self.current_gateways
    865938                    self.current_gateways = None
    866939                else:
     
    9771050        pid = "dummy"
    9781051        gid = "dummy"
    979         # XXX
    980         eid = "faber-splitter"
    981         # XXX
    982         master = "deter"
    983         # XXX
    984         smbshare="USERS"
     1052        eid = self.exp_stem
     1053        if self.randomize_experiments:
     1054            for i in range(0,5):
     1055                eid += random.choice(string.ascii_letters)
    9851056        # XXX
    9861057        fail_soft = False
    987         # XXX
    988         startem = True
    989 
    9901058
    9911059        try:
     
    10171085        if user == None:
    10181086            raise service_error(service_error.req, "No user")
     1087
     1088        master = req.get('master', None)
     1089        if master == None:
     1090            raise service_error(service_error.req, "No master testbed label")
    10191091       
    10201092       
     
    10281100        parse_current_testbed = self.current_testbed(eid, tmpdir)
    10291101        parse_allbeds = self.allbeds(self.get_access)
    1030         parse_gateways = self.gateways(eid, smbshare, master, tmpdir,
     1102        parse_gateways = self.gateways(eid, master, tmpdir,
    10311103                gw_pubkey_base, gw_secretkey_base, self.copy_file)
    10321104        parse_vtopo = self.shunt_to_file("^#\s+Begin\s+Vtopo",
     
    10601132
    10611133        self.genviz(tmpdir + "/vtopo.xml", tmpdir + "/viz.xml")
    1062         if not startem: return True
    10631134
    10641135        # Copy tarfiles and rpms needed at remote sites into a staging area
     
    10931164            t  = self.pooled_thread(target=self.start_segment,
    10941165                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
    1095                     pdata=thread_pool_info)
     1166                    pdata=thread_pool_info, trace_file=self.trace_file)
    10961167            threads.append(t)
    10971168            t.start()
     
    11131184
    11141185        # If one failed clean up
    1115         if not fail_soft and len(failed) > 0:
    1116             for tb in failed:
    1117                 self.stop_segment(tb, eid, tbparams)
     1186        if len(failed) > 0:
     1187            succeeded = [tb for tb in allocated.keys() if tb not in failed]
     1188            if fail_soft:
     1189                raise service_error(service_error.partial, \
     1190                        "Partial swap in on %s" % ",".join(succeeded))
     1191            else:
     1192                for tb in succeeded:
     1193                    self.stop_segment(tb, eid, tbparams)
     1194                raise service_error(service_error.federant,
     1195                    "Swap in failed on %s" % ",".join(failed))
    11181196        else:
    1119             print "Experiment started"
    1120 
    1121         # XXX: return value
     1197            if self.trace_file:
     1198                print >>self.trace_file, "Experiment started"
     1199
     1200        return { 'emulab' : [ tbparams[tb]['emulab'] \
     1201                for tb in tbparams.keys() \
     1202                    if tbparams[tb].has_key('emulab') ] }
    11221203
    11231204if __name__ == '__main__':
     
    11281209            action='store_true', help='print actions rather than take them')
    11291210    parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse')
     1211    parser.add_option('-m', '--master', dest='master',
     1212            help='testbed label for matster testbd')
     1213    parser.add_option('-t', '--trace', dest='trace', default=None,
     1214            help='file to print intermediate messages to')
     1215    parser.add_option('-T', '--trace-stderr', dest='trace',
     1216            action='store_const',const=sys.stderr,
     1217            help='file to print intermediate messages to')
    11301218    opts, args  = parser.parse_args()
     1219
     1220    trace_file = None
     1221    if opts.trace:
     1222        try:
     1223            trace_file = open(opts.trace, 'w')
     1224        except IOError:
     1225            print >>sys.stderr, "Can't open trace file"
     1226
     1227    if opts.debug:
     1228        if not trace_file:
     1229            trace_file = sys.stderr
    11311230
    11321231    if opts.tcl != None:
     
    11391238    else:
    11401239        sys.exit("Must specify a file name")
     1240
     1241    if not opts.master:
     1242        sys.exit("Must supply master tb label (--master)");
    11411243
    11421244    obj = fedd_create_experiment_local(
     
    11511253                'ucb':'https://users.isi.deterlab.net:23237',
    11521254                },
     1255            trace_file=trace_file
    11531256        )
    1154     obj.create_experiment( {\
     1257    rv = obj.create_experiment( {\
    11551258            'experimentdescription' : content,
     1259            'master' : opts.master,
    11561260            'user': [ {'userID' : { 'username' : 'faber' } } ],
    11571261            },
    11581262            None)
     1263
     1264    print rv
  • fedd/fedd_types.xsd

    r1af38d6 r3441fe3  
    235235        maxOccurs="unbounded"/>
    236236      <xsd:element name="experimentdescription" type="xsd:base64Binary"/>
     237      <xsd:element name="master" type="xsd:string"/>
    237238    </xsd:sequence>
    238239  </xsd:complexType>
     
    245246    </xsd:annotation>
    246247    <xsd:sequence>
    247       <xsd:element name="project" type="projectType" minOccurs="1"
     248      <xsd:element name="emulab" type="emulabType" minOccurs="1"
    248249        maxOccurs="unbounded"/>
    249250    </xsd:sequence>
     
    257258          <xsd:enumeration value="2"/>  <!-- proxy error -->
    258259          <xsd:enumeration value="3"/>  <!-- badly formed request -->
    259           <xsd:enumeration value="4"/>  <!-- internal error -->
     260          <xsd:enumeration value="4"/>  <!-- server configuration error -->
     261          <xsd:enumeration value="5"/>  <!-- internal error -->
     262          <xsd:enumeration value="6"/>  <!-- partial instantiation -->
     263          <xsd:enumeration value="7"/>  <!-- federant error -->
    260264        </xsd:restriction>
    261265      </xsd:element>
  • fedd/service_error.py

    r1af38d6 r3441fe3  
    1010    server_config = 4
    1111    internal = 5
     12    partial = 6
     13    federant = 7
    1214    code_str = {
    1315        access : "Access Denied",
     
    1517        req : "Badly Formed Request",
    1618        server_config: "Server Configuration Error",
    17         internal : "Internal Error"
     19        internal : "Internal Error",
     20        partial: "Partial embedding",
     21        federant: "Federant error"
    1822    }
    1923    str_code = dict([ (v, k) for k, v in code_str.iteritems() ])
Note: See TracChangeset for help on using the changeset viewer.