Changeset c3dcf48 for fedd/federation
- Timestamp:
- Dec 4, 2008 9:49:39 PM (16 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-1.30, version-2.00, version-3.01, version-3.02
- Children:
- 416292f
- Parents:
- 2ac63f7d
- Location:
- fedd/federation
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/access.py
r2ac63f7d rc3dcf48 80 80 self.projects = { } 81 81 self.keys = { } 82 self.types = { } 82 83 self.allocation = { } 83 84 self.state = { 84 85 'projects': self.projects, 85 86 'allocation' : self.allocation, 86 'keys' : self.keys 87 'keys' : self.keys, 88 'types': self.types 87 89 } 88 90 self.log = logging.getLogger("fedd.access") … … 331 333 self.projects = self.state['projects'] 332 334 self.keys = self.state['keys'] 335 self.types = self.state['types'] 333 336 334 337 self.log.debug("[read_state]: Read state from %s" % \ … … 653 656 self.state_lock.acquire() 654 657 self.allocation[aid] = { } 658 try: 659 pname = ap['project']['name']['localname'] 660 except KeyError: 661 pname = None 662 655 663 if dyn[1]: 656 try: 657 pname = ap['project']['name']['localname'] 658 except KeyError: 664 if not pname: 659 665 self.state_lock.release() 660 666 raise service_error(service_error.internal, … … 663 669 else: self.projects[pname] = 1 664 670 self.allocation[aid]['project'] = pname 671 672 if ap.has_key('resources'): 673 if not pname: 674 self.state_lock.release() 675 raise service_error(service_error.internal, 676 "Misformed allocation response?") 677 self.allocation[aid]['types'] = set() 678 nodes = ap['resources'].get('node', []) 679 for n in nodes: 680 for h in n.get('hardware', []): 681 if self.types.has_key((pname, h)): 682 self.types[(pname, h)] += 1 683 else: 684 self.types[(pname, h)] = 1 685 self.allocation[aid]['types'].add((pname,h)) 686 665 687 666 688 self.allocation[aid]['keys'] = [ ] … … 680 702 "Misformed allocation response?") 681 703 704 682 705 self.allocation[aid]['owners'] = owners 683 706 self.write_state() … … 741 764 del_users = { } 742 765 del_project = None 766 del_types = set() 767 743 768 if self.allocation.has_key(aid): 744 769 self.log.debug("Found allocation for %s" %aid) … … 759 784 del_project = pname 760 785 del self.projects[pname] 786 787 if self.allocation[aid].has_key('types'): 788 for t in self.allocation[aid]['types']: 789 self.types[t] -= 1 790 if self.types[t] == 0: 791 if not del_project: del_project = t[0] 792 del_types.add(t[1]) 793 del self.types[t] 761 794 762 795 del self.allocation[aid] … … 776 809 if users: 777 810 msg['project']['user'] = users 811 if len(del_types) > 0: 812 msg['resources'] = { 'node': \ 813 [ {'hardware': [ h ] } for h in del_types ]\ 814 } 778 815 if self.allocate_project.release_project: 779 816 msg = { 'ReleaseProjectRequestBody' : msg} -
fedd/federation/allocate_project.py
r2ac63f7d rc3dcf48 6 6 import string 7 7 import subprocess 8 import threading 9 import pickle 8 10 import tempfile 9 11 … … 87 89 self.allocation_level = self.none 88 90 91 self.state = { 92 'keys': set(), 93 'types': set(), 94 'projects': set(), 95 'users': set(), 96 } 97 self.state_filename = config.get('allocate', 'allocation_state') 98 self.state_lock = threading.Lock() 99 self.read_state() 100 89 101 access_db = config.get("allocate", "accessdb") 90 102 if access_db: … … 127 139 self.xmlrpc_services = { } 128 140 141 def read_state(self): 142 """ 143 Read a new copy of access state. Old state is overwritten. 144 145 State format is a simple pickling of the state dictionary. 146 """ 147 if self.state_filename: 148 try: 149 f = open(self.state_filename, "r") 150 self.state = pickle.load(f) 151 self.log.debug("[allocation]: Read state from %s" % \ 152 self.state_filename) 153 except IOError, e: 154 self.log.warning(("[allocation]: No saved state: " +\ 155 "Can't open %s: %s") % (self.state_filename, e)) 156 except EOFError, e: 157 self.log.warning(("[allocation]: " +\ 158 "Empty or damaged state file: %s:") % \ 159 self.state_filename) 160 except pickle.UnpicklingError, e: 161 self.log.warning(("[allocation]: No saved state: " + \ 162 "Unpickling failed: %s") % e) 163 # These should all be in the picked representation, but make sure 164 if not self.state.has_key('keys'): self.state['keys'] = set() 165 if not self.state.has_key('types'): self.state['types'] = set() 166 if not self.state.has_key('projects'): 167 self.state['projects'] = set() 168 if not self.state.has_key('users'): self.state['users'] = set() 169 170 def write_state(self): 171 if self.state_filename: 172 try: 173 f = open(self.state_filename, 'w') 174 pickle.dump(self.state, f) 175 except IOError, e: 176 self.log.error("Can't write file %s: %s" % \ 177 (self.state_filename, e)) 178 except pickle.PicklingError, e: 179 self.log.error("Pickling problem: %s" % e) 180 except TypeError, e: 181 self.log.error("Pickling problem (TypeError): %s" % e) 182 183 129 184 def random_string(self, s, n=3): 130 185 """Append n random ASCII characters to s and return the string""" … … 149 204 f.write("</%s>\n" % root) 150 205 f.close() 206 207 def run_cmd(self, cmd, log_prefix='allocate'): 208 """ 209 Run the command passed in. Cmd is a list containing the words of the 210 command. Return the exit value from the subprocess - that is 0 on 211 success. On an error running the command - python or OS error, raise 212 a service exception. 213 """ 214 self.log.debug("[%s]: %s" % (log_prefix, ' '.join(cmd))) 215 if not self.debug: 216 try: 217 return subprocess.call(cmd) 218 except OSError, e: 219 raise service_error(service_error.internal, 220 "Static project subprocess creation error "+ \ 221 "[%s] (%s)" % (cmd[0], e.strerror)) 222 else: 223 return 0 224 225 def confirm_key(self, user, key): 226 """ 227 Call run_cmd to comfirm the key. Return a boolean rather 228 than the subprocess code. 229 """ 230 return self.run_cmd((self.wap, self.confirmkey, '-C', 231 '-u', user, '-k', key)) ==0 232 233 def add_key(self, user, key): 234 """ 235 Call run_cmd to add the key. Return a boolean rather 236 than the subprocess code. 237 """ 238 return self.run_cmd((self.wap, self.addpubkey, '-u', user, 239 '-k', key)) == 0 240 241 def remove_key(self, user, key): 242 """ 243 Call run_cmd to remove the key. Return a boolean rather 244 than the subprocess code. 245 """ 246 return self.run_cmd((self.wap, self.addpubkey, '-R', '-u', user, 247 '-k', key)) == 0 248 249 def confirm_access(self, project, type): 250 """ 251 Call run_cmd to comfirm the key. Return a boolean rather 252 than the subprocess code. 253 """ 254 return self.run_cmd((self.wap, self.grantnodetype, '-C', 255 '-p', project, type)) ==0 256 257 def add_access(self, project, type): 258 """ 259 Call run_cmd to add the key. Return a boolean rather 260 than the subprocess code. 261 """ 262 return self.run_cmd((self.wap, self.grantnodetype, 263 '-p', project, type)) == 0 264 265 def remove_access(self, project, type): 266 """ 267 Call run_cmd to remove the key. Return a boolean rather 268 than the subprocess code. 269 """ 270 271 return self.run_cmd((self.wap, self.grantnodetype, '-R', 272 '-p', project, type)) == 0 273 274 def add_project(self, project, projfile): 275 """ 276 Create a project using run_cmd. This is two steps, and assumes that 277 the relevant XML files are in place and correct. Make the return value 278 boolean. Note that if a new user is specified in the XML, that user is 279 created on success. 280 """ 281 282 if self.run_cmd((self.wap, self.newproj, projfile)) == 0: 283 return self.run_cmd((self.wap, self.mkproj, project)) ==0 284 else: 285 return False 286 287 def remove_project(self, project): 288 """ 289 Call run_cmd to remove the project. Make the return value boolean. 290 """ 291 292 return self.run_cmd(self.wap, self.rmproj, project) == 0 293 294 295 def add_user(self, name, param_file, project): 296 """ 297 Create a user and link them to the given project. Similar to 298 add_project, this requires a two step approach. Returns True on success 299 False on failure. 300 """ 301 302 if self.run_cmd((self.wap, self.newuser, param_file)) == 0: 303 return self.run_cmd((self.wap, self.user_to_project, 304 user, project)) == 0 305 else: 306 return False 307 308 def remove_user(self, user): 309 """ 310 Call run_cmd to remove the user. Make the return value boolean. 311 """ 312 313 return self.run_cmd(self.wap, self.rmuser, user) == 0 314 151 315 152 316 … … 269 433 270 434 271 # Write out the files 272 self.write_attr_xml(cuf, "user", create_user_fields) 273 self.write_attr_xml(suf, "user", service_user_fields) 274 self.write_attr_xml(pf, "project", proj_fields) 275 276 # Generate the commands (only grantnodetype's are dynamic) 277 cmds = [ 278 (self.wap, self.newproj, projfile), 279 (self.wap, self.mkproj, name), 280 (self.wap, self.newuser, service_userfile), 281 (self.wap, self.user_to_project, uname['serviceAccess'], name), 282 ] 283 284 # Add commands to grant access to any resources in the request. The 285 # list comprehension pulls out the hardware types in the node entries 286 # in the resources list. 287 if resources.has_key('node'): 288 for nt in [ h for n in resources['node']\ 289 if n.has_key('hardware') for h in n['hardware'] ] : 290 if self.allocation_level >= self.confirm_keys: 291 cmds.append((self.wap, self.grantnodetype, '-p', pname, nt)) 292 293 294 # Create the projects 295 rc = 0 296 for cmd in cmds: 297 self.log.debug("[dynamic_project]: %s" % ' '.join(cmd)) 298 if not self.debug: 299 try: 300 rc = subprocess.call(cmd) 301 except OSerror, e: 302 raise service_error(service_error.internal, 303 "Dynamic project subprocess creation error "+ \ 304 "[%s] (%s)" % (cmd[1], e.strerror)) 305 306 if rc != 0: 307 raise service_error(service_error.internal, 308 "Dynamic project subprocess error " +\ 309 "[%s] (%d)" % (cmd[1], rc)) 310 # Clean up tempfiles 311 #os.unlink(create_userfile) 312 #os.unlink(service_userfile) 313 #os.unlink(projfile) 435 436 added_projects = [ ] 437 added_users = [ ] 438 added_types = [ ] 439 440 self.state_lock.acquire() 441 try: 442 # Write out the files 443 self.write_attr_xml(cuf, "user", create_user_fields) 444 self.write_attr_xml(suf, "user", service_user_fields) 445 self.write_attr_xml(pf, "project", proj_fields) 446 try: 447 if self.add_project(name, projfile): 448 # add_project adds a user as well in this case 449 added_projects.append(name) 450 added_users.append(uname['createExperiment']) 451 self.state['projects'].add(name) 452 self.state['users'].add(uname['createExperiment']) 453 454 if self.add_user(uname['serviceAccess'], 455 service_userfile, name): 456 added_users.append(uname['serviceAccess']) 457 self.state['users'].add(uname['serviceAccess']) 458 else: 459 raise service_error("Unable to create user %s" % \ 460 uname['serviceAccess']) 461 else: 462 raise service_error("Unable to create project/user %s/%s" % \ 463 (name, uname['experimentCreation'])) 464 465 nodes = resources.get('node', []) 466 # Grant access to restricted resources. This is simpler than 467 # the corresponding loop from static_project because this is a 468 # clean slate. 469 for nt in [ h for n in nodes\ 470 if n.has_key('hardware')\ 471 for h in n['hardware'] ] : 472 if self.add_access(name, nt): 473 self.state['types'].add((name, nt)) 474 added_types.append((name, nt)) 475 else: 476 raise service_error(service_error.internal, 477 "Failed to add access for %s to %s"\ 478 % (name, nt)) 479 except service_error, e: 480 # Something failed. Back out the partial allocation as 481 # completely as possible and re-raise the error. 482 for p, t in added_types: 483 self.state['types'].discard((p, t)) 484 try: 485 self.remove_access(p, t) 486 except service_error: 487 pass 488 for u in added_users: 489 self.state['users'].discard(u) 490 try: 491 self.remove_user(u) 492 except service_error: 493 pass 494 495 for p in added_projects: 496 self.state['projects'].discard(p) 497 try: 498 self.remove_project(p) 499 except service_error: 500 pass 501 self.state_lock.release() 502 raise e 503 finally: 504 # Clean up tempfiles 505 os.unlink(create_userfile) 506 os.unlink(service_userfile) 507 os.unlink(projfile) 508 314 509 rv = {\ 315 510 'project': {\ … … 336 531 proper resources and users have correct keys. Add them if necessary. 337 532 """ 338 339 cmds = []340 341 533 # Internal calls do not have a fedid parameter (i.e., local calls on 342 534 # behalf of already vetted fedids) … … 355 547 raise service_error(service_error.req, "Badly formed request") 356 548 357 358 for u in users: 359 try: 360 name = u['userID']['localname'] 361 except KeyError: 362 raise service_error(service_error.req, "Badly formed user") 363 for sk in [ k['sshPubkey'] for k in u.get('access', []) \ 364 if k.has_key('sshPubkey')]: 365 if self.allocation_level >= self.dynamic_keys: 366 cmds.append((self.wap, self.addpubkey, '-r', \ 367 '-u', name, '-k', sk)) 368 elif self.allocation_level >= self.confirm_keys: 369 cmds.append((self.wap, self.confirmkey, '-C', \ 370 '-u', name, '-k', sk)) 549 added_keys = [ ] 550 added_types = [ ] 551 # Keep track of changes made to the system 552 self.state_lock.acquire() 553 554 try: 555 for u in users: 556 try: 557 name = u['userID']['localname'] 558 except KeyError: 559 raise service_error(service_error.req, "Badly formed user") 560 for sk in [ k['sshPubkey'] for k in u.get('access', []) \ 561 if k.has_key('sshPubkey')]: 562 if self.allocation_level >=self.confirm_keys: 563 key_ok = self.confirm_key(name, sk) 564 if not key_ok: 565 if self.allocation_level >= self.dynamic_keys: 566 if self.add_key(name, sk): 567 self.state['keys'].add((name, sk)) 568 added_keys.append((name, sk)) 569 else: 570 raise service_error(service_error.internal, 571 "Failed to add key for %s" % name) 572 else: 573 raise service_error(service_error.internal, 574 "Failed to confirm key for %s" % name) 575 else: 576 self.log.warning("[static_project] no checking of " + \ 577 "static keys") 578 579 # Grant access to any resources in the request. The 580 # list comprehension pulls out the hardware types in the node 581 # entries in the resources list. The access module knows to 582 # only send resources that are restricted and needed by the 583 # project. 584 nodes = resources.get('node', []) 585 for nt in [ h for n in nodes\ 586 if n.has_key('hardware')\ 587 for h in n['hardware'] ] : 588 if self.allocation_level >= self.confirm_keys: 589 access_ok = self.confirm_access(pname, nt) 590 if not access_ok: 591 if self.allocation_level >= self.dynamic_keys: 592 if self.add_access(pname, nt): 593 self.state['types'].add((pname, nt)) 594 added_types.append((pname, nt)) 595 else: 596 raise service_error(service_error.internal, 597 "Failed to add access for %s to %s"\ 598 % (pname, nt)) 599 else: 600 raise service_error(service_error.internal, 601 "Failed to confirm access for %s to %s"\ 602 % (pname, nt)) 371 603 else: 372 604 self.log.warning("[static_project] no checking of " + \ 373 "static keys") 374 375 376 # Add commands to grant access to any resources in the request. The 377 # list comprehension pulls out the hardware types in the node entries 378 # in the resources list. 379 if resources.has_key('node'): 380 for nt in [ h for n in resources['node']\ 381 if n.has_key('hardware') for h in n['hardware'] ] : 382 if self.allocation_level >= self.confirm_keys: 383 cmds.append((self.wap, self.grantnodetype, '-p', pname, nt)) 384 385 # Run the commands 386 rc = 0 387 for cmd in cmds: 388 self.log.debug("[static_project]: %s" % ' '.join(cmd)) 389 if not self.debug: 605 "node access") 606 except service_error, e: 607 # Do our best to clean up partial allocation and reraise the 608 # error. Do our best to make sure that both allocation state and 609 # testbed state is restored. 610 for u, k in added_keys: 611 self.state['keys'].discard((u, k)) 390 612 try: 391 rc = subprocess.call(cmd) 392 except OSError, e: 393 raise service_error(service_error.internal, 394 "Static project subprocess creation error "+ \ 395 "[%s] (%s)" % (cmd[0], e.strerror)) 396 397 if rc != 0: 398 raise service_error(service_error.internal, 399 "Static project subprocess error " +\ 400 "[%s] (%d)" % (cmd[0], rc)) 401 402 return { 'project': req['StaticProjectRequestBody']['project']} 613 self.remove_key(u, k) 614 except service_error: 615 pass 616 for p, t in added_types: 617 self.state['types'].discard((p, t)) 618 try: 619 self.remove_access(p, t) 620 except service_error: 621 pass 622 self.state_lock.release() 623 raise e 624 # All is well, save state and release the lock 625 self.write_state() 626 self.state_lock.release() 627 # return { 'project': req['StaticProjectRequestBody']['project']} 628 return req['StaticProjectRequestBody'] 403 629 404 630 def release_project(self, req, fedid=None): … … 415 641 raise service_error(service_error.access, "Access Denied") 416 642 417 cmds = []418 643 pname = None 419 644 users = [] 645 nodes = [ ] 646 647 print req 420 648 421 649 try: … … 425 653 if req['ReleaseProjectRequestBody']['project'].has_key('user'): 426 654 users = req['ReleaseProjectRequestBody']['project']['user'] 655 if req['ReleaseProjectRequestBody'].has_key('resources'): 656 nodes = req['ReleaseProjectRequestBody']\ 657 ['resources'].get('node', []) 427 658 except KeyError: 428 659 raise service_error(service_error.req, "Badly formed request") 429 660 430 if pname and pname not in self.fixed_projects and \ 431 self.allocation_level >= self.dynamic_projects: 432 cmds.append((self.wap, self.rmproj, pname)) 433 434 for u in users: 435 try: 436 name = u['userID']['localname'] 437 except KeyError: 438 raise service_error(service_error.req, "Badly formed user") 439 if self.allocation_level >= self.dynamic_projects and \ 440 name not in self.fixed_users: 441 cmds.append((self.wap, self.rmuser, name)) 442 else: 443 for sk in [ k['sshPubkey'] for k in u.get('access', []) \ 444 if k.has_key('sshPubkey')]: 445 if (name.rstrip(), sk.rstrip()) not in self.fixed_keys: 446 if self.allocation_level >= self.dynamic_keys: 447 cmds.append((self.wap, self.addpubkey, '-R', '-r', \ 448 '-u', name, '-k', sk)) 449 450 # Run the commands 451 rc = 0 452 for cmd in cmds: 453 self.log.debug("[release_project]: %s" % ' '.join(cmd)) 454 if not self.debug: 661 if nodes and not pname: 662 raise service_error(service_error.req, 663 "Badly formed request (nodes without project)") 664 665 self.state_lock.acquire() 666 try: 667 for nt in [ h for n in nodes if n.has_key('hardware')\ 668 for h in n['hardware'] ] : 669 if (pname, nt ) in self.state['types']: 670 self.remove_access(pname, nt) 671 self.state['types'].discard((pname, nt)) 672 673 for u in users: 455 674 try: 456 rc = subprocess.call(cmd) 457 except OSError, e: 458 raise service_error(service_error.internal, 459 "Release project subprocess creation error "+ \ 460 "[%s] (%s)" % (cmd[0], e.strerror)) 461 462 if rc != 0: 463 raise service_error(service_error.internal, 464 "Release project subprocess error " +\ 465 "[%s] (%d)" % (cmd[0], rc)) 675 name = u['userID']['localname'] 676 except KeyError: 677 raise service_error(service_error.req, "Badly formed user") 678 if name in self.state['users']: 679 # If we created this user, discard the user, keys and all 680 self.remove_user(name) 681 self.state['users'].discard(name) 682 else: 683 # If not, just strip any keys we added 684 for sk in [ k['sshPubkey'] for k in u.get('access', []) \ 685 if k.has_key('sshPubkey')]: 686 if (name, sk) in self.state['keys']: 687 self.remove_key(name, sk) 688 self.state['keys'].discard((name, sk)) 689 if pname in self.state['projects']: 690 self.remove_project(pname) 691 self.state['projects'].discard(pname) 692 693 except service_error, e: 694 self.write_state() 695 self.state_lock.release() 696 raise e 697 self.write_state() 698 self.state_lock.release() 466 699 467 700 return { 'project': req['ReleaseProjectRequestBody']['project']}
Note: See TracChangeset
for help on using the changeset viewer.