- Timestamp:
- May 30, 2010 9:17:35 AM (14 years ago)
- Branches:
- axis_example, compt_changes, info-ops, master, version-3.01, version-3.02
- Children:
- 37ed9a5
- Parents:
- 703859f
- Location:
- fedd/federation
- Files:
-
- 1 deleted
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
fedd/federation/protogeni_access.py
r703859f r42cd8a7 10 10 import logging 11 11 import subprocess 12 import random 12 13 import traceback 13 14 14 from threading import *15 from threading import Thread, Timer, Lock 15 16 from M2Crypto.SSL import SSLError 16 17 … … 27 28 28 29 from access import access_base 30 from proxy_segment import proxy_segment 29 31 30 32 import topdl 31 33 import list_log 32 import proxy_protogeni_segment33 34 34 35 … … 39 40 fl = logging.getLogger("fedd.access") 40 41 fl.addHandler(nullHandler()) 42 43 class protogeni_proxy(proxy_segment): 44 class ProtoGENIError(Exception): 45 def __init__(self, op, code, output): 46 Exception.__init__(self, output) 47 self.op = op 48 self.code = code 49 self.output = output 50 51 def __init__(self, log=None, keyfile=None, debug=False, 52 ch_url=None, sa_url=None, cm_url=None): 53 proxy_segment.__init__(self, log=log, keyfile=keyfile, debug=debug) 54 55 self.ProtoGENIError = protogeni_proxy.ProtoGENIError 56 self.ch_url = ch_url 57 self.sa_url = sa_url 58 self.cm_url = cm_url 59 60 self.call_SetValue = service_caller('SetValue') 61 62 self.debug_fail = ['Resolve'] 63 self.debug_response = { 64 'RedeemTicket': ("XML blob1", "XML blob2"), 65 'SliceStatus': { 'status': 'ready' }, 66 } 67 68 69 def pg_call(self, url, method, params, context): 70 max_retries = 5 71 retries = 0 72 73 s = service_caller(method, request_body_name="", strict=False) 74 self.log.debug("Calling %s %s" % (url, method)) 75 if not self.debug: 76 while retries < max_retries: 77 r = s.call_xmlrpc_service(url, params, context=context) 78 code = r.get('code', -1) 79 if code == 0: 80 # Success leaves the loop here 81 return r.get('value', None) 82 elif code == 14 and retries +1 < max_retries: 83 # Busy resource 84 retries+= 1 85 self.log.info("Resource busy, retrying in 30 secs") 86 time.sleep(30) 87 else: 88 # NB: out of retries falls through to here 89 raise self.ProtoGENIError(op=method, 90 code=r.get('code', 'unknown'), 91 output=r.get('output', 'No output')) 92 else: 93 if method in self.debug_fail: 94 raise self.ProtoGENIError(op=method, code='unknown', 95 output='No output') 96 elif self.debug_response.has_key(method): 97 return self.debug_response[method] 98 else: 99 return "%s XML blob" % method 100 101 41 102 42 103 class access(access_base): … … 123 184 self.read_access(config.get("access", "accessdb"), 124 185 access_obj=self.make_access_info) 125 126 self.start_segment = proxy_protogeni_segment.start_segment127 self.stop_segment = proxy_protogeni_segment.stop_segment128 self.renew_segment = proxy_protogeni_segment.renew_segment129 186 130 187 self.lookup_access = self.lookup_access_base … … 308 365 self.state_lock.release() 309 366 raise service_error(service_error.req, "No such allocation") 367 368 # Turn the manifest into a dict were each virtual nodename (i.e. the topdl 369 # name) has an entry with the allocated machine in hostname and the 370 # interfaces in 'interfaces'. I love having XML parser code lying around. 371 def manifest_to_dict(self, manifest, ignore_debug=False): 372 if self.create_debug and not ignore_debug: 373 self.log.debug("Returning null manifest dict") 374 return { } 375 376 # The class allows us to keep a little state - the dict under 377 # consteruction and the current entry in that dict for the interface 378 # element code. 379 class manifest_parser: 380 def __init__(self): 381 self.d = { } 382 self.current_key=None 383 384 # If the element is a node, create a dict entry for it. If it's an 385 # interface inside a node, add an entry in the interfaces list with 386 # the virtual name and component id. 387 def start_element(self, name, attrs): 388 if name == 'node': 389 self.current_key = attrs.get('virtual_id',"") 390 if self.current_key: 391 self.d[self.current_key] = { 392 'hostname': attrs.get('hostname', None), 393 'interfaces': { } 394 } 395 elif name == 'interface' and self.current_key: 396 self.d[self.current_key]['interfaces']\ 397 [attrs.get('virtual_id','')] = \ 398 attrs.get('component_id', None) 399 # When a node is finished, clear current_key 400 def end_element(self, name): 401 if name == 'node': self.current_key = None 402 403 node = { } 404 405 mp = manifest_parser() 406 p = xml.parsers.expat.ParserCreate() 407 # These are bound to the class we just created 408 p.StartElementHandler = mp.start_element 409 p.EndElementHandler = mp.end_element 410 411 p.Parse(manifest) 412 # Make the node dict that the callers expect 413 for k in mp.d: 414 node[k] = mp.d.get('hostname', '') 415 return mp.d 416 417 def fake_manifest(self, topo): 418 node = { } 419 for i, e in enumerate([ e for e in topo.elements \ 420 if isinstance(e, topdl.Computer)]): 421 node[e.name] = { 422 'hostname': "node%03d" % i, 423 'interfaces': { } 424 } 425 for j, inf in enumerate(e.interface): 426 node[e.name]['interfaces'][inf.name] = 'eth%d' % j 427 428 return node 429 430 431 def generate_portal_configs(self, topo, pubkey_base, 432 secretkey_base, tmpdir, leid, connInfo, services, nodes): 433 434 def conninfo_to_dict(key, info): 435 """ 436 Make a cpoy of the connection information about key, and flatten it 437 into a single dict by parsing out any feddAttrs. 438 """ 439 440 rv = None 441 for i in info: 442 if key == i.get('portal', "") or \ 443 key in [e.get('element', "") \ 444 for e in i.get('member', [])]: 445 rv = i.copy() 446 break 447 448 else: 449 return rv 450 451 if 'fedAttr' in rv: 452 for a in rv['fedAttr']: 453 attr = a.get('attribute', "") 454 val = a.get('value', "") 455 if attr and attr not in rv: 456 rv[attr] = val 457 del rv['fedAttr'] 458 return rv 459 460 # XXX: un hardcode this 461 def client_null(f, s): 462 print >>f, "Service: %s" % s['name'] 463 464 def client_seer_master(f, s): 465 print >>f, 'PortalAlias: seer-master' 466 467 def client_smb(f, s): 468 print >>f, "Service: %s" % s['name'] 469 smbshare = None 470 smbuser = None 471 smbproj = None 472 for a in s.get('fedAttr', []): 473 if a.get('attribute', '') == 'SMBSHARE': 474 smbshare = a.get('value', None) 475 elif a.get('attribute', '') == 'SMBUSER': 476 smbuser = a.get('value', None) 477 elif a.get('attribute', '') == 'SMBPROJ': 478 smbproj = a.get('value', None) 479 480 if all((smbshare, smbuser, smbproj)): 481 print >>f, "SMBshare: %s" % smbshare 482 print >>f, "ProjectUser: %s" % smbuser 483 print >>f, "ProjectName: %s" % smbproj 484 485 def client_hide_hosts(f, s): 486 for a in s.get('fedAttr', [ ]): 487 if a.get('attribute', "") == 'hosts': 488 print >>f, 'Hide: %s' % a.get('value', "") 489 490 client_service_out = { 491 'SMB': client_smb, 492 'tmcd': client_null, 493 'seer': client_null, 494 'userconfig': client_null, 495 'project_export': client_null, 496 'seer_master': client_seer_master, 497 'hide_hosts': client_hide_hosts, 498 } 499 500 def client_seer_master_export(f, s): 501 print >>f, "AddedNode: seer-master" 502 503 def client_seer_local_export(f, s): 504 print >>f, "AddedNode: control" 505 506 client_export_service_out = { 507 'seer_master': client_seer_master_export, 508 'local_seer_control': client_seer_local_export, 509 } 510 511 def server_port(f, s): 512 p = urlparse(s.get('server', 'http://localhost')) 513 print >>f, 'port: remote:%s:%s:%s' % (p.port, p.hostname, p.port) 514 515 def server_null(f,s): pass 516 517 def server_seer(f, s): 518 print >>f, 'seer: true' 519 520 server_service_out = { 521 'SMB': server_port, 522 'tmcd': server_port, 523 'userconfig': server_null, 524 'project_export': server_null, 525 'seer': server_seer, 526 'seer_master': server_port, 527 'hide_hosts': server_null, 528 } 529 # XXX: end un hardcode this 530 531 532 seer_out = False 533 client_out = False 534 for e in [ e for e in topo.elements \ 535 if isinstance(e, topdl.Computer) and e.get_attribute('portal')]: 536 myname = e.name 537 type = e.get_attribute('portal_type') 538 539 info = conninfo_to_dict(myname, connInfo) 540 541 if not info: 542 raise service_error(service_error.req, 543 "No connectivity info for %s" % myname) 544 545 # Translate to physical name (ProtoGENI doesn't have DNS) 546 physname = nodes.get(myname, { }).get('hostname', None) 547 peer = info.get('peer', "") 548 ldomain = self.domain 549 ssh_port = info.get('ssh_port', 22) 550 551 # Collect this for the client.conf file 552 if 'masterexperiment' in info: 553 mproj, meid = info['masterexperiment'].split("/", 1) 554 555 active = info.get('active', 'False') 556 557 if type in ('control', 'both'): 558 testbed = e.get_attribute('testbed') 559 control_gw = myname 560 561 cfn = "%s/%s.gw.conf" % (tmpdir, myname.lower()) 562 tunnelconfig = self.tunnel_config 563 try: 564 f = open(cfn, "w") 565 if active == 'True': 566 print >>f, "active: True" 567 print >>f, "ssh_port: %s" % ssh_port 568 if type in ('control', 'both'): 569 for s in [s for s in services \ 570 if s.get('name', "") in self.imports]: 571 server_service_out[s['name']](f, s) 572 573 if tunnelconfig: 574 print >>f, "tunnelip: %s" % tunnelconfig 575 print >>f, "peer: %s" % peer.lower() 576 print >>f, "ssh_pubkey: /usr/local/federation/etc/%s" % \ 577 pubkey_base 578 print >>f, "ssh_privkey: /usr/local/federation/etc/%s" % \ 579 secretkey_base 580 f.close() 581 except EnvironmentError, e: 582 raise service_error(service_error.internal, 583 "Can't write protal config %s: %s" % (cfn, e)) 584 585 # Done with portals, write the client config file. 586 try: 587 f = open("%s/client.conf" % tmpdir, "w") 588 if control_gw: 589 print >>f, "ControlGateway: %s" % physname.lower() 590 for s in services: 591 if s.get('name',"") in self.imports and \ 592 s.get('visibility','') == 'import': 593 client_service_out[s['name']](f, s) 594 if s.get('name', '') in self.exports and \ 595 s.get('visibility', '') == 'export' and \ 596 s['name'] in client_export_service_out: 597 client_export_service_out[s['name']](f, s) 598 # Seer uses this. 599 if mproj and meid: 600 print >>f, "ExperimentID: %s/%s" % (mproj, meid) 601 f.close() 602 except EnvironmentError, e: 603 raise service_error(service_error.internal, 604 "Cannot write client.conf: %s" %s) 605 606 607 608 def export_store_info(self, cf, nodes, ssh_port, connInfo): 609 """ 610 For the export requests in the connection info, install the peer names 611 at the experiment controller via SetValue calls. 612 """ 613 614 for c in connInfo: 615 for p in [ p for p in c.get('parameter', []) \ 616 if p.get('type', '') == 'output']: 617 618 if p.get('name', '') == 'peer': 619 k = p.get('key', None) 620 surl = p.get('store', None) 621 if surl and k and k.index('/') != -1: 622 if self.create_debug: 623 req = { 'name': k, 'value': 'debug' } 624 self.call_SetValue(surl, req, cf) 625 else: 626 n = nodes.get(k[k.index('/')+1:], { }) 627 value = n.get('hostname', None) 628 if value: 629 req = { 'name': k, 'value': value } 630 self.call_SetValue(surl, req, cf) 631 else: 632 self.log.error("No hostname for %s" % \ 633 k[k.index('/'):]) 634 else: 635 self.log.error("Bad export request: %s" % p) 636 elif p.get('name', '') == 'ssh_port': 637 k = p.get('key', None) 638 surl = p.get('store', None) 639 if surl and k: 640 req = { 'name': k, 'value': ssh_port } 641 self.call_SetValue(surl, req, cf) 642 else: 643 self.log.error("Bad export request: %s" % p) 644 else: 645 646 self.log.error("Unknown export parameter: %s" % \ 647 p.get('name')) 648 continue 649 650 def configure_nodes(self, segment_commands, topo, nodes, user, pubkey, secretkey, 651 stagingdir, tmpdir): 652 653 # These little functions/functors just make things more readable 654 class stage_file_type: 655 def __init__(self, user, host, stagingdir): 656 self.user = user 657 self.host = host 658 self.stagingdir = stagingdir 659 self.scp = "/usr/bin/scp -i .ssh/id_rsa -o " + \ 660 "'ForwardX11 no' -o 'StrictHostKeyChecking no' " 661 662 def __call__(self, script, file, dest="."): 663 # If the file is a full pathname, do not use stagingdir 664 if file.find('/') == -1: 665 file = "%s/%s" % (self.stagingdir, file) 666 print >>script, "%s %s@%s:%s %s" % \ 667 (self.scp, self.user, self.host, file, dest) 668 669 def install_tar(script, loc, base): 670 tar = "/bin/tar" 671 mkdir="/bin/mkdir" 672 673 print >>script, "%s -p %s" % (mkdir, loc) 674 print >>script, "%s -C %s -xzf %s" % (tar, loc, base) 675 676 def install_rpm(script, base): 677 rpm = "/bin/rpm" 678 print >>script, "%s --install %s" % (rpm, base) 679 680 fed_dir = "/usr/local/federation" 681 fed_etc_dir = "%s/etc" % fed_dir 682 fed_bin_dir = "%s/bin" % fed_dir 683 fed_lib_dir = "%s/lib" % fed_dir 684 685 ifconfig = "/sbin/ifconfig" 686 687 stage_file = stage_file_type(user, self.staging_host, stagingdir) 688 689 for e in [ e for e in topo.elements if isinstance(e, topdl.Computer)]: 690 vname = e.name 691 node = nodes.get(vname, { }) 692 pname = node.get('hostname', None) 693 if pname: 694 script = open("%s/%s.startup" %(tmpdir, pname), "w") 695 # Reset the interfaces to the ones in the topo file 696 for i in [ i for i in e.interface \ 697 if not i.get_attribute('portal')]: 698 pinf = node['interfaces'].get(i.name, None) 699 addr = i.get_attribute('ip4_address') 700 netmask = i.get_attribute('ip4_netmask') or '255.255.255.0' 701 if pinf and addr: 702 print >>script, \ 703 "%s %s %s netmask %s" % \ 704 (ifconfig, pinf, addr, netmask) 705 else: 706 self.log.error("Missing interface or address for %s" \ 707 % i.name) 708 709 for l, f in self.federation_software: 710 base = os.path.basename(f) 711 stage_file(script, base) 712 if l: install_tar(script, l, base) 713 else: install_rpm(script, base) 714 715 for s in e.software: 716 s_base = s.location.rpartition('/')[2] 717 stage_file(script, s_base) 718 if s.install: install_tar(script, s.install, s_base) 719 else: install_rpm(script, s_base) 720 721 for f in ('hosts', pubkey, secretkey, 'client.conf', 722 'userconf'): 723 stage_file(script, f, fed_etc_dir) 724 if self.sshd: 725 stage_file(script, self.sshd, fed_bin_dir) 726 if self.sshd_config: 727 stage_file(script, self.sshd_config, fed_etc_dir) 728 729 # Look in tmpdir to get the names. They've all been copied 730 # into the (remote) staging dir 731 if os.access("%s/%s.gw.conf" % (tmpdir, vname), os.R_OK): 732 stage_file(script, "%s.gw.conf" % vname, fed_etc_dir) 733 734 # Hackery dackery dock: the ProtoGENI python is really ancient. 735 # A modern version (though packaged for Mandrake (remember 736 # Mandrake? good times, good times)) should be in the 737 # federation_software list, but we need to move rename is for 738 # SEER. 739 print >>script, "rm /usr/bin/python" 740 print >>script, "ln /usr/bin/python2.4 /usr/bin/python" 741 # Back to less hacky stuff 742 743 # Start commands 744 if e.get_attribute('portal') and self.portal_startcommand: 745 # Install portal software 746 for l, f in self.portal_software: 747 base = os.path.basename(f) 748 stage_file(script, base) 749 if l: install_tar(script, l, base) 750 else: install_rpm(script, base) 751 752 # Portals never have a user-specified start command 753 print >>script, self.portal_startcommand 754 elif self.node_startcommand: 755 # XXX: debug 756 print >>script, "sudo perl -I%s %simport_key.pl /users/%s/.ssh/authorized_keys /root/.ssh/authorized_keys" % (fed_lib_dir, fed_bin_dir, user) 757 # XXX: debug 758 if e.get_attribute('startup'): 759 print >>script, "%s \\$USER '%s'" % \ 760 (self.node_startcommand, e.get_attribute('startup')) 761 else: 762 print >>script, self.node_startcommand 763 script.close() 764 if not segment_commands.scp_file("%s/%s.startup" % (tmpdir, pname), 765 user, pname): 766 self.log.error("Could not copy script to %s" % pname) 767 else: 768 self.log.error("Unmapped node: %s" % vname) 769 770 def start_node(self, user, host, node, segment_commands): 771 # Place an identity on the node so that the copying can succeed 772 segment_commands.ssh_cmd(user, host, "scp .ssh/id_rsa %s:.ssh" % node) 773 segment_commands.ssh_cmd(user, node, 774 "sudo /bin/sh ./%s.startup &" % node) 775 776 def start_nodes(self, user, host, nodes, segment_commands): 777 threads = [ ] 778 for n in nodes: 779 t = Thread(target=self.start_node, args=(user, host, n, 780 segment_commands)) 781 t.start() 782 threads.append(t) 783 784 done = [not t.isAlive() for t in threads] 785 while not all(done): 786 self.log.info("Waiting for threads %s" % done) 787 time.sleep(10) 788 done = [not t.isAlive() for t in threads] 789 790 791 792 793 def start_segment(self, segment_commands, aid, user, rspec, pubkey, 794 secretkey, ename, stagingdir, tmpdir, certfile, certpw, 795 export_certfile, topo, connInfo, services, timeout=0): 796 """ 797 Start a sub-experiment on a federant. 798 799 Get the current state, modify or create as appropriate, ship data 800 and configs and start the experiment. There are small ordering 801 differences based on the initial state of the sub-experiment. 802 """ 803 804 def random_slicename(user): 805 slicename = user 806 for i in range(0,5): 807 slicename += random.choice(string.ascii_letters) 808 return slicename 809 810 host = self.staging_host 811 if not os.access(certfile, os.R_OK): 812 self.log.error("[start_segment]: Cannot read certfile: %s" % \ 813 certfile) 814 return False 815 ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) 816 # Local software dir 817 lsoftdir = "%s/software" % tmpdir 818 819 # Open up a temporary file to contain a script for setting up the 820 # filespace for the new experiment. 821 self.log.info("[start_segment]: creating script file") 822 try: 823 sf, scriptname = tempfile.mkstemp() 824 scriptfile = os.fdopen(sf, 'w') 825 except EnvironmentError: 826 return False 827 828 scriptbase = os.path.basename(scriptname) 829 830 # Script the filesystem changes 831 print >>scriptfile, "/bin/rm -rf %s" % stagingdir 832 print >>scriptfile, 'mkdir -p %s' % stagingdir 833 print >>scriptfile, "rm -f %s" % scriptbase 834 scriptfile.close() 835 836 # Move the script to the remote machine 837 # XXX: could collide tempfile names on the remote host 838 if segment_commands.scp_file(scriptname, user, host, scriptbase): 839 os.remove(scriptname) 840 else: 841 return False 842 843 # Execute the script (and the script's last line deletes it) 844 if not segment_commands.ssh_cmd(user, host, "sh -x %s" % scriptbase): 845 return False 846 847 try: 848 gcred = segment_commands.pg_call(self.sa_url, 849 'GetCredential', {}, ctxt) 850 except self.ProtoGENIError, e: 851 raise service_error(service_error.federant, 852 "ProtoGENI: %s" % e) 853 # Find a slicename not in use 854 slicename = "fabereGpgL" 855 while True: 856 slicename = random_slicename(user) 857 try: 858 param = { 859 'credential': gcred, 860 'hrn': slicename, 861 'type': 'Slice' 862 } 863 segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt) 864 except segment_commands.ProtoGENIError, e: 865 print e 866 break 867 868 self.log.info("Creating %s" % slicename) 869 f = open("./rspec", "w") 870 print >>f, "%s" % rspec 871 f.close() 872 # Create the slice and allocate resources. If any of this stuff fails, 873 # the allocations will time out on PG in short order, so we just raise 874 # the service_error. 875 try: 876 param = { 877 'credential': gcred, 878 'hrn': slicename, 879 'type': 'Slice' 880 } 881 slice_cred = segment_commands.pg_call(self.sa_url, 'Register', param, ctxt) 882 f = open("./slice_cred", "w") 883 print >>f, slice_cred 884 f.close() 885 # Populate the ssh keys (let PG format them) 886 param = { 887 'credential': gcred, 888 } 889 keys = segment_commands.pg_call(self.sa_url, 'GetKeys', param, ctxt) 890 # Grab and redeem a ticket 891 param = { 892 'credential': slice_cred, 893 'rspec': rspec, 894 } 895 ticket = segment_commands.pg_call(self.cm_url, 'GetTicket', param, ctxt) 896 f = open("./ticket", "w") 897 print >>f, ticket 898 f.close() 899 param = { 900 'credential': slice_cred, 901 'keys': keys, 902 'ticket': ticket, 903 } 904 sliver_cred, manifest = segment_commands.pg_call(self.cm_url, 905 'RedeemTicket', param, ctxt) 906 f = open("./sliver_cred", "w") 907 print >>f, sliver_cred 908 f.close() 909 f = open("./manifest", "w") 910 print >>f, manifest 911 f.close() 912 # start 'em up 913 param = { 914 'credential': sliver_cred, 915 } 916 segment_commands.pg_call(self.cm_url, 'StartSliver', param, ctxt) 917 except segment_commands.ProtoGENIError, e: 918 raise service_error(service_error.federant, 919 "ProtoGENI: %s %s" % (e.code, e)) 920 921 # With manifest in hand, we can export the portal node names. 922 if self.create_debug: nodes = self.fake_manifest(topo) 923 else: nodes = self.manifest_to_dict(manifest) 924 925 self.export_store_info(export_certfile, nodes, self.ssh_port, 926 connInfo) 927 self.generate_portal_configs(topo, pubkey, secretkey, tmpdir, 928 ename, connInfo, services, nodes) 929 930 # Copy software to the staging machine (done after generation to copy 931 # those, too) 932 for d in (tmpdir, lsoftdir): 933 if os.path.isdir(d): 934 for f in os.listdir(d): 935 if not os.path.isdir("%s/%s" % (d, f)): 936 if not segment_commands.scp_file("%s/%s" % (d, f), 937 user, host, "%s/%s" % (stagingdir, f)): 938 self.log.error("Scp failed") 939 return False 940 941 942 # Now we wait for the nodes to start on PG 943 status = 'notready' 944 try: 945 while status == 'notready': 946 param = { 947 'credential': slice_cred 948 } 949 r = segment_commands.pg_call(self.cm_url, 'SliceStatus', param, ctxt) 950 print r 951 status = r.get('status', 'notready') 952 if status == 'notready': 953 time.sleep(30) 954 except segment_commands.ProtoGENIError, e: 955 raise service_error(service_error.federant, 956 "ProtoGENI: %s %s" % (e.code, e)) 957 958 if status == 'failed': 959 self.log.error('Sliver failed to start on ProtoGENI') 960 try: 961 param = { 962 'credential': slice_cred 963 } 964 segment_commands.pg_call(self.cm_url, 'DeleteSliver', param, ctxt) 965 except segment_commands.ProtoGENIError, e: 966 raise service_error(service_error.federant, 967 "ProtoGENI: %s" % e) 968 return False 969 else: 970 self.state_lock.acquire() 971 self.allocation[aid]['slice_name'] = slicename 972 self.allocation[aid]['slice_credential'] = slice_cred 973 self.allocation[aid]['sliver_credential'] = sliver_cred 974 self.allocation[aid]['manifest'] = manifest 975 self.allocation[aid]['certfile'] = certfile 976 self.allocation[aid]['certpw'] = certpw 977 self.write_state() 978 self.state_lock.release() 979 980 # Now we have configuration to do for ProtoGENI 981 self.configure_nodes(segment_commands, topo, nodes, user, pubkey, secretkey, 982 stagingdir, tmpdir) 983 984 self.start_nodes(user, self.staging_host, 985 [ n.get('hostname', None) for n in nodes.values()], 986 segment_commands) 987 988 # Everything has gone OK. 989 return True, dict([(k, n.get('hostname', None)) \ 990 for k, n in nodes.items()]) 310 991 311 992 def generate_rspec(self, topo, softdir, connInfo): … … 457 1138 cpw, alloc_log) 458 1139 459 def finalize_experiment(self, topo, starter, aid, alloc_id):1140 def finalize_experiment(self, topo, nodes, aid, alloc_id): 460 1141 # Copy the assigned names into the return topology 461 1142 rvtopo = topo.clone() 462 1143 embedding = [ ] 463 for n in starter.node:1144 for k, n in nodes.items(): 464 1145 embedding.append({ 465 'toponame': n,466 'physname': ["%s%s" % ( starter.node[n], self.domain)],1146 'toponame': k, 1147 'physname': ["%s%s" % (n, self.domain)], 467 1148 }) 468 1149 # Grab the log (this is some anal locking, but better safe than … … 535 1216 cpw, alloc_log = self.initialize_experiment_info(attrs, 536 1217 aid, certfile, tmpdir) 537 # XXX: we really need to put the import and connection info538 # generation off longer.539 1218 self.import_store_info(certfile, connInfo) 540 1219 rspec = self.generate_rspec(topo, "%s/%s/" \ 541 1220 % (self.staging_dir, ename), connInfo) 542 1221 543 s tarter = self.start_segment(keyfile=ssh_key,1222 segment_commands = protogeni_proxy(keyfile=ssh_key, 544 1223 debug=self.create_debug, log=alloc_log, 545 1224 ch_url = self.ch_url, sa_url=self.sa_url, 546 1225 cm_url=self.cm_url) 547 rv = starter(self, aid, user, rspec, pubkey_base, secretkey_base, 548 ename, 1226 rv, nodes = self.start_segment(segment_commands, aid, user, rspec, 1227 pubkey_base, 1228 secretkey_base, ename, 549 1229 "%s/%s" % (self.staging_dir, ename), tmpdir, cf, cpw, 550 1230 certfile, topo, connInfo, services) … … 563 1243 564 1244 if rv: 565 return self.finalize_experiment(topo, starter, aid, req['allocID'])1245 return self.finalize_experiment(topo, nodes, aid, req['allocID']) 566 1246 elif err: 567 1247 raise service_error(service_error.federant, … … 569 1249 else: 570 1250 raise service_error(service_error.federant, "Swapin failed") 1251 1252 def stop_segment(self, segment_commands, user, stagingdir, slice_cred, 1253 certfile, certpw): 1254 """ 1255 Stop a sub experiment by calling swapexp on the federant 1256 """ 1257 host = self.staging_host 1258 rv = False 1259 try: 1260 # Clean out tar files: we've gone over quota in the past 1261 if stagingdir: 1262 segment_commands.ssh_cmd(user, host, "rm -rf %s" % stagingdir) 1263 if slice_cred: 1264 self.log.error('Removing Sliver on ProtoGENI') 1265 ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) 1266 try: 1267 param = { 1268 'credential': slice_cred 1269 } 1270 segment_commands.pg_call(self.cm_url, 'DeleteSlice', 1271 param, ctxt) 1272 except segment_commands.ProtoGENIError, e: 1273 raise service_error(service_error.federant, 1274 "ProtoGENI: %s" % e) 1275 return True 1276 except self.ssh_cmd_timeout: 1277 rv = False 1278 return rv 571 1279 572 1280 def TerminateSegment(self, req, fid): … … 599 1307 staging = None 600 1308 601 stopper = self.stop_segment(keyfile=ssh_key, debug=self.create_debug, 602 ch_url = self.ch_url, sa_url=self.sa_url, cm_url=self.cm_url) 603 stopper(self, user, staging, slice_cred, cf, cpw) 1309 segment_commands = protogeni_proxy(keyfile=ssh_key, 1310 debug=self.create_debug, ch_url = self.ch_url, 1311 sa_url=self.sa_url, cm_url=self.cm_url) 1312 self.stop_segment(segment_commands, user, staging, slice_cred, cf, cpw) 604 1313 return { 'allocID': req['allocID'] } 1314 1315 def renew_segment(self, segment_commands, name, scred, interval, 1316 certfile, certpw): 1317 ctxt = fedd_ssl_context(my_cert=certfile, password=certpw) 1318 try: 1319 expiration = time.strftime("%Y%m%dT%H:%M:%S", 1320 time.gmtime(time.time() + interval)) 1321 cred = segment_commands.pg_call(self.sa_url, 'GetCredential', {}, ctxt) 1322 1323 param = { 1324 'credential': scred, 1325 'expiration': expiration 1326 } 1327 r = segment_commands.pg_call(self.sa_url, 'RenewSlice', param, ctxt) 1328 param = { 1329 'credential': cred, 1330 'hrn': name, 1331 'type': 'Slice', 1332 } 1333 slice = segment_commands.pg_call(self.sa_url, 'Resolve', param, ctxt) 1334 uuid = slice.get('uuid', None) 1335 if uuid == None: 1336 sys.exit('No uuid for %s' % slicename) 1337 1338 print 'Calling GetCredential (uuid)' 1339 param = { 1340 'credential': cred, 1341 'uuid': uuid, 1342 'type': 'Slice', 1343 } 1344 new_scred = segment_commands.pg_call(self.sa_url, 'GetCredential', param, ctxt) 1345 f = open('./new_slice_cred', 'w') 1346 print >>f, new_scred 1347 f.close() 1348 1349 except segment_commands.ProtoGENIError, e: 1350 self.log.error("Failed to extend slice %s: %s" % (name, e)) 1351 return None 1352 try: 1353 print 'Calling RenewSlice (CM)' 1354 param = { 1355 'credential': new_scred, 1356 } 1357 r = segment_commands.pg_call(self.cm_url, 'RenewSlice', param, ctxt) 1358 except segment_commands.ProtoGENIError, e: 1359 self.log.warn("Failed to renew sliver for %s: %s" % (name, e)) 1360 1361 return new_scred 1362 605 1363 606 1364 def RenewSlices(self): … … 628 1386 # There's a ProtoGENI slice associated with the segment; renew it. 629 1387 if name and scred: 630 renewer = self.renew_segment(log=self.log,1388 segment_commands = protogeni_proxy(log=self.log, 631 1389 debug=self.create_debug, keyfile=ssh_key, 632 1390 cm_url = self.cm_url, sa_url = self.sa_url, 633 1391 ch_url = self.ch_url) 634 new_scred = renewer(name, scred, self.renewal_interval, cf, cpw) 1392 new_scred = self.renew_segment(segment_commands, name, scred, 1393 self.renewal_interval, cf, cpw) 635 1394 if new_scred: 636 1395 self.log.info("Slice %s renewed until %s GMT" % \
Note: See TracChangeset
for help on using the changeset viewer.