Changeset 3441fe3
- Timestamp:
- Aug 27, 2008 3:25:01 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 0d830de
- Parents:
- 1af38d6
- Location:
- fedd
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/fedd_create_experiment.py
r1af38d6 r3441fe3 16 16 import tempfile 17 17 import copy 18 19 import traceback 18 20 19 21 from threading import * … … 35 37 cert_file=None, 36 38 cert_pwd=None, 39 exp_stem="faber-splitter", 37 40 debug=False, 38 41 muxmax=2, 39 42 nthreads=2, 40 project_user = "faber",43 randomize_experiments=False, 41 44 scp_exec="/usr/bin/scp", 42 45 scripts_dir="./", … … 50 53 tclsh="/usr/local/bin/otclsh", 51 54 tcl_splitter="/usr/testbed/lib/ns2ir/parse.tcl", 55 trace_file=None, 52 56 trusted_certs=None, 53 57 ): … … 58 62 self.cert_file = cert_file 59 63 self.cert_pwd = cert_pwd 64 self.exp_stem = exp_stem 60 65 self.debug = debug 61 66 self.muxmax = muxmax 62 67 self.nthreads = nthreads 63 self. project_user = project_user68 self.randomize_experiments = randomize_experiments 64 69 self.scp_exec = scp_exec 65 70 self.scripts_dir = scripts_dir … … 72 77 self.tcl_splitter = tcl_splitter 73 78 self.tbmap = tbmap 79 self.trace_file = trace_file 74 80 self.trusted_certs=trusted_certs 75 81 … … 137 143 class pooled_thread(Thread): 138 144 def __init__(self, group=None, target=None, name=None, args=(), 139 kwargs={}, pdata=None ):145 kwargs={}, pdata=None, trace_file=None): 140 146 Thread.__init__(self, group, target, name, args, kwargs) 141 147 self.rv = None … … 145 151 self.kwargs = kwargs 146 152 self.pdata = pdata 153 self.trace_file = trace_file 147 154 148 155 def run(self): … … 153 160 try: 154 161 self.rv = self.target(*self.args, **self.kwargs) 155 except e: 156 self.exception = e 157 158 print "%s done: %s" % (self.getName(), self.rv) 162 except service_error, s: 163 self.exception = s 164 if self.trace_file: 165 print >>self.trace_file, "Thread exception: %s %s" % \ 166 (s.code_string(), s.desc) 167 168 except: 169 self.exception = sys.exc_info()[1] 170 if self.trace_file: 171 print >>self.trace_file, \ 172 "Unexpected thread exception: %s" % \ 173 self.exception 174 print >>self.trace_file, "Trace: %s" % \ 175 traceback.format_exc() 159 176 if self.pdata: 160 177 self.pdata.terminate() … … 180 197 181 198 scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)] 199 200 trace = self.trace_file 201 if not trace: 202 try: 203 trace = open("/dev/null", "w") 204 except IOError: 205 raise service_error(service_error.internal, 206 "Cannot open /dev/null??"); 207 182 208 if not self.debug: 183 rv = call(scp_cmd )209 rv = call(scp_cmd, stdout=trace, stderr=trace) 184 210 else: 185 print "debug: %s" % " ".join(scp_cmd) 211 if self.trace_file: 212 print >>self.trace_file, "debug [scp_file]: %s" % \ 213 " ".join(scp_cmd) 186 214 rv = 0 187 215 188 216 return rv == 0 189 217 190 def ssh_cmd(self, user, host, cmd, wname=None , timeout=0):218 def ssh_cmd(self, user, host, cmd, wname=None): 191 219 sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd) 192 220 221 trace = self.trace_file 222 if not trace: 223 try: 224 trace = open("/dev/null", "w") 225 except IOError: 226 raise service_error(service_error.internal, 227 "Cannot open /dev/null??"); 228 193 229 if not self.debug: 194 # This should be done more carefully 195 sub = Popen(sh_str, shell=True) 230 sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace) 196 231 return sub.wait() == 0 197 232 else: 198 print "debug: %s" % sh_str 233 if self.trace_file: 234 print >>self.trace_file,"debug [ssh_cmd]: %s" % sh_str 199 235 return True 200 236 … … 238 274 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 239 275 state_re = re.compile("State:\s+(\w+)") 240 state = "none" 241 242 status = Popen([self.ssh_exec, "%s@%s" % (user, host), 243 expinfo_exec, pid, eid], stdout=PIPE) 276 no_exp_re = re.compile("^No\s+such\s+experiment") 277 state = None 278 cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid] 279 280 281 if self.trace_file: 282 print >>self.trace_file, "status request: %s" % " ".join(cmd) 283 284 if not self.trace_file: 285 try: 286 st_file = open("/dev/null", "w") 287 except IOError: 288 raise service_error(service_error.internal, 289 "Cannot open /dev/null!?") 290 else: 291 st_file = self.trace_file 292 293 status = Popen(cmd, stdout=PIPE, stderr=st_file) 244 294 for line in status.stdout: 245 295 m = state_re.match(line) 246 296 if m: state = m.group(1) 297 else: 298 m = no_exp_re.match(line) 299 if m: state = "none" 247 300 rv = status.wait() 248 if rv != 0: 301 # No experiment returns a non-zero rv. If we successfully parsed a 302 # "none" outcome, ignore teh return code. 303 if rv != 0 and state != "none": 249 304 raise service_error(service_error.internal, 250 305 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 251 # XXX252 print"%s: %s" % (tb, state)253 print"transferring experiment to %s" % tb306 if self.trace_file: 307 print >>self.trace_file, "%s: %s" % (tb, state) 308 print >>self.trace_file, "transferring experiment to %s" % tb 254 309 255 310 if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host): … … 283 338 "%s/rpms" % tmpdir, tarfiles_dir): 284 339 return False 285 print "Modifying %s on %s" % (eid, tb) 340 if self.trace_file: 341 print >>self.trace_file, "Modifying %s on %s" % (eid, tb) 286 342 if not self.ssh_cmd(user, host, 287 343 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ … … 308 364 "%s/rpms" % tmpdir, tarfiles_dir): 309 365 return False 310 print "Modifying %s on %s" % (eid, tb) 366 if self.trace_file: 367 print >>self.trace_file, "Modifying %s on %s" % (eid, tb) 311 368 if not self.ssh_cmd(user, host, 312 369 "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile), 313 370 "modexp"): 314 371 return False 315 print "Swapping %s in on %s" % (eid, tb) 372 if self.trace_file: 373 print >>self.trace_file, "Swapping %s in on %s" % (eid, tb) 316 374 if not self.ssh_cmd(user, host, 317 375 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 318 "swapexp" , timeout):376 "swapexp"): 319 377 return False 320 378 return True … … 333 391 "%s/rpms" % tmpdir, tarfiles_dir): 334 392 return False 335 print "Creating %s on %s" % (eid, tb) 393 if self.trace_file: 394 print >>self.trace_file, "Creating %s on %s" % (eid, tb) 336 395 if not self.ssh_cmd(user, host, 337 396 "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \ 338 (pid, eid, tclfile), "startexp" , timeout):397 (pid, eid, tclfile), "startexp"): 339 398 return False 340 399 # After startexp the per-experiment directories exist … … 348 407 proj_dir): 349 408 return False 350 print "Swapping %s in on %s" % (eid, tb) 409 if self.trace_file: 410 print >>self.trace_file, "Swapping %s in on %s" % (eid, tb) 351 411 if not self.ssh_cmd(user, host, 352 412 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 353 "swapexp" , timeout):413 "swapexp"): 354 414 return False 355 415 return True 356 416 else: 357 # XXX358 print"unknown state %s" % state417 if self.trace_file: 418 print >>self.trace_file, "unknown state %s" % state 359 419 return False 360 420 … … 364 424 pid = tbparams[tb]['project'] 365 425 366 # XXX:367 print"Stopping %s on %s" % (eid, tb)426 if self.trace_file: 427 print >>self.trace_file, "Stopping %s on %s" % (eid, tb) 368 428 return self.ssh_cmd(user, host, 369 "/usr/testbed/bin/swapexp -w % d %dout" % (pid, eid))429 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) 370 430 371 431 … … 379 439 t = type.lower(); 380 440 if t not in valid_types: raise ValueError 441 442 trace = self.trace_file 443 if not trace: 444 try: 445 trace = open("/dev/null", "w") 446 except IOError: 447 raise service_error(service_error.internal, 448 "Cannot open /dev/null??"); 449 381 450 # May raise CalledProcessError 382 rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest]) 451 rv = call([self.ssh_keygen, '-t', t, '-N', '', '-f', dest], 452 stdout=trace, stderr=trace) 383 453 if rv != 0: 384 454 raise service_error(service_error.internal, … … 535 605 'connectortype': 'gwtype', 536 606 'tunnelcfg': 'tun', 607 'smbshare': 'smbshare', 537 608 } 538 609 … … 605 676 "fs": e['fileServer'], 606 677 "eventserver": e['eventServer'], 607 "project": unpack_id(p['name']) 678 "project": unpack_id(p['name']), 679 "emulab" : e 608 680 } 609 681 … … 730 802 731 803 class gateways: 732 def __init__(self, eid, smbshare,master, tmpdir, gw_pubkey,804 def __init__(self, eid, master, tmpdir, gw_pubkey, 733 805 gw_secretkey, copy_file): 734 806 self.begin_gateways = \ … … 740 812 741 813 self.eid = eid 742 self.smbshare = smbshare743 814 self.master = master 744 815 self.tmpdir = tmpdir … … 850 921 print >>cc, "ControlGateway: %s" % \ 851 922 self.control_gateway 852 print >>cc, "SMBSHare: %s" % self.smbshare 923 if tbparams[self.master].has_key('smbshare'): 924 print >>cc, "SMBSHare: %s" % \ 925 tbparams[self.master]['smbshare'] 853 926 print >>cc, "ProjectUser: %s" % \ 854 tbparams[self. current_gateways]['user']927 tbparams[self.master]['user'] 855 928 print >>cc, "ProjectName: %s" % \ 856 929 tbparams[self.master]['project'] … … 860 933 "Error creating client config") 861 934 else: 862 # XXX863 print >>sys.stderr, "No control gateway for %s" %\864 self.current_gateways935 if self.trace_file: 936 print >>sys.stderr, "No control gateway for %s" %\ 937 self.current_gateways 865 938 self.current_gateways = None 866 939 else: … … 977 1050 pid = "dummy" 978 1051 gid = "dummy" 979 # XXX 980 eid = "faber-splitter" 981 # XXX 982 master = "deter" 983 # XXX 984 smbshare="USERS" 1052 eid = self.exp_stem 1053 if self.randomize_experiments: 1054 for i in range(0,5): 1055 eid += random.choice(string.ascii_letters) 985 1056 # XXX 986 1057 fail_soft = False 987 # XXX988 startem = True989 990 1058 991 1059 try: … … 1017 1085 if user == None: 1018 1086 raise service_error(service_error.req, "No user") 1087 1088 master = req.get('master', None) 1089 if master == None: 1090 raise service_error(service_error.req, "No master testbed label") 1019 1091 1020 1092 … … 1028 1100 parse_current_testbed = self.current_testbed(eid, tmpdir) 1029 1101 parse_allbeds = self.allbeds(self.get_access) 1030 parse_gateways = self.gateways(eid, smbshare,master, tmpdir,1102 parse_gateways = self.gateways(eid, master, tmpdir, 1031 1103 gw_pubkey_base, gw_secretkey_base, self.copy_file) 1032 1104 parse_vtopo = self.shunt_to_file("^#\s+Begin\s+Vtopo", … … 1060 1132 1061 1133 self.genviz(tmpdir + "/vtopo.xml", tmpdir + "/viz.xml") 1062 if not startem: return True1063 1134 1064 1135 # Copy tarfiles and rpms needed at remote sites into a staging area … … 1093 1164 t = self.pooled_thread(target=self.start_segment, 1094 1165 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1095 pdata=thread_pool_info )1166 pdata=thread_pool_info, trace_file=self.trace_file) 1096 1167 threads.append(t) 1097 1168 t.start() … … 1113 1184 1114 1185 # If one failed clean up 1115 if not fail_soft and len(failed) > 0: 1116 for tb in failed: 1117 self.stop_segment(tb, eid, tbparams) 1186 if len(failed) > 0: 1187 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1188 if fail_soft: 1189 raise service_error(service_error.partial, \ 1190 "Partial swap in on %s" % ",".join(succeeded)) 1191 else: 1192 for tb in succeeded: 1193 self.stop_segment(tb, eid, tbparams) 1194 raise service_error(service_error.federant, 1195 "Swap in failed on %s" % ",".join(failed)) 1118 1196 else: 1119 print "Experiment started" 1120 1121 # XXX: return value 1197 if self.trace_file: 1198 print >>self.trace_file, "Experiment started" 1199 1200 return { 'emulab' : [ tbparams[tb]['emulab'] \ 1201 for tb in tbparams.keys() \ 1202 if tbparams[tb].has_key('emulab') ] } 1122 1203 1123 1204 if __name__ == '__main__': … … 1128 1209 action='store_true', help='print actions rather than take them') 1129 1210 parser.add_option('-f', '--file', dest='tcl', help='tcl file to parse') 1211 parser.add_option('-m', '--master', dest='master', 1212 help='testbed label for matster testbd') 1213 parser.add_option('-t', '--trace', dest='trace', default=None, 1214 help='file to print intermediate messages to') 1215 parser.add_option('-T', '--trace-stderr', dest='trace', 1216 action='store_const',const=sys.stderr, 1217 help='file to print intermediate messages to') 1130 1218 opts, args = parser.parse_args() 1219 1220 trace_file = None 1221 if opts.trace: 1222 try: 1223 trace_file = open(opts.trace, 'w') 1224 except IOError: 1225 print >>sys.stderr, "Can't open trace file" 1226 1227 if opts.debug: 1228 if not trace_file: 1229 trace_file = sys.stderr 1131 1230 1132 1231 if opts.tcl != None: … … 1139 1238 else: 1140 1239 sys.exit("Must specify a file name") 1240 1241 if not opts.master: 1242 sys.exit("Must supply master tb label (--master)"); 1141 1243 1142 1244 obj = fedd_create_experiment_local( … … 1151 1253 'ucb':'https://users.isi.deterlab.net:23237', 1152 1254 }, 1255 trace_file=trace_file 1153 1256 ) 1154 obj.create_experiment( {\1257 rv = obj.create_experiment( {\ 1155 1258 'experimentdescription' : content, 1259 'master' : opts.master, 1156 1260 'user': [ {'userID' : { 'username' : 'faber' } } ], 1157 1261 }, 1158 1262 None) 1263 1264 print rv -
fedd/fedd_types.xsd
r1af38d6 r3441fe3 235 235 maxOccurs="unbounded"/> 236 236 <xsd:element name="experimentdescription" type="xsd:base64Binary"/> 237 <xsd:element name="master" type="xsd:string"/> 237 238 </xsd:sequence> 238 239 </xsd:complexType> … … 245 246 </xsd:annotation> 246 247 <xsd:sequence> 247 <xsd:element name=" project" type="projectType" minOccurs="1"248 <xsd:element name="emulab" type="emulabType" minOccurs="1" 248 249 maxOccurs="unbounded"/> 249 250 </xsd:sequence> … … 257 258 <xsd:enumeration value="2"/> <!-- proxy error --> 258 259 <xsd:enumeration value="3"/> <!-- badly formed request --> 259 <xsd:enumeration value="4"/> <!-- internal error --> 260 <xsd:enumeration value="4"/> <!-- server configuration error --> 261 <xsd:enumeration value="5"/> <!-- internal error --> 262 <xsd:enumeration value="6"/> <!-- partial instantiation --> 263 <xsd:enumeration value="7"/> <!-- federant error --> 260 264 </xsd:restriction> 261 265 </xsd:element> -
fedd/service_error.py
r1af38d6 r3441fe3 10 10 server_config = 4 11 11 internal = 5 12 partial = 6 13 federant = 7 12 14 code_str = { 13 15 access : "Access Denied", … … 15 17 req : "Badly Formed Request", 16 18 server_config: "Server Configuration Error", 17 internal : "Internal Error" 19 internal : "Internal Error", 20 partial: "Partial embedding", 21 federant: "Federant error" 18 22 } 19 23 str_code = dict([ (v, k) for k, v in code_str.iteritems() ])
Note: See TracChangeset
for help on using the changeset viewer.