- Timestamp:
- Jul 21, 2009 2:23:40 PM (15 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 55c074c
- Parents:
- 8780cbec
- Location:
- fedd/federation
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
r8780cbec r866c983 41 41 42 42 def __init__(self, config=None, auth=None): 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 43 """ 44 Initializer. Pulls parameters out of the ConfigParser's access section. 45 """ 46 47 # Make sure that the configuration is in place 48 if not config: 49 raise RunTimeError("No config to fedd.access") 50 51 self.project_priority = config.getboolean("access", "project_priority") 52 self.allow_proxy = config.getboolean("access", "allow_proxy") 53 54 self.boss = config.get("access", "boss") 55 self.ops = config.get("access", "ops") 56 self.domain = config.get("access", "domain") 57 self.fileserver = config.get("access", "fileserver") 58 self.eventserver = config.get("access", "eventserver") 59 60 self.attrs = { } 61 self.access = { } 62 self.restricted = [ ] 63 self.projects = { } 64 self.keys = { } 65 self.types = { } 66 self.allocation = { } 67 self.state = { 68 'projects': self.projects, 69 'allocation' : self.allocation, 70 'keys' : self.keys, 71 'types': self.types 72 } 73 self.log = logging.getLogger("fedd.access") 74 set_log_level(config, "access", self.log) 75 self.state_lock = Lock() 76 77 if auth: self.auth = auth 78 else: 79 self.log.error(\ 80 "[access]: No authorizer initialized, creating local one.") 81 auth = authorizer() 82 83 tb = config.get('access', 'testbed') 84 if tb: self.testbed = [ t.strip() for t in tb.split(',') ] 85 else: self.testbed = [ ] 86 87 if config.has_option("access", "accessdb"): 88 self.read_access(config.get("access", "accessdb")) 89 90 self.state_filename = config.get("access", "access_state") 91 self.read_state() 92 93 # Keep cert_file and cert_pwd coming from the same place 94 self.cert_file = config.get("access", "cert_file") 95 if self.cert_file: 96 self.sert_pwd = config.get("access", "cert_pw") 97 else: 98 self.cert_file = config.get("globals", "cert_file") 99 self.sert_pwd = config.get("globals", "cert_pw") 100 101 self.trusted_certs = config.get("access", "trusted_certs") or \ 102 config.get("globals", "trusted_certs") 103 104 self.soap_services = {\ 105 'RequestAccess': soap_handler("RequestAccess", self.RequestAccess), 106 'ReleaseAccess': soap_handler("ReleaseAccess", self.ReleaseAccess), 107 } 108 self.xmlrpc_services = {\ 109 'RequestAccess': xmlrpc_handler('RequestAccess', 110 self.RequestAccess), 111 'ReleaseAccess': xmlrpc_handler('ReleaseAccess', 112 self.ReleaseAccess), 113 } 114 115 116 if not config.has_option("allocate", "uri"): 117 self.allocate_project = \ 118 allocate_project_local(config, auth) 119 else: 120 self.allocate_project = \ 121 allocate_project_remote(config, auth) 122 123 # If the project allocator exports services, put them in this object's 124 # maps so that classes that instantiate this can call the services. 125 self.soap_services.update(self.allocate_project.soap_services) 126 self.xmlrpc_services.update(self.allocate_project.xmlrpc_services) 127 127 128 128 129 129 def read_access(self, config): 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 130 """ 131 Read a configuration file and set internal parameters. 132 133 The format is more complex than one might hope. The basic format is 134 attribute value pairs separated by colons(:) on a signle line. The 135 attributes in bool_attrs, emulab_attrs and id_attrs can all be set 136 directly using the name: value syntax. E.g. 137 boss: hostname 138 sets self.boss to hostname. In addition, there are access lines of the 139 form (tb, proj, user) -> (aproj, auser) that map the first tuple of 140 names to the second for access purposes. Names in the key (left side) 141 can include "<NONE> or <ANY>" to act as wildcards or to require the 142 fields to be empty. Similarly aproj or auser can be <SAME> or 143 <DYNAMIC> indicating that either the matching key is to be used or a 144 dynamic user or project will be created. These names can also be 145 federated IDs (fedid's) if prefixed with fedid:. Finally, the aproj 146 can be followed with a colon-separated list of node types to which that 147 project has access (or will have access if dynamic). 148 Testbed attributes outside the forms above can be given using the 149 format attribute: name value: value. The name is a single word and the 150 value continues to the end of the line. Empty lines and lines startin 151 with a # are ignored. 152 153 Parsing errors result in a self.parse_error exception being raised. 154 """ 155 lineno=0 156 name_expr = "["+string.ascii_letters + string.digits + "\.\-_]+" 157 fedid_expr = "fedid:[" + string.hexdigits + "]+" 158 key_name = "(<ANY>|<NONE>|"+fedid_expr + "|"+ name_expr + ")" 159 access_proj = "(<DYNAMIC>(?::" + name_expr +")*|"+ \ 160 "<SAME>" + "(?::" + name_expr + ")*|" + \ 161 fedid_expr + "(?::" + name_expr + ")*|" + \ 162 name_expr + "(?::" + name_expr + ")*)" 163 access_name = "(<DYNAMIC>|<SAME>|" + fedid_expr + "|"+ name_expr + ")" 164 165 restricted_re = re.compile("restricted:\s*(.*)", re.IGNORECASE) 166 attr_re = re.compile('attribute:\s*([\._\-a-z0-9]+)\s+value:\s*(.*)', 167 re.IGNORECASE) 168 access_re = re.compile('\('+key_name+'\s*,\s*'+key_name+'\s*,\s*'+ 169 key_name+'\s*\)\s*->\s*\('+access_proj + '\s*,\s*' + 170 access_name + '\s*,\s*' + access_name + '\s*\)', re.IGNORECASE) 171 172 def parse_name(n): 173 if n.startswith('fedid:'): return fedid(hexstr=n[len('fedid:'):]) 174 else: return n 175 176 def auth_name(n): 177 if isinstance(n, basestring): 178 if n =='<any>' or n =='<none>': return None 179 else: return unicode(n) 180 else: 181 return n 182 183 f = open(config, "r"); 184 for line in f: 185 lineno += 1 186 line = line.strip(); 187 if len(line) == 0 or line.startswith('#'): 188 continue 189 190 # Extended (attribute: x value: y) attribute line 191 m = attr_re.match(line) 192 if m != None: 193 attr, val = m.group(1,2) 194 self.attrs[attr] = val 195 continue 196 197 # Restricted entry 198 m = restricted_re.match(line) 199 if m != None: 200 val = m.group(1) 201 self.restricted.append(val) 202 continue 203 204 # Access line (t, p, u) -> (ap, cu, su) line 205 m = access_re.match(line) 206 if m != None: 207 access_key = tuple([ parse_name(x) for x in m.group(1,2,3)]) 208 auth_key = tuple([ auth_name(x) for x in access_key]) 209 aps = m.group(4).split(":"); 210 if aps[0] == 'fedid:': 211 del aps[0] 212 aps[0] = fedid(hexstr=aps[0]) 213 214 cu = parse_name(m.group(5)) 215 su = parse_name(m.group(6)) 216 217 access_val = (access_project(aps[0], aps[1:]), 218 parse_name(m.group(5)), parse_name(m.group(6))) 219 220 self.access[access_key] = access_val 221 self.auth.set_attribute(auth_key, "access") 222 continue 223 224 # Nothing matched to here: unknown line - raise exception 225 f.close() 226 raise self.parse_error("Unknown statement at line %d of %s" % \ 227 (lineno, config)) 228 f.close() 229 229 230 230 def get_users(self, obj): 231 232 233 234 235 236 237 238 231 """ 232 Return a list of the IDs of the users in dict 233 """ 234 if obj.has_key('user'): 235 return [ unpack_id(u['userID']) \ 236 for u in obj['user'] if u.has_key('userID') ] 237 else: 238 return None 239 239 240 240 def write_state(self): 241 242 243 244 245 246 247 248 249 250 251 241 if self.state_filename: 242 try: 243 f = open(self.state_filename, 'w') 244 pickle.dump(self.state, f) 245 except IOError, e: 246 self.log.error("Can't write file %s: %s" % \ 247 (self.state_filename, e)) 248 except pickle.PicklingError, e: 249 self.log.error("Pickling problem: %s" % e) 250 except TypeError, e: 251 self.log.error("Pickling problem (TypeError): %s" % e) 252 252 253 253 254 254 def read_state(self): 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 255 """ 256 Read a new copy of access state. Old state is overwritten. 257 258 State format is a simple pickling of the state dictionary. 259 """ 260 if self.state_filename: 261 try: 262 f = open(self.state_filename, "r") 263 self.state = pickle.load(f) 264 265 self.allocation = self.state['allocation'] 266 self.projects = self.state['projects'] 267 self.keys = self.state['keys'] 268 self.types = self.state['types'] 269 270 self.log.debug("[read_state]: Read state from %s" % \ 271 self.state_filename) 272 except IOError, e: 273 self.log.warning(("[read_state]: No saved state: " +\ 274 "Can't open %s: %s") % (self.state_filename, e)) 275 except EOFError, e: 276 self.log.warning(("[read_state]: " +\ 277 "Empty or damaged state file: %s:") % \ 278 self.state_filename) 279 except pickle.UnpicklingError, e: 280 self.log.warning(("[read_state]: No saved state: " + \ 281 "Unpickling failed: %s") % e) 282 283 # Add the ownership attributes to the authorizer. Note that the 284 # indices of the allocation dict are strings, but the attributes are 285 # fedids, so there is a conversion. 286 for k in self.allocation.keys(): 287 for o in self.allocation[k].get('owners', []): 288 self.auth.set_attribute(o, fedid(hexstr=k)) 289 289 290 290 291 291 def permute_wildcards(self, a, p): 292 293 294 295 296 297 298 299 300 301 302 303 304 292 """Return a copy of a with various fields wildcarded. 293 294 The bits of p control the wildcards. A set bit is a wildcard 295 replacement with the lowest bit being user then project then testbed. 296 """ 297 if p & 1: user = ["<any>"] 298 else: user = a[2] 299 if p & 2: proj = "<any>" 300 else: proj = a[1] 301 if p & 4: tb = "<any>" 302 else: tb = a[0] 303 304 return (tb, proj, user) 305 305 306 306 def find_access(self, search): 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 307 """ 308 Search the access DB for a match on this tuple. Return the matching 309 access tuple and the user that matched. 310 311 NB, if the initial tuple fails to match we start inserting wildcards in 312 an order determined by self.project_priority. Try the list of users in 313 order (when wildcarded, there's only one user in the list). 314 """ 315 if self.project_priority: perm = (0, 1, 2, 3, 4, 5, 6, 7) 316 else: perm = (0, 2, 1, 3, 4, 6, 5, 7) 317 318 for p in perm: 319 s = self.permute_wildcards(search, p) 320 # s[2] is None on an anonymous, unwildcarded request 321 if s[2] != None: 322 for u in s[2]: 323 if self.access.has_key((s[0], s[1], u)): 324 return (self.access[(s[0], s[1], u)], u) 325 else: 326 if self.access.has_key(s): 327 return (self.access[s], None) 328 return None, None 329 329 330 330 def lookup_access(self, req, fid): 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 331 """ 332 Determine the allowed access for this request. Return the access and 333 which fields are dynamic. 334 335 The fedid is needed to construct the request 336 """ 337 # Search keys 338 tb = None 339 project = None 340 user = None 341 # Return values 342 rp = access_project(None, ()) 343 ru = None 344 345 if req.has_key('project'): 346 p = req['project'] 347 if p.has_key('name'): 348 project = unpack_id(p['name']) 349 user = self.get_users(p) 350 else: 351 user = self.get_users(req) 352 353 user_fedids = [ u for u in user if isinstance(u, fedid)] 354 # Determine how the caller is representing itself. If its fedid shows 355 # up as a project or a singleton user, let that stand. If neither the 356 # usernames nor the project name is a fedid, the caller is a testbed. 357 if project and isinstance(project, fedid): 358 if project == fid: 359 # The caller is the project (which is already in the tuple 360 # passed in to the authorizer) 361 owners = user_fedids 362 owners.append(project) 363 else: 364 raise service_error(service_error.req, 365 "Project asserting different fedid") 366 else: 367 if fid not in user_fedids: 368 tb = fid 369 owners = user_fedids 370 owners.append(fid) 371 else: 372 if len(fedids) > 1: 373 raise service_error(service_error.req, 374 "User asserting different fedid") 375 else: 376 # Which is a singleton 377 owners = user_fedids 378 # Confirm authorization 379 380 for u in user: 381 self.log.debug("[lookup_access] Checking access for %s" % \ 382 ((tb, project, u),)) 383 if self.auth.check_attribute((tb, project, u), 'access'): 384 self.log.debug("[lookup_access] Access granted") 385 break 386 else: 387 self.log.debug("[lookup_access] Access Denied") 388 else: 389 raise service_error(service_error.access, "Access denied") 390 391 # This maps a valid user to the Emulab projects and users to use 392 found, user_match = self.find_access((tb, project, user)) 393 394 if found == None: 395 raise service_error(service_error.access, 396 "Access denied - cannot map access") 397 398 # resolve <dynamic> and <same> in found 399 dyn_proj = False 400 dyn_create_user = False 401 dyn_service_user = False 402 403 if found[0].name == "<same>": 404 if project != None: 405 rp.name = project 406 else : 407 raise service_error(\ 408 service_error.server_config, 409 "Project matched <same> when no project given") 410 elif found[0].name == "<dynamic>": 411 rp.name = None 412 dyn_proj = True 413 else: 414 rp.name = found[0].name 415 rp.node_types = found[0].node_types; 416 417 if found[1] == "<same>": 418 if user_match == "<any>": 419 if user != None: rcu = user[0] 420 else: raise service_error(\ 421 service_error.server_config, 422 "Matched <same> on anonymous request") 423 else: 424 rcu = user_match 425 elif found[1] == "<dynamic>": 426 rcu = None 427 dyn_create_user = True 428 else: 429 rcu = found[1] 430 431 if found[2] == "<same>": 432 if user_match == "<any>": 433 if user != None: rsu = user[0] 434 else: raise service_error(\ 435 service_error.server_config, 436 "Matched <same> on anonymous request") 437 else: 438 rsu = user_match 439 elif found[2] == "<dynamic>": 440 rsu = None 441 dyn_service_user = True 442 else: 443 rsu = found[2] 444 445 return (rp, rcu, rsu), (dyn_create_user, dyn_service_user, dyn_proj),\ 446 owners 447 447 448 448 def build_response(self, alloc_id, ap): 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 449 """ 450 Create the SOAP response. 451 452 Build the dictionary description of the response and use 453 fedd_utils.pack_soap to create the soap message. ap is the allocate 454 project message returned from a remote project allocation (even if that 455 allocation was done locally). 456 """ 457 # Because alloc_id is already a fedd_services_types.IDType_Holder, 458 # there's no need to repack it 459 msg = { 460 'allocID': alloc_id, 461 'emulab': { 462 'domain': self.domain, 463 'boss': self.boss, 464 'ops': self.ops, 465 'fileServer': self.fileserver, 466 'eventServer': self.eventserver, 467 'project': ap['project'] 468 }, 469 } 470 if len(self.attrs) > 0: 471 msg['emulab']['fedAttr'] = \ 472 [ { 'attribute': x, 'value' : y } \ 473 for x,y in self.attrs.iteritems()] 474 return msg 475 475 476 476 def RequestAccess(self, req, fid): 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 477 """ 478 Handle the access request. Proxy if not for us. 479 480 Parse out the fields and make the allocations or rejections if for us, 481 otherwise, assuming we're willing to proxy, proxy the request out. 482 """ 483 484 def gateway_hardware(h): 485 if h == 'GWTYPE': return self.attrs.get('connectorType', 'GWTYPE') 486 else: return h 487 488 # The dance to get into the request body 489 if req.has_key('RequestAccessRequestBody'): 490 req = req['RequestAccessRequestBody'] 491 else: 492 raise service_error(service_error.req, "No request!?") 493 494 if req.has_key('destinationTestbed'): 495 dt = unpack_id(req['destinationTestbed']) 496 497 if dt == None or dt in self.testbed: 498 # Request for this fedd 499 found, dyn, owners = self.lookup_access(req, fid) 500 restricted = None 501 ap = None 502 503 # If this is a request to export a project and the access project 504 # is not the project to export, access denied. 505 if req.has_key('exportProject'): 506 ep = unpack_id(req['exportProject']) 507 if ep != found[0].name: 508 raise service_error(service_error.access, 509 "Cannot export %s" % ep) 510 511 # Check for access to restricted nodes 512 if req.has_key('resources') and req['resources'].has_key('node'): 513 resources = req['resources'] 514 restricted = [ gateway_hardware(t) for n in resources['node'] \ 515 if n.has_key('hardware') \ 516 for t in n['hardware'] \ 517 if gateway_hardware(t) \ 518 in self.restricted ] 519 inaccessible = [ t for t in restricted \ 520 if t not in found[0].node_types] 521 if len(inaccessible) > 0: 522 raise service_error(service_error.access, 523 "Access denied (nodetypes %s)" % \ 524 str(', ').join(inaccessible)) 525 # These collect the keys for the two roles into single sets, one 526 # for creation and one for service. The sets are a simple way to 527 # eliminate duplicates 528 create_ssh = set([ x['sshPubkey'] \ 529 for x in req['createAccess'] \ 530 if x.has_key('sshPubkey')]) 531 532 service_ssh = set([ x['sshPubkey'] \ 533 for x in req['serviceAccess'] \ 534 if x.has_key('sshPubkey')]) 535 536 if len(create_ssh) > 0 and len(service_ssh) >0: 537 if dyn[1]: 538 # Compose the dynamic project request 539 # (only dynamic, dynamic currently allowed) 540 preq = { 'AllocateProjectRequestBody': \ 541 { 'project' : {\ 542 'user': [ \ 543 { \ 544 'access': [ { 'sshPubkey': s } \ 545 for s in service_ssh ], 546 'role': "serviceAccess",\ 547 }, \ 548 { \ 549 'access': [ { 'sshPubkey': s } \ 550 for s in create_ssh ], 551 'role': "experimentCreation",\ 552 }, \ 553 ], \ 554 }\ 555 }\ 556 } 557 if restricted != None and len(restricted) > 0: 558 preq['AllocateProjectRequestBody']['resources'] = \ 559 {'node': [ { 'hardware' : [ h ] } \ 560 for h in restricted ] } 561 ap = self.allocate_project.dynamic_project(preq) 562 else: 563 preq = {'StaticProjectRequestBody' : \ 564 { 'project': \ 565 { 'name' : { 'localname' : found[0].name },\ 566 'user' : [ \ 567 {\ 568 'userID': { 'localname' : found[1] }, \ 569 'access': [ { 'sshPubkey': s } 570 for s in create_ssh ], 571 'role': 'experimentCreation'\ 572 },\ 573 {\ 574 'userID': { 'localname' : found[2] }, \ 575 'access': [ { 'sshPubkey': s } 576 for s in service_ssh ], 577 'role': 'serviceAccess'\ 578 },\ 579 ]}\ 580 }\ 581 } 582 if restricted != None and len(restricted) > 0: 583 preq['StaticProjectRequestBody']['resources'] = \ 584 {'node': [ { 'hardware' : [ h ] } \ 585 for h in restricted ] } 586 ap = self.allocate_project.static_project(preq) 587 else: 588 raise service_error(service_error.req, 589 "SSH access parameters required") 590 # keep track of what's been added 591 allocID, alloc_cert = generate_fedid(subj="alloc", log=self.log) 592 aid = unicode(allocID) 593 594 self.state_lock.acquire() 595 self.allocation[aid] = { } 596 try: 597 pname = ap['project']['name']['localname'] 598 except KeyError: 599 pname = None 600 601 if dyn[1]: 602 if not pname: 603 self.state_lock.release() 604 raise service_error(service_error.internal, 605 "Misformed allocation response?") 606 if self.projects.has_key(pname): self.projects[pname] += 1 607 else: self.projects[pname] = 1 608 self.allocation[aid]['project'] = pname 609 610 if ap.has_key('resources'): 611 if not pname: 612 self.state_lock.release() 613 raise service_error(service_error.internal, 614 "Misformed allocation response?") 615 self.allocation[aid]['types'] = set() 616 nodes = ap['resources'].get('node', []) 617 for n in nodes: 618 for h in n.get('hardware', []): 619 if self.types.has_key((pname, h)): 620 self.types[(pname, h)] += 1 621 else: 622 self.types[(pname, h)] = 1 623 self.allocation[aid]['types'].add((pname,h)) 624 625 626 self.allocation[aid]['keys'] = [ ] 627 628 try: 629 for u in ap['project']['user']: 630 uname = u['userID']['localname'] 631 for k in [ k['sshPubkey'] for k in u['access'] \ 632 if k.has_key('sshPubkey') ]: 633 kv = "%s:%s" % (uname, k) 634 if self.keys.has_key(kv): self.keys[kv] += 1 635 else: self.keys[kv] = 1 636 self.allocation[aid]['keys'].append((uname, k)) 637 except KeyError: 638 self.state_lock.release() 639 raise service_error(service_error.internal, 640 "Misformed allocation response?") 641 642 643 self.allocation[aid]['owners'] = owners 644 self.write_state() 645 self.state_lock.release() 646 for o in owners: 647 self.auth.set_attribute(o, allocID) 648 resp = self.build_response({ 'fedid': allocID } , ap) 649 return resp 650 else: 651 if self.allow_proxy: 652 resp = self.proxy_RequestAccess.call_service(dt, req, 653 self.cert_file, self.cert_pwd, 654 self.trusted_certs) 655 if resp.has_key('RequestAccessResponseBody'): 656 return resp['RequestAccessResponseBody'] 657 else: 658 return None 659 else: 660 raise service_error(service_error.access, 661 "Access proxying denied") 662 662 663 663 def ReleaseAccess(self, req, fid): 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 664 # The dance to get into the request body 665 if req.has_key('ReleaseAccessRequestBody'): 666 req = req['ReleaseAccessRequestBody'] 667 else: 668 raise service_error(service_error.req, "No request!?") 669 670 if req.has_key('destinationTestbed'): 671 dt = unpack_id(req['destinationTestbed']) 672 else: 673 dt = None 674 675 if dt == None or dt in self.testbed: 676 # Local request 677 try: 678 if req['allocID'].has_key('localname'): 679 auth_attr = aid = req['allocID']['localname'] 680 elif req['allocID'].has_key('fedid'): 681 aid = unicode(req['allocID']['fedid']) 682 auth_attr = req['allocID']['fedid'] 683 else: 684 raise service_error(service_error.req, 685 "Only localnames and fedids are understood") 686 except KeyError: 687 raise service_error(service_error.req, "Badly formed request") 688 689 self.log.debug("[access] deallocation requested for %s", aid) 690 if not self.auth.check_attribute(fid, auth_attr): 691 self.log.debug("[access] deallocation denied for %s", aid) 692 raise service_error(service_error.access, "Access Denied") 693 694 # If we know this allocation, reduce the reference counts and 695 # remove the local allocations. Otherwise report an error. If 696 # there is an allocation to delete, del_users will be a dictonary 697 # of sets where the key is the user that owns the keys in the set. 698 # We use a set to avoid duplicates. del_project is just the name 699 # of any dynamic project to delete. We're somewhat lazy about 700 # deleting authorization attributes. Having access to something 701 # that doesn't exist isn't harmful. 702 del_users = { } 703 del_project = None 704 del_types = set() 705 706 if self.allocation.has_key(aid): 707 self.log.debug("Found allocation for %s" %aid) 708 self.state_lock.acquire() 709 for k in self.allocation[aid]['keys']: 710 kk = "%s:%s" % k 711 self.keys[kk] -= 1 712 if self.keys[kk] == 0: 713 if not del_users.has_key(k[0]): 714 del_users[k[0]] = set() 715 del_users[k[0]].add(k[1]) 716 del self.keys[kk] 717 718 if self.allocation[aid].has_key('project'): 719 pname = self.allocation[aid]['project'] 720 self.projects[pname] -= 1 721 if self.projects[pname] == 0: 722 del_project = pname 723 del self.projects[pname] 724 725 if self.allocation[aid].has_key('types'): 726 for t in self.allocation[aid]['types']: 727 self.types[t] -= 1 728 if self.types[t] == 0: 729 if not del_project: del_project = t[0] 730 del_types.add(t[1]) 731 del self.types[t] 732 733 del self.allocation[aid] 734 self.write_state() 735 self.state_lock.release() 736 # If we actually have resources to deallocate, prepare the call. 737 if del_project or del_users: 738 msg = { 'project': { }} 739 if del_project: 740 msg['project']['name']= {'localname': del_project} 741 users = [ ] 742 for u in del_users.keys(): 743 users.append({ 'userID': { 'localname': u },\ 744 'access' : \ 745 [ {'sshPubkey' : s } for s in del_users[u]]\ 746 }) 747 if users: 748 msg['project']['user'] = users 749 if len(del_types) > 0: 750 msg['resources'] = { 'node': \ 751 [ {'hardware': [ h ] } for h in del_types ]\ 752 } 753 if self.allocate_project.release_project: 754 msg = { 'ReleaseProjectRequestBody' : msg} 755 self.allocate_project.release_project(msg) 756 return { 'allocID': req['allocID'] } 757 else: 758 raise service_error(service_error.req, "No such allocation") 759 760 else: 761 if self.allow_proxy: 762 resp = self.proxy_ReleaseAccess.call_service(dt, req, 763 self.cert_file, self.cert_pwd, 764 self.trusted_certs) 765 if resp.has_key('ReleaseAccessResponseBody'): 766 return resp['ReleaseAccessResponseBody'] 767 else: 768 return None 769 else: 770 raise service_error(service_error.access, 771 "Access proxying denied") 772 773 -
fedd/federation/experiment_control.py
r8780cbec r866c983 40 40 41 41 class thread_pool: 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 42 """ 43 A class to keep track of a set of threads all invoked for the same 44 task. Manages the mutual exclusion of the states. 45 """ 46 def __init__(self, nthreads): 47 """ 48 Start a pool. 49 """ 50 self.changed = Condition() 51 self.started = 0 52 self.terminated = 0 53 self.nthreads = nthreads 54 55 def acquire(self): 56 """ 57 Get the pool's lock. 58 """ 59 self.changed.acquire() 60 61 def release(self): 62 """ 63 Release the pool's lock. 64 """ 65 self.changed.release() 66 67 def wait(self, timeout = None): 68 """ 69 Wait for a pool thread to start or stop. 70 """ 71 self.changed.wait(timeout) 72 73 def start(self): 74 """ 75 Called by a pool thread to report starting. 76 """ 77 self.changed.acquire() 78 self.started += 1 79 self.changed.notifyAll() 80 self.changed.release() 81 82 def terminate(self): 83 """ 84 Called by a pool thread to report finishing. 85 """ 86 self.changed.acquire() 87 self.terminated += 1 88 self.changed.notifyAll() 89 self.changed.release() 90 91 def clear(self): 92 """ 93 Clear all pool data. 94 """ 95 self.changed.acquire() 96 self.started = 0 97 self.terminated =0 98 self.changed.notifyAll() 99 self.changed.release() 100 101 def wait_for_slot(self): 102 """ 103 Wait until we have a free slot to start another pooled thread 104 """ 105 self.acquire() 106 while self.started - self.terminated >= self.nthreads: 107 self.wait() 108 self.release() 109 110 def wait_for_all_done(self): 111 """ 112 Wait until all active threads finish (and at least one has started) 113 """ 114 self.acquire() 115 while self.started == 0 or self.started > self.terminated: 116 self.wait() 117 self.release() 118 118 119 119 class pooled_thread(Thread): 120 121 122 123 124 125 126 127 self.rv = None# Return value of the ops in this thread128 129 self.target=target# Target function to run on start()130 self.args = args# Args to pass to target131 132 self.pdata = pdata# thread_pool for this class133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 120 """ 121 One of a set of threads dedicated to a specific task. Uses the 122 thread_pool class above for coordination. 123 """ 124 def __init__(self, group=None, target=None, name=None, args=(), 125 kwargs={}, pdata=None, trace_file=None): 126 Thread.__init__(self, group, target, name, args, kwargs) 127 self.rv = None # Return value of the ops in this thread 128 self.exception = None # Exception that terminated this thread 129 self.target=target # Target function to run on start() 130 self.args = args # Args to pass to target 131 self.kwargs = kwargs # Additional kw args 132 self.pdata = pdata # thread_pool for this class 133 # Logger for this thread 134 self.log = logging.getLogger("fedd.experiment_control") 135 136 def run(self): 137 """ 138 Emulate Thread.run, except add pool data manipulation and error 139 logging. 140 """ 141 if self.pdata: 142 self.pdata.start() 143 144 if self.target: 145 try: 146 self.rv = self.target(*self.args, **self.kwargs) 147 except service_error, s: 148 self.exception = s 149 self.log.error("Thread exception: %s %s" % \ 150 (s.code_string(), s.desc)) 151 except: 152 self.exception = sys.exc_info()[1] 153 self.log.error(("Unexpected thread exception: %s" +\ 154 "Trace %s") % (self.exception,\ 155 traceback.format_exc())) 156 if self.pdata: 157 self.pdata.terminate() 158 158 159 159 call_RequestAccess = service_caller('RequestAccess') … … 162 162 163 163 def __init__(self, config=None, auth=None): 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 164 """ 165 Intialize the various attributes, most from the config object 166 """ 167 168 def parse_tarfile_list(tf): 169 """ 170 Parse a tarfile list from the configuration. This is a set of 171 paths and tarfiles separated by spaces. 172 """ 173 rv = [ ] 174 if tf is not None: 175 tl = tf.split() 176 while len(tl) > 1: 177 p, t = tl[0:2] 178 del tl[0:2] 179 rv.append((p, t)) 180 return rv 181 182 self.thread_with_rv = experiment_control_local.pooled_thread 183 self.thread_pool = experiment_control_local.thread_pool 184 185 self.cert_file = config.get("experiment_control", "cert_file") 186 if self.cert_file: 187 self.cert_pwd = config.get("experiment_control", "cert_pwd") 188 else: 189 self.cert_file = config.get("globals", "cert_file") 190 self.cert_pwd = config.get("globals", "cert_pwd") 191 192 self.trusted_certs = config.get("experiment_control", "trusted_certs") \ 193 or config.get("globals", "trusted_certs") 194 195 self.exp_stem = "fed-stem" 196 self.log = logging.getLogger("fedd.experiment_control") 197 set_log_level(config, "experiment_control", self.log) 198 self.muxmax = 2 199 self.nthreads = 2 200 self.randomize_experiments = False 201 202 self.scp_exec = "/usr/bin/scp" 203 self.splitter = None 204 self.ssh_exec="/usr/bin/ssh" 205 self.ssh_keygen = "/usr/bin/ssh-keygen" 206 self.ssh_identity_file = None 207 208 209 self.debug = config.getboolean("experiment_control", "create_debug") 210 self.state_filename = config.get("experiment_control", 211 "experiment_state") 212 self.splitter_url = config.get("experiment_control", "splitter_uri") 213 self.fedkit = parse_tarfile_list(\ 214 config.get("experiment_control", "fedkit")) 215 self.gatewaykit = parse_tarfile_list(\ 216 config.get("experiment_control", "gatewaykit")) 217 accessdb_file = config.get("experiment_control", "accessdb") 218 219 self.ssh_pubkey_file = config.get("experiment_control", 220 "ssh_pubkey_file") 221 self.ssh_privkey_file = config.get("experiment_control", 222 "ssh_privkey_file") 223 # NB for internal master/slave ops, not experiment setup 224 self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa") 225 self.state = { } 226 self.state_lock = Lock() 227 self.tclsh = "/usr/local/bin/otclsh" 228 self.tcl_splitter = config.get("splitter", "tcl_splitter") or \ 229 config.get("experiment_control", "tcl_splitter", 230 "/usr/testbed/lib/ns2ir/parse.tcl") 231 mapdb_file = config.get("experiment_control", "mapdb") 232 self.trace_file = sys.stderr 233 234 self.def_expstart = \ 235 "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\ 236 "/tmp/federate"; 237 self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\ 238 "FEDDIR/hosts"; 239 self.def_gwstart = \ 240 "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\ 241 "/tmp/bridge.log"; 242 self.def_mgwstart = \ 243 "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\ 244 "/tmp/bridge.log"; 245 self.def_gwimage = "FBSD61-TUNNEL2"; 246 self.def_gwtype = "pc"; 247 self.local_access = { } 248 249 if auth: 250 self.auth = auth 251 else: 252 self.log.error(\ 253 "[access]: No authorizer initialized, creating local one.") 254 auth = authorizer() 255 256 257 if self.ssh_pubkey_file: 258 try: 259 f = open(self.ssh_pubkey_file, 'r') 260 self.ssh_pubkey = f.read() 261 f.close() 262 except IOError: 263 raise service_error(service_error.internal, 264 "Cannot read sshpubkey") 265 else: 266 raise service_error(service_error.internal, 267 "No SSH public key file?") 268 269 if not self.ssh_privkey_file: 270 raise service_error(service_error.internal, 271 "No SSH public key file?") 272 273 274 if mapdb_file: 275 self.read_mapdb(mapdb_file) 276 else: 277 self.log.warn("[experiment_control] No testbed map, using defaults") 278 self.tbmap = { 279 'deter':'https://users.isi.deterlab.net:23235', 280 'emulab':'https://users.isi.deterlab.net:23236', 281 'ucb':'https://users.isi.deterlab.net:23237', 282 } 283 284 if accessdb_file: 285 self.read_accessdb(accessdb_file) 286 else: 287 raise service_error(service_error.internal, 288 "No accessdb specified in config") 289 290 # Grab saved state. OK to do this w/o locking because it's read only 291 # and only one thread should be in existence that can see self.state at 292 # this point. 293 if self.state_filename: 294 self.read_state() 295 296 # Dispatch tables 297 self.soap_services = {\ 298 'Create': soap_handler('Create', self.create_experiment), 299 'Vtopo': soap_handler('Vtopo', self.get_vtopo), 300 'Vis': soap_handler('Vis', self.get_vis), 301 'Info': soap_handler('Info', self.get_info), 302 'Terminate': soap_handler('Terminate', 303 self.terminate_experiment), 304 } 305 306 self.xmlrpc_services = {\ 307 'Create': xmlrpc_handler('Create', self.create_experiment), 308 'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo), 309 'Vis': xmlrpc_handler('Vis', self.get_vis), 310 'Info': xmlrpc_handler('Info', self.get_info), 311 'Terminate': xmlrpc_handler('Terminate', 312 self.terminate_experiment), 313 } 314 314 315 315 def copy_file(self, src, dest, size=1024): 316 317 318 319 320 321 322 323 324 325 326 327 316 """ 317 Exceedingly simple file copy. 318 """ 319 s = open(src,'r') 320 d = open(dest, 'w') 321 322 buf = "x" 323 while buf != "": 324 buf = s.read(size) 325 d.write(buf) 326 s.close() 327 d.close() 328 328 329 329 # Call while holding self.state_lock 330 330 def write_state(self): 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 331 """ 332 Write a new copy of experiment state after copying the existing state 333 to a backup. 334 335 State format is a simple pickling of the state dictionary. 336 """ 337 if os.access(self.state_filename, os.W_OK): 338 self.copy_file(self.state_filename, \ 339 "%s.bak" % self.state_filename) 340 try: 341 f = open(self.state_filename, 'w') 342 pickle.dump(self.state, f) 343 except IOError, e: 344 self.log.error("Can't write file %s: %s" % \ 345 (self.state_filename, e)) 346 except pickle.PicklingError, e: 347 self.log.error("Pickling problem: %s" % e) 348 except TypeError, e: 349 self.log.error("Pickling problem (TypeError): %s" % e) 350 350 351 351 # Call while holding self.state_lock 352 352 def read_state(self): 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 353 """ 354 Read a new copy of experiment state. Old state is overwritten. 355 356 State format is a simple pickling of the state dictionary. 357 """ 358 try: 359 f = open(self.state_filename, "r") 360 self.state = pickle.load(f) 361 self.log.debug("[read_state]: Read state from %s" % \ 362 self.state_filename) 363 except IOError, e: 364 self.log.warning("[read_state]: No saved state: Can't open %s: %s"\ 365 % (self.state_filename, e)) 366 except pickle.UnpicklingError, e: 367 self.log.warning(("[read_state]: No saved state: " + \ 368 "Unpickling failed: %s") % e) 369 370 for k in self.state.keys(): 371 try: 372 # This list should only have one element in it, but phrasing it 373 # as a for loop doesn't cost much, really. We have to find the 374 # fedid elements anyway. 375 for eid in [ f['fedid'] \ 376 for f in self.state[k]['experimentID']\ 377 if f.has_key('fedid') ]: 378 self.auth.set_attribute(self.state[k]['owner'], eid) 379 except KeyError, e: 380 self.log.warning("[read_state]: State ownership or identity " +\ 381 "misformatted in %s: %s" % (self.state_filename, e)) 382 382 383 383 384 384 def read_accessdb(self, accessdb_file): 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 385 """ 386 Read the mapping from fedids that can create experiments to their name 387 in the 3-level access namespace. All will be asserted from this 388 testbed and can include the local username and porject that will be 389 asserted on their behalf by this fedd. Each fedid is also added to the 390 authorization system with the "create" attribute. 391 """ 392 self.accessdb = {} 393 # These are the regexps for parsing the db 394 name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+" 395 project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \ 396 "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$") 397 user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \ 398 "\s*->\s*(" + name_expr + ")\s*$") 399 lineno = 0 400 401 # Parse the mappings and store in self.authdb, a dict of 402 # fedid -> (proj, user) 403 try: 404 f = open(accessdb_file, "r") 405 for line in f: 406 lineno += 1 407 line = line.strip() 408 if len(line) == 0 or line.startswith('#'): 409 continue 410 m = project_line.match(line) 411 if m: 412 fid = fedid(hexstr=m.group(1)) 413 project, user = m.group(2,3) 414 if not self.accessdb.has_key(fid): 415 self.accessdb[fid] = [] 416 self.accessdb[fid].append((project, user)) 417 continue 418 419 m = user_line.match(line) 420 if m: 421 fid = fedid(hexstr=m.group(1)) 422 project = None 423 user = m.group(2) 424 if not self.accessdb.has_key(fid): 425 self.accessdb[fid] = [] 426 self.accessdb[fid].append((project, user)) 427 continue 428 self.log.warn("[experiment_control] Error parsing access " +\ 429 "db %s at line %d" % (accessdb_file, lineno)) 430 except IOError: 431 raise service_error(service_error.internal, 432 "Error opening/reading %s as experiment " +\ 433 "control accessdb" % accessdb_file) 434 f.close() 435 436 # Initialize the authorization attributes 437 for fid in self.accessdb.keys(): 438 self.auth.set_attribute(fid, 'create') 439 439 440 440 def read_mapdb(self, file): 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 441 """ 442 Read a simple colon separated list of mappings for the 443 label-to-testbed-URL mappings. Clears or creates self.tbmap. 444 """ 445 446 self.tbmap = { } 447 lineno =0 448 try: 449 f = open(file, "r") 450 for line in f: 451 lineno += 1 452 line = line.strip() 453 if line.startswith('#') or len(line) == 0: 454 continue 455 try: 456 label, url = line.split(':', 1) 457 self.tbmap[label] = url 458 except ValueError, e: 459 self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\ 460 "map db: %s %s" % (lineno, line, e)) 461 except IOError, e: 462 self.log.warning("[read_mapdb]: No saved map database: Can't " +\ 463 "open %s: %s" % (file, e)) 464 f.close() 465 465 466 466 def scp_file(self, file, user, host, dest=""): 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 467 """ 468 scp a file to the remote host. If debug is set the action is only 469 logged. 470 """ 471 472 scp_cmd = [self.scp_exec, '-o', 'IdentitiesOnly yes', 473 '-o', 'StrictHostKeyChecking yes', '-i', 474 self.ssh_privkey_file, file, "%s@%s:%s" % (user, host, dest)] 475 rv = 0 476 477 try: 478 dnull = open("/dev/null", "w") 479 except IOError: 480 self.log.debug("[ssh_file]: failed to open /dev/null for redirect") 481 dnull = Null 482 483 self.log.debug("[scp_file]: %s" % " ".join(scp_cmd)) 484 if not self.debug: 485 rv = call(scp_cmd, stdout=dnull, stderr=dnull) 486 487 return rv == 0 488 488 489 489 def ssh_cmd(self, user, host, cmd, wname=None): 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 490 """ 491 Run a remote command on host as user. If debug is set, the action is 492 only logged. 493 """ 494 sh_str = ("%s -o 'IdentitiesOnly yes' -o " + \ 495 "'StrictHostKeyChecking yes' -i %s %s@%s %s") % \ 496 (self.ssh_exec, self.ssh_privkey_file, 497 user, host, cmd) 498 499 try: 500 dnull = open("/dev/null", "r") 501 except IOError: 502 self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect") 503 dnull = Null 504 505 self.log.debug("[ssh_cmd]: %s" % sh_str) 506 if not self.debug: 507 if dnull: 508 sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull) 509 else: 510 sub = Popen(sh_str, shell=True) 511 return sub.wait() == 0 512 else: 513 return True 514 514 515 515 def ship_configs(self, host, user, src_dir, dest_dir): 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 516 """ 517 Copy federant-specific configuration files to the federant. 518 """ 519 if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir): 520 return False 521 if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir): 522 return False 523 524 for f in os.listdir(src_dir): 525 if os.path.isdir(f): 526 if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 527 "%s/%s" % (dest_dir, f)): 528 return False 529 else: 530 if not self.scp_file("%s/%s" % (src_dir, f), 531 user, host, dest_dir): 532 return False 533 return True 534 534 535 535 def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0): 536 537 538 539 540 541 542 543 544 545 user = tbparams[tb]['user']# federant user546 pid = tbparams[tb]['project']# federant project547 548 549 550 551 552 553 554 555 556 557 558 559 state = None# Experiment state parsed from expinfo560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 536 """ 537 Start a sub-experiment on a federant. 538 539 Get the current state, modify or create as appropriate, ship data and 540 configs and start the experiment. There are small ordering differences 541 based on the initial state of the sub-experiment. 542 """ 543 # ops node in the federant 544 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 545 user = tbparams[tb]['user'] # federant user 546 pid = tbparams[tb]['project'] # federant project 547 # XXX 548 base_confs = ( "hosts",) 549 tclfile = "%s.%s.tcl" % (eid, tb) # sub-experiment description 550 # command to test experiment state 551 expinfo_exec = "/usr/testbed/bin/expinfo" 552 # Configuration directories on the remote machine 553 proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid) 554 tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid) 555 rpms_dir = "/proj/%s/rpms/%s" % (pid, eid) 556 # Regular expressions to parse the expinfo response 557 state_re = re.compile("State:\s+(\w+)") 558 no_exp_re = re.compile("^No\s+such\s+experiment") 559 state = None # Experiment state parsed from expinfo 560 # The expinfo ssh command. Note the identity restriction to use only 561 # the identity provided in the pubkey given. 562 cmd = [self.ssh_exec, '-o', 'IdentitiesOnly yes', '-o', 563 'StrictHostKeyChecking yes', '-i', 564 self.ssh_privkey_file, "%s@%s" % (user, host), 565 expinfo_exec, pid, eid] 566 567 # Get status 568 self.log.debug("[start_segment]: %s"% " ".join(cmd)) 569 dev_null = None 570 try: 571 dev_null = open("/dev/null", "a") 572 except IOError, e: 573 self.log.error("[start_segment]: can't open /dev/null: %s" %e) 574 575 if self.debug: 576 state = 'swapped' 577 rv = 0 578 else: 579 status = Popen(cmd, stdout=PIPE, stderr=dev_null) 580 for line in status.stdout: 581 m = state_re.match(line) 582 if m: state = m.group(1) 583 else: 584 m = no_exp_re.match(line) 585 if m: state = "none" 586 rv = status.wait() 587 588 # If the experiment is not present the subcommand returns a non-zero 589 # return value. If we successfully parsed a "none" outcome, ignore the 590 # return code. 591 if rv != 0 and state != 'none': 592 raise service_error(service_error.internal, 593 "Cannot get status of segment %s:%s/%s" % (tb, pid, eid)) 594 595 if state not in ('active', 'swapped', 'none'): 596 self.log.debug("[start_segment]:unknown state %s" % state) 597 return False 598 599 self.log.debug("[start_segment]: %s: %s" % (tb, state)) 600 self.log.info("[start_segment]:transferring experiment to %s" % tb) 601 602 if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host): 603 return False 604 # Clear the federation config dirs 605 if not self.ssh_cmd(user, host, 606 "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir): 607 return False 608 # Clear and create the tarfiles and rpm directories 609 for d in (tarfiles_dir, rpms_dir): 610 if not self.ssh_cmd(user, host, 611 "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d): 612 return False 613 if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 614 "create tarfiles"): 615 return False 616 617 if state == 'none': 618 # Create a null copy of the experiment so that we capture any logs 619 # there if the modify fails. Emulab software discards the logs 620 # from a failed startexp 621 if not self.scp_file("%s/null.tcl" % tmpdir, user, host): 622 return False 623 self.log.info("[start_segment]: Creating %s on %s" % (eid, tb)) 624 if not self.ssh_cmd(user, host, 625 "/usr/testbed/bin/startexp -i -f -w -p %s -e %s null.tcl" \ 626 % (pid, eid), "startexp"): 627 return False 628 629 # Create the federation config dirs 630 if not self.ssh_cmd(user, host, 631 "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir): 632 return False 633 for f in base_confs: 634 if not self.scp_file("%s/%s" % (tmpdir, f), user, host, 635 "%s/%s" % (proj_dir, f)): 636 return False 637 if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb), 638 proj_dir): 639 return False 640 if os.path.isdir("%s/tarfiles" % tmpdir): 641 if not self.ship_configs(host, user, 642 "%s/tarfiles" % tmpdir, tarfiles_dir): 643 return False 644 if os.path.isdir("%s/rpms" % tmpdir): 645 if not self.ship_configs(host, user, 646 "%s/rpms" % tmpdir, tarfiles_dir): 647 return False 648 # Stage the new configuration (active experiments will stay swapped in 649 # now) 650 self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb)) 651 if not self.ssh_cmd(user, host, 652 "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \ 653 (pid, eid, tclfile), 654 "modexp"): 655 return False 656 # Active experiments are still swapped, this swaps the others in. 657 if state != 'active': 658 self.log.info("[start_segment]: Swapping %s in on %s" % \ 659 (eid, tb)) 660 if not self.ssh_cmd(user, host, 661 "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid), 662 "swapexp"): 663 return False 664 return True 665 665 666 666 def stop_segment(self, tb, eid, tbparams): 667 668 669 670 671 672 673 674 675 676 677 678 667 """ 668 Stop a sub experiment by calling swapexp on the federant 669 """ 670 user = tbparams[tb]['user'] 671 host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain']) 672 pid = tbparams[tb]['project'] 673 674 self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb)) 675 return self.ssh_cmd(user, host, 676 "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid)) 677 678 679 679 def generate_ssh_keys(self, dest, type="rsa" ): 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 680 """ 681 Generate a set of keys for the gateways to use to talk. 682 683 Keys are of type type and are stored in the required dest file. 684 """ 685 valid_types = ("rsa", "dsa") 686 t = type.lower(); 687 if t not in valid_types: raise ValueError 688 cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest] 689 690 try: 691 trace = open("/dev/null", "w") 692 except IOError: 693 raise service_error(service_error.internal, 694 "Cannot open /dev/null??"); 695 696 # May raise CalledProcessError 697 self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd)) 698 rv = call(cmd, stdout=trace, stderr=trace) 699 if rv != 0: 700 raise service_error(service_error.internal, 701 "Cannot generate nonce ssh keys. %s return code %d" \ 702 % (self.ssh_keygen, rv)) 703 703 704 704 def gentopo(self, str): 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 self.chars = ""# Last text seen739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 705 """ 706 Generate the topology dtat structure from the splitter's XML 707 representation of it. 708 709 The topology XML looks like: 710 <experiment> 711 <nodes> 712 <node><vname></vname><ips>ip1:ip2</ips></node> 713 </nodes> 714 <lans> 715 <lan> 716 <vname></vname><vnode></vnode><ip></ip> 717 <bandwidth></bandwidth><member>node:port</member> 718 </lan> 719 </lans> 720 """ 721 class topo_parse: 722 """ 723 Parse the topology XML and create the dats structure. 724 """ 725 def __init__(self): 726 # Typing of the subelements for data conversion 727 self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member') 728 self.int_subelements = ( 'bandwidth',) 729 self.float_subelements = ( 'delay',) 730 # The final data structure 731 self.nodes = [ ] 732 self.lans = [ ] 733 self.topo = { \ 734 'node': self.nodes,\ 735 'lan' : self.lans,\ 736 } 737 self.element = { } # Current element being created 738 self.chars = "" # Last text seen 739 740 def end_element(self, name): 741 # After each sub element the contents is added to the current 742 # element or to the appropriate list. 743 if name == 'node': 744 self.nodes.append(self.element) 745 self.element = { } 746 elif name == 'lan': 747 self.lans.append(self.element) 748 self.element = { } 749 elif name in self.str_subelements: 750 self.element[name] = self.chars 751 self.chars = "" 752 elif name in self.int_subelements: 753 self.element[name] = int(self.chars) 754 self.chars = "" 755 elif name in self.float_subelements: 756 self.element[name] = float(self.chars) 757 self.chars = "" 758 759 def found_chars(self, data): 760 self.chars += data.rstrip() 761 762 763 tp = topo_parse(); 764 parser = xml.parsers.expat.ParserCreate() 765 parser.EndElementHandler = tp.end_element 766 parser.CharacterDataHandler = tp.found_chars 767 768 parser.Parse(str) 769 770 return tp.topo 771 772 772 773 773 def genviz(self, topo): 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 774 """ 775 Generate the visualization the virtual topology 776 """ 777 778 neato = "/usr/local/bin/neato" 779 # These are used to parse neato output and to create the visualization 780 # file. 781 vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"') 782 vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \ 783 "%s</type></node>" 784 785 try: 786 # Node names 787 nodes = [ n['vname'] for n in topo['node'] ] 788 topo_lans = topo['lan'] 789 except KeyError: 790 raise service_error(service_error.internal, "Bad topology") 791 792 lans = { } 793 links = { } 794 795 # Walk through the virtual topology, organizing the connections into 796 # 2-node connections (links) and more-than-2-node connections (lans). 797 # When a lan is created, it's added to the list of nodes (there's a 798 # node in the visualization for the lan). 799 for l in topo_lans: 800 if links.has_key(l['vname']): 801 if len(links[l['vname']]) < 2: 802 links[l['vname']].append(l['vnode']) 803 else: 804 nodes.append(l['vname']) 805 lans[l['vname']] = links[l['vname']] 806 del links[l['vname']] 807 lans[l['vname']].append(l['vnode']) 808 elif lans.has_key(l['vname']): 809 lans[l['vname']].append(l['vnode']) 810 else: 811 links[l['vname']] = [ l['vnode'] ] 812 813 814 # Open up a temporary file for dot to turn into a visualization 815 try: 816 df, dotname = tempfile.mkstemp() 817 dotfile = os.fdopen(df, 'w') 818 except IOError: 819 raise service_error(service_error.internal, 820 "Failed to open file in genviz") 821 822 # Generate a dot/neato input file from the links, nodes and lans 823 try: 824 print >>dotfile, "graph G {" 825 for n in nodes: 826 print >>dotfile, '\t"%s"' % n 827 for l in links.keys(): 828 print >>dotfile, '\t"%s" -- "%s"' % tuple(links[l]) 829 for l in lans.keys(): 830 for n in lans[l]: 831 print >>dotfile, '\t "%s" -- "%s"' % (n,l) 832 print >>dotfile, "}" 833 dotfile.close() 834 except TypeError: 835 raise service_error(service_error.internal, 836 "Single endpoint link in vtopo") 837 except IOError: 838 raise service_error(service_error.internal, "Cannot write dot file") 839 840 # Use dot to create a visualization 841 dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000', 842 '-Gpack=true', dotname], stdout=PIPE) 843 844 # Translate dot to vis format 845 vis_nodes = [ ] 846 vis = { 'node': vis_nodes } 847 for line in dot.stdout: 848 m = vis_re.match(line) 849 if m: 850 vn = m.group(1) 851 vis_node = {'name': vn, \ 852 'x': float(m.group(2)),\ 853 'y' : float(m.group(3)),\ 854 } 855 if vn in links.keys() or vn in lans.keys(): 856 vis_node['type'] = 'lan' 857 else: 858 vis_node['type'] = 'node' 859 vis_nodes.append(vis_node) 860 rv = dot.wait() 861 862 os.remove(dotname) 863 if rv == 0 : return vis 864 else: return None 865 865 866 866 def get_access(self, tb, nodes, user, tbparam, master, export_projectaccess_user): 868 """ 869 Get access to testbed through fedd and set the parameters for that tb 870 """ 871 uri = self.tbmap.get(tb, None) 872 if not uri: 873 raise service_error(serice_error.server_config, 874 "Unknown testbed: %s" % tb) 875 876 # currently this lumps all users into one service access group 877 service_keys = [ a for u in user \ 878 for a in u.get('access', []) \ 879 if a.has_key('sshPubkey')] 880 881 if len(service_keys) == 0: 882 raise service_error(service_error.req, 883 "Must have at least one SSH pubkey for services") 884 885 886 for p, u in access_user: 887 self.log.debug(("[get_access] Attempting access from (%s, %s) " + \ 888 "to %s") % ((p or "None"), u, uri)) 889 890 if p: 891 # Request with user and project specified 892 req = {\ 893 'destinationTestbed' : { 'uri' : uri }, 894 'project': { 895 'name': {'localname': p}, 896 'user': [ {'userID': { 'localname': u } } ], 897 }, 898 'user': user, 899 'allocID' : { 'localname': 'test' }, 900 'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ], 901 'serviceAccess' : service_keys 902 } 903 else: 904 # Request with only user specified 905 req = {\ 906 'destinationTestbed' : { 'uri' : uri }, 907 'user': [ {'userID': { 'localname': u } } ], 908 'allocID' : { 'localname': 'test' }, 909 'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ], 910 'serviceAccess' : service_keys 911 } 912 913 if tb == master: 914 # NB, the export_project parameter is a dict that includes 915 # the type 916 req['exportProject'] = export_project 917 918 # node resources if any 919 if nodes != None and len(nodes) > 0: 920 rnodes = [ ] 921 for n in nodes: 922 rn = { } 923 image, hw, count = n.split(":") 924 if image: rn['image'] = [ image ] 925 if hw: rn['hardware'] = [ hw ] 926 if count and int(count) >0 : rn['count'] = int(count) 927 rnodes.append(rn) 928 req['resources']= { } 929 req['resources']['node'] = rnodes 930 931 try: 932 if self.local_access.has_key(uri): 933 # Local access call 934 req = { 'RequestAccessRequestBody' : req } 935 r = self.local_access[uri].RequestAccess(req, 936 fedid(file=self.cert_file)) 937 r = { 'RequestAccessResponseBody' : r } 938 else: 939 r = self.call_RequestAccess(uri, req, 940 self.cert_file, self.cert_pwd, self.trusted_certs) 941 except service_error, e: 942 if e.code == service_error.access: 943 self.log.debug("[get_access] Access denied") 944 r = None 945 continue 946 else: 947 raise e 948 949 if r.has_key('RequestAccessResponseBody'): 950 # Through to here we have a valid response, not a fault. 951 # Access denied is a fault, so something better or worse than 952 # access denied has happened. 953 r = r['RequestAccessResponseBody'] 954 self.log.debug("[get_access] Access granted") 955 break 956 else: 957 raise service_error(service_error.protocol, 958 "Bad proxy response") 959 960 if not r: 961 raise service_error(service_error.access, 962 "Access denied by %s (%s)" % (tb, uri)) 963 964 e = r['emulab'] 965 p = e['project'] 966 tbparam[tb] = { 967 "boss": e['boss'], 968 "host": e['ops'], 969 "domain": e['domain'], 970 "fs": e['fileServer'], 971 "eventserver": e['eventServer'], 972 "project": unpack_id(p['name']), 973 "emulab" : e, 974 "allocID" : r['allocID'], 975 } 976 # Make the testbed name be the label the user applied 977 p['testbed'] = {'localname': tb } 978 979 for u in p['user']: 980 role = u.get('role', None) 981 if role == 'experimentCreation': 982 tbparam[tb]['user'] = unpack_id(u['userID']) 983 break 984 else: 985 raise service_error(service_error.internal, 986 "No createExperimentUser from %s" %tb) 987 988 # Add attributes to barameter space. We don't allow attributes to 989 # overlay any parameters already installed. 990 for a in e['fedAttr']: 991 try: 992 if a['attribute'] and isinstance(a['attribute'], basestring)\ 993 and not tbparam[tb].has_key(a['attribute'].lower()): 994 tbparam[tb][a['attribute'].lower()] = a['value'] 995 except KeyError: 996 self.log.error("Bad attribute in response: %s" % a) 997 998 998 def release_access(self, tb, aid): 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 999 """ 1000 Release access to testbed through fedd 1001 """ 1002 1003 uri = self.tbmap.get(tb, None) 1004 if not uri: 1005 raise service_error(serice_error.server_config, 1006 "Unknown testbed: %s" % tb) 1007 1008 if self.local_access.has_key(uri): 1009 resp = self.local_access[uri].ReleaseAccess(\ 1010 { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 1011 fedid(file=self.cert_file)) 1012 resp = { 'ReleaseAccessResponseBody': resp } 1013 else: 1014 resp = self.call_ReleaseAccess(uri, {'allocID': aid}, 1015 self.cert_file, self.cert_pwd, self.trusted_certs) 1016 1017 # better error coding 1018 1018 1019 1019 def remote_splitter(self, uri, desc, master): 1020 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1021 req = { 1022 'description' : { 'ns2description': desc }, 1023 'master': master, 1024 'include_fedkit': bool(self.fedkit), 1025 'include_gatewaykit': bool(self.gatewaykit) 1026 } 1027 1028 r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 1029 self.trusted_certs) 1030 1031 if r.has_key('Ns2SplitResponseBody'): 1032 r = r['Ns2SplitResponseBody'] 1033 if r.has_key('output'): 1034 return r['output'].splitlines() 1035 else: 1036 raise service_error(service_error.protocol, 1037 "Bad splitter response (no output)") 1038 else: 1039 raise service_error(service_error.protocol, "Bad splitter response") 1040 1041 1041 class current_testbed: 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1042 """ 1043 Object for collecting the current testbed description. The testbed 1044 description is saved to a file with the local testbed variables 1045 subsittuted line by line. 1046 """ 1047 def __init__(self, eid, tmpdir, fedkit, gatewaykit): 1048 def tar_list_to_string(tl): 1049 if tl is None: return None 1050 1051 rv = "" 1052 for t in tl: 1053 rv += " %s PROJDIR/tarfiles/EID/%s" % \ 1054 (t[0], os.path.basename(t[1])) 1055 return rv 1056 1057 1058 self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)") 1059 self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)") 1060 self.current_testbed = None 1061 self.testbed_file = None 1062 1063 self.def_expstart = \ 1064 "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate"; 1065 self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts"; 1066 self.def_gwstart = \ 1067 "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log"; 1068 self.def_mgwstart = \ 1069 "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log"; 1070 self.def_gwimage = "FBSD61-TUNNEL2"; 1071 self.def_gwtype = "pc"; 1072 self.def_mgwcmd = '# ' 1073 self.def_mgwcmdparams = '' 1074 self.def_gwcmd = '# ' 1075 self.def_gwcmdparams = '' 1076 1077 self.eid = eid 1078 self.tmpdir = tmpdir 1079 # Convert fedkit and gateway kit (which are lists of tuples) into a 1080 # substituition string. 1081 self.fedkit = tar_list_to_string(fedkit) 1082 self.gatewaykit = tar_list_to_string(gatewaykit) 1083 1084 def __call__(self, line, master, allocated, tbparams): 1085 # Capture testbed topology descriptions 1086 if self.current_testbed == None: 1087 m = self.begin_testbed.match(line) 1088 if m != None: 1089 self.current_testbed = m.group(1) 1090 if self.current_testbed == None: 1091 raise service_error(service_error.req, 1092 "Bad request format (unnamed testbed)") 1093 allocated[self.current_testbed] = \ 1094 allocated.get(self.current_testbed,0) + 1 1095 tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed) 1096 if not os.path.exists(tb_dir): 1097 try: 1098 os.mkdir(tb_dir) 1099 except IOError: 1100 raise service_error(service_error.internal, 1101 "Cannot create %s" % tb_dir) 1102 try: 1103 self.testbed_file = open("%s/%s.%s.tcl" % 1104 (tb_dir, self.eid, self.current_testbed), 'w') 1105 except IOError: 1106 self.testbed_file = None 1107 return True 1108 else: return False 1109 else: 1110 m = self.end_testbed.match(line) 1111 if m != None: 1112 if m.group(1) != self.current_testbed: 1113 raise service_error(service_error.internal, 1114 "Mismatched testbed markers!?") 1115 if self.testbed_file != None: 1116 self.testbed_file.close() 1117 self.testbed_file = None 1118 self.current_testbed = None 1119 elif self.testbed_file: 1120 # Substitute variables and put the line into the local 1121 # testbed file. 1122 gwtype = tbparams[self.current_testbed].get(\ 1123 'connectortype', self.def_gwtype) 1124 gwimage = tbparams[self.current_testbed].get(\ 1125 'connectorimage', self.def_gwimage) 1126 mgwstart = tbparams[self.current_testbed].get(\ 1127 'masterconnectorstartcmd', self.def_mgwstart) 1128 mexpstart = tbparams[self.current_testbed].get(\ 1129 'masternodestartcmd', self.def_mexpstart) 1130 gwstart = tbparams[self.current_testbed].get(\ 1131 'slaveconnectorstartcmd', self.def_gwstart) 1132 expstart = tbparams[self.current_testbed].get(\ 1133 'slavenodestartcmd', self.def_expstart) 1134 project = tbparams[self.current_testbed].get('project') 1135 gwcmd = tbparams[self.current_testbed].get(\ 1136 'slaveconnectorcmd', self.def_gwcmd) 1137 gwcmdparams = tbparams[self.current_testbed].get(\ 1138 'slaveconnectorcmdparams', self.def_gwcmdparams) 1139 mgwcmd = tbparams[self.current_testbed].get(\ 1140 'masterconnectorcmd', self.def_gwcmd) 1141 mgwcmdparams = tbparams[self.current_testbed].get(\ 1142 'masterconnectorcmdparams', self.def_gwcmdparams) 1143 line = re.sub("GWTYPE", gwtype, line) 1144 line = re.sub("GWIMAGE", gwimage, line) 1145 if self.current_testbed == master: 1146 line = re.sub("GWSTART", mgwstart, line) 1147 line = re.sub("EXPSTART", mexpstart, line) 1148 # NB GWCMDPARAMS is a prefix of GWCMD, so expand first 1149 line = re.sub("GWCMDPARAMS", mgwcmdparams, line) 1150 line = re.sub("(#\s*)?GWCMD", mgwcmd, line) 1151 else: 1152 line = re.sub("GWSTART", gwstart, line) 1153 line = re.sub("EXPSTART", expstart, line) 1154 # NB GWCMDPARAMS is a prefix of GWCMD, so expand first 1155 line = re.sub("GWCMDPARAMS", gwcmdparams, line) 1156 line = re.sub("(#\s*)?GWCMD", gwcmd, line) 1157 #These expansions contain EID and PROJDIR. NB these are 1158 # local fedkit and gatewaykit, which are strings. 1159 if self.fedkit: 1160 line = re.sub("FEDKIT", self.fedkit, line) 1161 if self.gatewaykit: 1162 line = re.sub("GATEWAYKIT", self.gatewaykit, line) 1163 line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line) 1164 line = re.sub("PROJDIR", "/proj/%s/" % project, line) 1165 line = re.sub("EID", self.eid, line) 1166 line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \ 1167 (project, self.eid), line) 1168 print >>self.testbed_file, line 1169 return True 1170 1170 1171 1171 class allbeds: 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1172 """ 1173 Process the Allbeds section. Get access to each federant and save the 1174 parameters in tbparams 1175 """ 1176 def __init__(self, get_access): 1177 self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds") 1178 self.end_allbeds = re.compile("^#\s+End\s+Allbeds") 1179 self.in_allbeds = False 1180 self.get_access = get_access 1181 1182 def __call__(self, line, user, tbparams, master, export_project, 1183 access_user): 1184 # Testbed access parameters 1185 if not self.in_allbeds: 1186 if self.begin_allbeds.match(line): 1187 self.in_allbeds = True 1188 return True 1189 else: 1190 return False 1191 else: 1192 if self.end_allbeds.match(line): 1193 self.in_allbeds = False 1194 else: 1195 nodes = line.split('|') 1196 tb = nodes.pop(0) 1197 self.get_access(tb, nodes, user, tbparams, master, 1198 export_project, access_user) 1199 return True 1200 1200 1201 1201 class gateways: 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1202 def __init__(self, eid, master, tmpdir, gw_pubkey, 1203 gw_secretkey, copy_file, fedkit): 1204 self.begin_gateways = \ 1205 re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)") 1206 self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)") 1207 self.current_gateways = None 1208 self.control_gateway = None 1209 self.active_end = { } 1210 1211 self.eid = eid 1212 self.master = master 1213 self.tmpdir = tmpdir 1214 self.gw_pubkey_base = gw_pubkey 1215 self.gw_secretkey_base = gw_secretkey 1216 1217 self.copy_file = copy_file 1218 self.fedkit = fedkit 1219 1220 1221 def gateway_conf_file(self, gw, master, eid, pubkey, privkey, 1222 active_end, tbparams, dtb, myname, desthost, type): 1223 """ 1224 Produce a gateway configuration file from a gateways line. 1225 """ 1226 1227 sproject = tbparams[gw].get('project', 'project') 1228 dproject = tbparams[dtb].get('project', 'project') 1229 sdomain = ".%s.%s%s" % (eid, sproject, 1230 tbparams[gw].get('domain', ".example.com")) 1231 ddomain = ".%s.%s%s" % (eid, dproject, 1232 tbparams[dtb].get('domain', ".example.com")) 1233 boss = tbparams[master].get('boss', "boss") 1234 fs = tbparams[master].get('fs', "fs") 1235 event_server = "%s%s" % \ 1236 (tbparams[gw].get('eventserver', "event_server"), 1237 tbparams[gw].get('domain', "example.com")) 1238 remote_event_server = "%s%s" % \ 1239 (tbparams[dtb].get('eventserver', "event_server"), 1240 tbparams[dtb].get('domain', "example.com")) 1241 seer_control = "%s%s" % \ 1242 (tbparams[gw].get('control', "control"), sdomain) 1243 tunnel_iface = tbparams[gw].get("tunnelinterface", None) 1244 1245 if self.fedkit: 1246 remote_script_dir = "/usr/local/federation/bin" 1247 local_script_dir = "/usr/local/federation/bin" 1248 else: 1249 remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 1250 local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 1251 1252 local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid) 1253 remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid) 1254 tunnel_cfg = tbparams[gw].get("tunnelcfg", "false") 1255 1256 conf_file = "%s%s.gw.conf" % (myname, sdomain) 1257 remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain) 1258 1259 # translate to lower case so the `hostname` hack for specifying 1260 # configuration files works. 1261 conf_file = conf_file.lower(); 1262 remote_conf_file = remote_conf_file.lower(); 1263 1264 if dtb == master: 1265 active = "false" 1266 elif gw == master: 1267 active = "true" 1268 elif active_end.has_key('%s-%s' % (dtb, gw)): 1269 active = "false" 1270 else: 1271 active_end['%s-%s' % (gw, dtb)] = 1 1272 active = "true" 1273 1274 gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w") 1275 print >>gwconfig, "Active: %s" % active 1276 print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg 1277 if tunnel_iface: 1278 print >>gwconfig, "Interface: %s" % tunnel_iface 1279 print >>gwconfig, "BossName: %s" % boss 1280 print >>gwconfig, "FsName: %s" % fs 1281 print >>gwconfig, "EventServerName: %s" % event_server 1282 print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server 1283 print >>gwconfig, "SeerControl: %s" % seer_control 1284 print >>gwconfig, "Type: %s" % type 1285 print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir 1286 print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \ 1287 local_script_dir 1288 print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid) 1289 print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid) 1290 print >>gwconfig, "RemoteConfigFile: %s/%s" % \ 1291 (remote_conf_dir, remote_conf_file) 1292 print >>gwconfig, "Peer: %s%s" % (desthost, ddomain) 1293 print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey) 1294 print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey) 1295 gwconfig.close() 1296 1297 return active == "true" 1298 1299 def __call__(self, line, allocated, tbparams): 1300 # Process gateways 1301 if not self.current_gateways: 1302 m = self.begin_gateways.match(line) 1303 if m: 1304 self.current_gateways = m.group(1) 1305 if allocated.has_key(self.current_gateways): 1306 # This test should always succeed 1307 tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways) 1308 if not os.path.exists(tb_dir): 1309 try: 1310 os.mkdir(tb_dir) 1311 except IOError: 1312 raise service_error(service_error.internal, 1313 "Cannot create %s" % tb_dir) 1314 else: 1315 # XXX 1316 self.log.error("[gateways]: Ignoring gateways for " + \ 1317 "unknown testbed %s" % self.current_gateways) 1318 self.current_gateways = None 1319 return True 1320 else: 1321 return False 1322 else: 1323 m = self.end_gateways.match(line) 1324 if m : 1325 if m.group(1) != self.current_gateways: 1326 raise service_error(service_error.internal, 1327 "Mismatched gateway markers!?") 1328 if self.control_gateway: 1329 try: 1330 cc = open("%s/%s/client.conf" % 1331 (self.tmpdir, self.current_gateways), 'w') 1332 print >>cc, "ControlGateway: %s" % \ 1333 self.control_gateway 1334 if tbparams[self.master].has_key('smbshare'): 1335 print >>cc, "SMBSHare: %s" % \ 1336 tbparams[self.master]['smbshare'] 1337 print >>cc, "ProjectUser: %s" % \ 1338 tbparams[self.master]['user'] 1339 print >>cc, "ProjectName: %s" % \ 1340 tbparams[self.master]['project'] 1341 print >>cc, "ExperimentID: %s/%s" % \ 1342 ( tbparams[self.master]['project'], \ 1343 self.eid ) 1344 cc.close() 1345 except IOError: 1346 raise service_error(service_error.internal, 1347 "Error creating client config") 1348 # XXX: This seer specific file should disappear 1349 try: 1350 cc = open("%s/%s/seer.conf" % 1351 (self.tmpdir, self.current_gateways), 1352 'w') 1353 if self.current_gateways != self.master: 1354 print >>cc, "ControlNode: %s" % \ 1355 self.control_gateway 1356 print >>cc, "ExperimentID: %s/%s" % \ 1357 ( tbparams[self.master]['project'], \ 1358 self.eid ) 1359 cc.close() 1360 except IOError: 1361 raise service_error(service_error.internal, 1362 "Error creating seer config") 1363 else: 1364 debug.error("[gateways]: No control gateway for %s" %\ 1365 self.current_gateways) 1366 self.current_gateways = None 1367 else: 1368 dtb, myname, desthost, type = line.split(" ") 1369 1370 if type == "control" or type == "both": 1371 self.control_gateway = "%s.%s.%s%s" % (myname, 1372 self.eid, 1373 tbparams[self.current_gateways]['project'], 1374 tbparams[self.current_gateways]['domain']) 1375 try: 1376 active = self.gateway_conf_file(self.current_gateways, 1377 self.master, self.eid, self.gw_pubkey_base, 1378 self.gw_secretkey_base, 1379 self.active_end, tbparams, dtb, myname, 1380 desthost, type) 1381 except IOError, e: 1382 raise service_error(service_error.internal, 1383 "Failed to write config file for %s" % \ 1384 self.current_gateway) 1385 1386 gw_pubkey = "%s/keys/%s" % \ 1387 (self.tmpdir, self.gw_pubkey_base) 1388 gw_secretkey = "%s/keys/%s" % \ 1389 (self.tmpdir, self.gw_secretkey_base) 1390 1391 pkfile = "%s/%s/%s" % \ 1392 ( self.tmpdir, self.current_gateways, 1393 self.gw_pubkey_base) 1394 skfile = "%s/%s/%s" % \ 1395 ( self.tmpdir, self.current_gateways, 1396 self.gw_secretkey_base) 1397 1398 if not os.path.exists(pkfile): 1399 try: 1400 self.copy_file(gw_pubkey, pkfile) 1401 except IOError: 1402 service_error(service_error.internal, 1403 "Failed to copy pubkey file") 1404 1405 if active and not os.path.exists(skfile): 1406 try: 1407 self.copy_file(gw_secretkey, skfile) 1408 except IOError: 1409 service_error(service_error.internal, 1410 "Failed to copy secretkey file") 1411 return True 1412 1412 1413 1413 class shunt_to_file: 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1414 """ 1415 Simple class to write data between two regexps to a file. 1416 """ 1417 def __init__(self, begin, end, filename): 1418 """ 1419 Begin shunting on a match of begin, stop on end, send data to 1420 filename. 1421 """ 1422 self.begin = re.compile(begin) 1423 self.end = re.compile(end) 1424 self.in_shunt = False 1425 self.file = None 1426 self.filename = filename 1427 1428 def __call__(self, line): 1429 """ 1430 Call this on each line in the input that may be shunted. 1431 """ 1432 if not self.in_shunt: 1433 if self.begin.match(line): 1434 self.in_shunt = True 1435 try: 1436 self.file = open(self.filename, "w") 1437 except: 1438 self.file = None 1439 raise 1440 return True 1441 else: 1442 return False 1443 else: 1444 if self.end.match(line): 1445 if self.file: 1446 self.file.close() 1447 self.file = None 1448 self.in_shunt = False 1449 else: 1450 if self.file: 1451 print >>self.file, line 1452 return True 1453 1453 1454 1454 class shunt_to_list: 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1455 """ 1456 Same interface as shunt_to_file. Data collected in self.list, one list 1457 element per line. 1458 """ 1459 def __init__(self, begin, end): 1460 self.begin = re.compile(begin) 1461 self.end = re.compile(end) 1462 self.in_shunt = False 1463 self.list = [ ] 1464 1465 def __call__(self, line): 1466 if not self.in_shunt: 1467 if self.begin.match(line): 1468 self.in_shunt = True 1469 return True 1470 else: 1471 return False 1472 else: 1473 if self.end.match(line): 1474 self.in_shunt = False 1475 else: 1476 self.list.append(line) 1477 return True 1478 1478 1479 1479 class shunt_to_string: 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1480 """ 1481 Same interface as shunt_to_file. Data collected in self.str, all in 1482 one string. 1483 """ 1484 def __init__(self, begin, end): 1485 self.begin = re.compile(begin) 1486 self.end = re.compile(end) 1487 self.in_shunt = False 1488 self.str = "" 1489 1490 def __call__(self, line): 1491 if not self.in_shunt: 1492 if self.begin.match(line): 1493 self.in_shunt = True 1494 return True 1495 else: 1496 return False 1497 else: 1498 if self.end.match(line): 1499 self.in_shunt = False 1500 else: 1501 self.str += line 1502 return True 1503 1503 1504 1504 def create_experiment(self, req, fid): 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 allocated = { }# Testbeds we can access1633 started = { }# Testbeds where a sub-experiment started1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1505 """ 1506 The external interface to experiment creation called from the 1507 dispatcher. 1508 1509 Creates a working directory, splits the incoming description using the 1510 splitter script and parses out the avrious subsections using the 1511 lcasses above. Once each sub-experiment is created, use pooled threads 1512 to instantiate them and start it all up. 1513 """ 1514 1515 if not self.auth.check_attribute(fid, 'create'): 1516 raise service_error(service_error.access, "Create access denied") 1517 1518 try: 1519 tmpdir = tempfile.mkdtemp(prefix="split-") 1520 except IOError: 1521 raise service_error(service_error.internal, "Cannot create tmp dir") 1522 1523 gw_pubkey_base = "fed.%s.pub" % self.ssh_type 1524 gw_secretkey_base = "fed.%s" % self.ssh_type 1525 gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base 1526 gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base 1527 tclfile = tmpdir + "/experiment.tcl" 1528 tbparams = { } 1529 try: 1530 access_user = self.accessdb[fid] 1531 except KeyError: 1532 raise service_error(service_error.internal, 1533 "Access map and authorizer out of sync in " + \ 1534 "create_experiment for fedid %s" % fid) 1535 1536 pid = "dummy" 1537 gid = "dummy" 1538 # XXX 1539 fail_soft = False 1540 1541 try: 1542 os.mkdir(tmpdir+"/keys") 1543 except OSError: 1544 raise service_error(service_error.internal, 1545 "Can't make temporary dir") 1546 1547 req = req.get('CreateRequestBody', None) 1548 if not req: 1549 raise service_error(service_error.req, 1550 "Bad request format (no CreateRequestBody)") 1551 # The tcl parser needs to read a file so put the content into that file 1552 descr=req.get('experimentdescription', None) 1553 if descr: 1554 file_content=descr.get('ns2description', None) 1555 if file_content: 1556 try: 1557 f = open(tclfile, 'w') 1558 f.write(file_content) 1559 f.close() 1560 except IOError: 1561 raise service_error(service_error.internal, 1562 "Cannot write temp experiment description") 1563 else: 1564 raise service_error(service_error.req, 1565 "Only ns2descriptions supported") 1566 else: 1567 raise service_error(service_error.req, "No experiment description") 1568 1569 if req.has_key('experimentID') and \ 1570 req['experimentID'].has_key('localname'): 1571 eid = req['experimentID']['localname'] 1572 self.state_lock.acquire() 1573 while (self.state.has_key(eid)): 1574 eid += random.choice(string.ascii_letters) 1575 # To avoid another thread picking this localname 1576 self.state[eid] = "placeholder" 1577 self.state_lock.release() 1578 else: 1579 eid = self.exp_stem 1580 for i in range(0,5): 1581 eid += random.choice(string.ascii_letters) 1582 self.state_lock.acquire() 1583 while (self.state.has_key(eid)): 1584 eid = self.exp_stem 1585 for i in range(0,5): 1586 eid += random.choice(string.ascii_letters) 1587 # To avoid another thread picking this localname 1588 self.state[eid] = "placeholder" 1589 self.state_lock.release() 1590 1591 try: 1592 # This catches exceptions to clear the placeholder if necessary 1593 try: 1594 self.generate_ssh_keys(gw_secretkey, self.ssh_type) 1595 except ValueError: 1596 raise service_error(service_error.server_config, 1597 "Bad key type (%s)" % self.ssh_type) 1598 1599 user = req.get('user', None) 1600 if user == None: 1601 raise service_error(service_error.req, "No user") 1602 1603 master = req.get('master', None) 1604 if not master: 1605 raise service_error(service_error.req, 1606 "No master testbed label") 1607 export_project = req.get('exportProject', None) 1608 if not export_project: 1609 raise service_error(service_error.req, "No export project") 1610 1611 if self.splitter_url: 1612 self.log.debug("Calling remote splitter at %s" % \ 1613 self.splitter_url) 1614 split_data = self.remote_splitter(self.splitter_url, 1615 file_content, master) 1616 else: 1617 tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 1618 str(self.muxmax), '-m', master] 1619 1620 if self.fedkit: 1621 tclcmd.append('-k') 1622 1623 if self.gatewaykit: 1624 tclcmd.append('-K') 1625 1626 tclcmd.extend([pid, gid, eid, tclfile]) 1627 1628 self.log.debug("running local splitter %s", " ".join(tclcmd)) 1629 tclparser = Popen(tclcmd, stdout=PIPE) 1630 split_data = tclparser.stdout 1631 1632 allocated = { } # Testbeds we can access 1633 started = { } # Testbeds where a sub-experiment started 1634 # successfully 1635 1636 # Objects to parse the splitter output (defined above) 1637 parse_current_testbed = self.current_testbed(eid, tmpdir, 1638 self.fedkit, self.gatewaykit) 1639 parse_allbeds = self.allbeds(self.get_access) 1640 parse_gateways = self.gateways(eid, master, tmpdir, 1641 gw_pubkey_base, gw_secretkey_base, self.copy_file, 1642 self.fedkit) 1643 parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo", 1644 "^#\s+End\s+Vtopo") 1645 parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames", 1646 "^#\s+End\s+hostnames", tmpdir + "/hosts") 1647 parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles", 1648 "^#\s+End\s+tarfiles") 1649 parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms", 1650 "^#\s+End\s+rpms") 1651 1652 # Working on the split data 1653 for line in split_data: 1654 line = line.rstrip() 1655 if parse_current_testbed(line, master, allocated, tbparams): 1656 continue 1657 elif parse_allbeds(line, user, tbparams, master, export_project, 1658 access_user): 1659 continue 1660 elif parse_gateways(line, allocated, tbparams): 1661 continue 1662 elif parse_vtopo(line): 1663 continue 1664 elif parse_hostnames(line): 1665 continue 1666 elif parse_tarfiles(line): 1667 continue 1668 elif parse_rpms(line): 1669 continue 1670 else: 1671 raise service_error(service_error.internal, 1672 "Bad tcl parse? %s" % line) 1673 # Virtual topology and visualization 1674 vtopo = self.gentopo(parse_vtopo.str) 1675 if not vtopo: 1676 raise service_error(service_error.internal, 1677 "Failed to generate virtual topology") 1678 1679 vis = self.genviz(vtopo) 1680 if not vis: 1681 raise service_error(service_error.internal, 1682 "Failed to generate visualization") 1683 1684 # save federant information 1685 for k in allocated.keys(): 1686 tbparams[k]['federant'] = {\ 1687 'name': [ { 'localname' : eid} ],\ 1688 'emulab': tbparams[k]['emulab'],\ 1689 'allocID' : tbparams[k]['allocID'],\ 1690 'master' : k == master,\ 1691 } 1692 1693 1694 # Copy tarfiles and rpms needed at remote sites into a staging area 1695 try: 1696 if self.fedkit: 1697 for t in self.fedkit: 1698 parse_tarfiles.list.append(t[1]) 1699 if self.gatewaykit: 1700 for t in self.gatewaykit: 1701 parse_tarfiles.list.append(t[1]) 1702 for t in parse_tarfiles.list: 1703 if not os.path.exists("%s/tarfiles" % tmpdir): 1704 os.mkdir("%s/tarfiles" % tmpdir) 1705 self.copy_file(t, "%s/tarfiles/%s" % \ 1706 (tmpdir, os.path.basename(t))) 1707 for r in parse_rpms.list: 1708 if not os.path.exists("%s/rpms" % tmpdir): 1709 os.mkdir("%s/rpms" % tmpdir) 1710 self.copy_file(r, "%s/rpms/%s" % \ 1711 (tmpdir, os.path.basename(r))) 1712 # A null experiment file in case we need to create a remote 1713 # experiment from scratch 1714 f = open("%s/null.tcl" % tmpdir, "w") 1715 print >>f, """ 1716 1716 set ns [new Simulator] 1717 1717 source tb_compat.tcl … … 1722 1722 $ns run 1723 1723 """ 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1724 f.close() 1725 1726 except IOError, e: 1727 raise service_error(service_error.internal, 1728 "Cannot stage tarfile/rpm: %s" % e.strerror) 1729 1730 except service_error, e: 1731 # If something goes wrong in the parse (usually an access error) 1732 # clear the placeholder state. From here on out the code delays 1733 # exceptions. 1734 self.state_lock.acquire() 1735 del self.state[eid] 1736 self.state_lock.release() 1737 raise e 1738 1739 thread_pool = self.thread_pool(self.nthreads) 1740 threads = [ ] 1741 1742 for tb in [ k for k in allocated.keys() if k != master]: 1743 # Create and start a thread to start the segment, and save it to 1744 # get the return value later 1745 thread_pool.wait_for_slot() 1746 t = self.pooled_thread(target=self.start_segment, 1747 args=(tb, eid, tbparams, tmpdir, 0), name=tb, 1748 pdata=thread_pool, trace_file=self.trace_file) 1749 threads.append(t) 1750 t.start() 1751 1752 # Wait until all finish 1753 thread_pool.wait_for_all_done() 1754 1755 # If none failed, start the master 1756 failed = [ t.getName() for t in threads if not t.rv ] 1757 1758 if len(failed) == 0: 1759 if not self.start_segment(master, eid, tbparams, tmpdir): 1760 failed.append(master) 1761 1762 succeeded = [tb for tb in allocated.keys() if tb not in failed] 1763 # If one failed clean up, unless fail_soft is set 1764 if failed: 1765 if not fail_soft: 1766 thread_pool.clear() 1767 for tb in succeeded: 1768 # Create and start a thread to stop the segment 1769 thread_pool.wait_for_slot() 1770 t = self.pooled_thread(target=self.stop_segment, 1771 args=(tb, eid, tbparams), name=tb, 1772 pdata=thread_pool, trace_file=self.trace_file) 1773 t.start() 1774 # Wait until all finish 1775 thread_pool.wait_for_all_done() 1776 1777 # release the allocations 1778 for tb in tbparams.keys(): 1779 self.release_access(tb, tbparams[tb]['allocID']) 1780 # Remove the placeholder 1781 self.state_lock.acquire() 1782 del self.state[eid] 1783 self.state_lock.release() 1784 1785 raise service_error(service_error.federant, 1786 "Swap in failed on %s" % ",".join(failed)) 1787 else: 1788 self.log.info("[start_segment]: Experiment %s started" % eid) 1789 1790 # Generate an ID for the experiment (slice) and a certificate that the 1791 # allocator can use to prove they own it. We'll ship it back through 1792 # the encrypted connection. 1793 (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log) 1794 1795 self.log.debug("[start_experiment]: removing %s" % tmpdir) 1796 1797 # Walk up tmpdir, deleting as we go 1798 for path, dirs, files in os.walk(tmpdir, topdown=False): 1799 for f in files: 1800 os.remove(os.path.join(path, f)) 1801 for d in dirs: 1802 os.rmdir(os.path.join(path, d)) 1803 os.rmdir(tmpdir) 1804 1805 # The deepcopy prevents the allocation ID and other binaries from being 1806 # translated into other formats 1807 resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \ 1808 for tb in tbparams.keys() \ 1809 if tbparams[tb].has_key('federant') ],\ 1810 'vtopo': vtopo,\ 1811 'vis' : vis, 1812 'experimentID' : [\ 1813 { 'fedid': copy.copy(expid) }, \ 1814 { 'localname': eid },\ 1815 ],\ 1816 'experimentAccess': { 'X509' : expcert },\ 1817 } 1818 # remove the allocationID info from each federant 1819 for f in resp['federant']: 1820 if f.has_key('allocID'): del f['allocID'] 1821 1822 # Insert the experiment into our state and update the disk copy 1823 self.state_lock.acquire() 1824 self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \ 1825 for tb in tbparams.keys() \ 1826 if tbparams[tb].has_key('federant') ],\ 1827 'vtopo': vtopo,\ 1828 'vis' : vis, 1829 'owner': fid, 1830 'experimentID' : [\ 1831 { 'fedid': expid }, { 'localname': eid },\ 1832 ],\ 1833 } 1834 self.state[eid] = self.state[expid] 1835 if self.state_filename: self.write_state() 1836 self.state_lock.release() 1837 1838 self.auth.set_attribute(fid, expid) 1839 self.auth.set_attribute(expid, expid) 1840 1841 if not failed: 1842 return resp 1843 else: 1844 raise service_error(service_error.partial, \ 1845 "Partial swap in on %s" % ",".join(succeeded)) 1846 1846 1847 1847 def check_experiment_access(self, fid, key): 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1848 """ 1849 Confirm that the fid has access to the experiment. Though a request 1850 may be made in terms of a local name, the access attribute is always 1851 the experiment's fedid. 1852 """ 1853 if not isinstance(key, fedid): 1854 self.state_lock.acquire() 1855 if self.state.has_key(key): 1856 if isinstance(self.state[key], dict): 1857 try: 1858 kl = [ f['fedid'] for f in \ 1859 self.state[key]['experimentID']\ 1860 if f.has_key('fedid') ] 1861 except KeyError: 1862 self.state_lock.release() 1863 raise service_error(service_error.internal, 1864 "No fedid for experiment %s when checking " +\ 1865 "access(!?)" % key) 1866 if len(kl) == 1: 1867 key = kl[0] 1868 else: 1869 self.state_lock.release() 1870 raise service_error(service_error.internal, 1871 "multiple fedids for experiment %s when " +\ 1872 "checking access(!?)" % key) 1873 elif isinstance(self.state[key], str): 1874 self.state_lock.release() 1875 raise service_error(service_error.internal, 1876 ("experiment %s is placeholder. " +\ 1877 "Creation in progress or aborted oddly") \ 1878 % key) 1879 else: 1880 self.state_lock.release() 1881 raise service_error(service_error.internal, 1882 "Unexpected state for %s" % key) 1883 1884 else: 1885 self.state_lock.release() 1886 raise service_error(service_error.access, "Access Denied") 1887 self.state_lock.release() 1888 1889 if self.auth.check_attribute(fid, key): 1890 return True 1891 else: 1892 raise service_error(service_error.access, "Access Denied") 1893 1893 1894 1894 1895 1895 1896 1896 def get_vtopo(self, req, fid): 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1897 """ 1898 Return the stored virtual topology for this experiment 1899 """ 1900 rv = None 1901 1902 req = req.get('VtopoRequestBody', None) 1903 if not req: 1904 raise service_error(service_error.req, 1905 "Bad request format (no VtopoRequestBody)") 1906 exp = req.get('experiment', None) 1907 if exp: 1908 if exp.has_key('fedid'): 1909 key = exp['fedid'] 1910 keytype = "fedid" 1911 elif exp.has_key('localname'): 1912 key = exp['localname'] 1913 keytype = "localname" 1914 else: 1915 raise service_error(service_error.req, "Unknown lookup type") 1916 else: 1917 raise service_error(service_error.req, "No request?") 1918 1919 self.check_experiment_access(fid, key) 1920 1921 self.state_lock.acquire() 1922 if self.state.has_key(key): 1923 rv = { 'experiment' : {keytype: key },\ 1924 'vtopo': self.state[key]['vtopo'],\ 1925 } 1926 self.state_lock.release() 1927 1928 if rv: return rv 1929 else: raise service_error(service_error.req, "No such experiment") 1930 1930 1931 1931 def get_vis(self, req, fid): 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1932 """ 1933 Return the stored visualization for this experiment 1934 """ 1935 rv = None 1936 1937 req = req.get('VisRequestBody', None) 1938 if not req: 1939 raise service_error(service_error.req, 1940 "Bad request format (no VisRequestBody)") 1941 exp = req.get('experiment', None) 1942 if exp: 1943 if exp.has_key('fedid'): 1944 key = exp['fedid'] 1945 keytype = "fedid" 1946 elif exp.has_key('localname'): 1947 key = exp['localname'] 1948 keytype = "localname" 1949 else: 1950 raise service_error(service_error.req, "Unknown lookup type") 1951 else: 1952 raise service_error(service_error.req, "No request?") 1953 1954 self.check_experiment_access(fid, key) 1955 1956 self.state_lock.acquire() 1957 if self.state.has_key(key): 1958 rv = { 'experiment' : {keytype: key },\ 1959 'vis': self.state[key]['vis'],\ 1960 } 1961 self.state_lock.release() 1962 1963 if rv: return rv 1964 else: raise service_error(service_error.req, "No such experiment") 1965 1965 1966 1966 def get_info(self, req, fid): 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 1967 """ 1968 Return all the stored info about this experiment 1969 """ 1970 rv = None 1971 1972 req = req.get('InfoRequestBody', None) 1973 if not req: 1974 raise service_error(service_error.req, 1975 "Bad request format (no VisRequestBody)") 1976 exp = req.get('experiment', None) 1977 if exp: 1978 if exp.has_key('fedid'): 1979 key = exp['fedid'] 1980 keytype = "fedid" 1981 elif exp.has_key('localname'): 1982 key = exp['localname'] 1983 keytype = "localname" 1984 else: 1985 raise service_error(service_error.req, "Unknown lookup type") 1986 else: 1987 raise service_error(service_error.req, "No request?") 1988 1989 self.check_experiment_access(fid, key) 1990 1991 # The state may be massaged by the service function that called 1992 # get_info (e.g., encoded for XMLRPC transport) so send a copy of the 1993 # state. 1994 self.state_lock.acquire() 1995 if self.state.has_key(key): 1996 rv = copy.deepcopy(self.state[key]) 1997 self.state_lock.release() 1998 # Remove the owner info 1999 del rv['owner'] 2000 # remove the allocationID info from each federant 2001 for f in rv['federant']: 2002 if f.has_key('allocID'): del f['allocID'] 2003 2004 if rv: return rv 2005 else: raise service_error(service_error.req, "No such experiment") 2006 2006 2007 2007 2008 2008 def terminate_experiment(self, req, fid): 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2009 """ 2010 Swap this experiment out on the federants and delete the shared 2011 information 2012 """ 2013 tbparams = { } 2014 req = req.get('TerminateRequestBody', None) 2015 if not req: 2016 raise service_error(service_error.req, 2017 "Bad request format (no TerminateRequestBody)") 2018 exp = req.get('experiment', None) 2019 if exp: 2020 if exp.has_key('fedid'): 2021 key = exp['fedid'] 2022 keytype = "fedid" 2023 elif exp.has_key('localname'): 2024 key = exp['localname'] 2025 keytype = "localname" 2026 else: 2027 raise service_error(service_error.req, "Unknown lookup type") 2028 else: 2029 raise service_error(service_error.req, "No request?") 2030 2031 self.check_experiment_access(fid, key) 2032 2033 self.state_lock.acquire() 2034 fed_exp = self.state.get(key, None) 2035 2036 if fed_exp: 2037 # This branch of the conditional holds the lock to generate a 2038 # consistent temporary tbparams variable to deallocate experiments. 2039 # It releases the lock to do the deallocations and reacquires it to 2040 # remove the experiment state when the termination is complete. 2041 ids = [] 2042 # experimentID is a list of dicts that are self-describing 2043 # identifiers. This finds all the fedids and localnames - the 2044 # keys of self.state - and puts them into ids. 2045 for id in fed_exp.get('experimentID', []): 2046 if id.has_key('fedid'): ids.append(id['fedid']) 2047 if id.has_key('localname'): ids.append(id['localname']) 2048 2049 # Construct enough of the tbparams to make the stop_segment calls 2050 # work 2051 for fed in fed_exp['federant']: 2052 try: 2053 for e in fed['name']: 2054 eid = e.get('localname', None) 2055 if eid: break 2056 else: 2057 continue 2058 2059 p = fed['emulab']['project'] 2060 2061 project = p['name']['localname'] 2062 tb = p['testbed']['localname'] 2063 user = p['user'][0]['userID']['localname'] 2064 2065 domain = fed['emulab']['domain'] 2066 host = fed['emulab']['ops'] 2067 aid = fed['allocID'] 2068 except KeyError, e: 2069 continue 2070 tbparams[tb] = {\ 2071 'user': user,\ 2072 'domain': domain,\ 2073 'project': project,\ 2074 'host': host,\ 2075 'eid': eid,\ 2076 'aid': aid,\ 2077 } 2078 self.state_lock.release() 2079 2080 # Stop everyone. 2081 thread_pool = self.thread_pool(self.nthreads) 2082 for tb in tbparams.keys(): 2083 # Create and start a thread to stop the segment 2084 thread_pool.wait_for_slot() 2085 t = self.pooled_thread(target=self.stop_segment, 2086 args=(tb, tbparams[tb]['eid'], tbparams), name=tb, 2087 pdata=thread_pool, trace_file=self.trace_file) 2088 t.start() 2089 # Wait for completions 2090 thread_pool.wait_for_all_done() 2091 2092 # release the allocations 2093 for tb in tbparams.keys(): 2094 self.release_access(tb, tbparams[tb]['aid']) 2095 2096 # Remove the terminated experiment 2097 self.state_lock.acquire() 2098 for id in ids: 2099 if self.state.has_key(id): del self.state[id] 2100 2101 if self.state_filename: self.write_state() 2102 self.state_lock.release() 2103 2104 return { 'experiment': exp } 2105 else: 2106 # Don't forget to release the lock 2107 self.state_lock.release() 2108 raise service_error(service_error.req, "No saved state")
Note: See TracChangeset
for help on using the changeset viewer.