source: fedd/fedd_experiment_control.py @ 4ed10ae

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 4ed10ae was 4ed10ae, checked in by Ted Faber <faber@…>, 16 years ago

Proxy key additions working

  • Property mode set to 100644
File size: 53.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47   
48    class thread_pool:
49        """
50        A class to keep track of a set of threads all invoked for the same
51        task.  Manages the mutual exclusion of the states.
52        """
53        def __init__(self):
54            """
55            Start a pool.
56            """
57            self.changed = Condition()
58            self.started = 0
59            self.terminated = 0
60
61        def acquire(self):
62            """
63            Get the pool's lock.
64            """
65            self.changed.acquire()
66
67        def release(self):
68            """
69            Release the pool's lock.
70            """
71            self.changed.release()
72
73        def wait(self, timeout = None):
74            """
75            Wait for a pool thread to start or stop.
76            """
77            self.changed.wait(timeout)
78
79        def start(self):
80            """
81            Called by a pool thread to report starting.
82            """
83            self.changed.acquire()
84            self.started += 1
85            self.changed.notifyAll()
86            self.changed.release()
87
88        def terminate(self):
89            """
90            Called by a pool thread to report finishing.
91            """
92            self.changed.acquire()
93            self.terminated += 1
94            self.changed.notifyAll()
95            self.changed.release()
96
97        def clear(self):
98            """
99            Clear all pool data.
100            """
101            self.changed.acquire()
102            self.started = 0
103            self.terminated =0
104            self.changed.notifyAll()
105            self.changed.release()
106
107    class pooled_thread(Thread):
108        """
109        One of a set of threads dedicated to a specific task.  Uses the
110        thread_pool class above for coordination.
111        """
112        def __init__(self, group=None, target=None, name=None, args=(), 
113                kwargs={}, pdata=None, trace_file=None):
114            Thread.__init__(self, group, target, name, args, kwargs)
115            self.rv = None          # Return value of the ops in this thread
116            self.exception = None   # Exception that terminated this thread
117            self.target=target      # Target function to run on start()
118            self.args = args        # Args to pass to target
119            self.kwargs = kwargs    # Additional kw args
120            self.pdata = pdata      # thread_pool for this class
121            # Logger for this thread
122            self.log = logging.getLogger("fedd.experiment_control")
123       
124        def run(self):
125            """
126            Emulate Thread.run, except add pool data manipulation and error
127            logging.
128            """
129            if self.pdata:
130                self.pdata.start()
131
132            if self.target:
133                try:
134                    self.rv = self.target(*self.args, **self.kwargs)
135                except service_error, s:
136                    self.exception = s
137                    self.log.error("Thread exception: %s %s" % \
138                            (s.code_string(), s.desc))
139                except:
140                    self.exception = sys.exc_info()[1]
141                    self.log.error(("Unexpected thread exception: %s" +\
142                            "Trace %s") % (self.exception,\
143                                traceback.format_exc()))
144            if self.pdata:
145                self.pdata.terminate()
146
147    def __init__(self, config=None):
148        """
149        Intialize the various attributes, most from the config object
150        """
151        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
152        self.thread_pool = fedd_experiment_control_local.thread_pool
153
154        self.cert_file = None
155        self.cert_pwd = None
156        self.trusted_certs = None
157
158        # Walk through the various relevant certificat specifying config
159        # attributes until the local certificate attributes can be resolved.
160        # The walk is from most specific to most general specification.
161        for s in ("experiment_control", "globals"):
162            if config.has_section(s): 
163                if config.has_option(s, "cert_file"):
164                    if not self.cert_file:
165                        self.cert_file = config.get(s, "cert_file")
166                        self.cert_pwd = config.get(s, "cert_pwd")
167
168                if config.has_option(s, "trusted_certs"):
169                    if not self.trusted_certs:
170                        self.trusted_certs = config.get(s, "trusted_certs")
171
172        self.exp_stem = "fed-stem"
173        self.log = logging.getLogger("fedd.experiment_control")
174        self.muxmax = 2
175        self.nthreads = 2
176        self.randomize_experiments = False
177
178        self.scp_exec = "/usr/bin/scp"
179        self.splitter = None
180        self.ssh_exec="/usr/bin/ssh"
181        self.ssh_keygen = "/usr/bin/ssh-keygen"
182        self.ssh_identity_file = None
183
184        if config.has_section("experiment_control"):
185            self.debug = config.get("experiment_control", "create_debug")
186            self.state_filename = config.get("experiment_control", 
187                    "experiment_state_file")
188            self.splitter_url = config.get("experiment_control", "splitter_url")
189            self.fedkit = config.get("experiment_control", "fedkit")
190        else:
191            self.debug = False
192            self.state_filename = None
193            self.splitter_url = None
194            self.fedkit = None
195
196        # XXX
197        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
198        self.ssh_type = "rsa"
199        self.state = { }
200        self.state_lock = Lock()
201        self.tclsh = "/usr/local/bin/otclsh"
202        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
203        self.tbmap = { 
204                'deter':'https://users.isi.deterlab.net:23235',
205                'emulab':'https://users.isi.deterlab.net:23236',
206                'ucb':'https://users.isi.deterlab.net:23237',
207                }
208        self.trace_file = sys.stderr
209
210        self.def_expstart = \
211                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
212                "/tmp/federate";
213        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
214                "FEDDIR/hosts";
215        self.def_gwstart = \
216                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
217                "/tmp/bridge.log";
218        self.def_mgwstart = \
219                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
220                "/tmp/bridge.log";
221        self.def_gwimage = "FBSD61-TUNNEL2";
222        self.def_gwtype = "pc";
223
224
225        if self.ssh_pubkey_file:
226            try:
227                f = open(self.ssh_pubkey_file, 'r')
228                self.ssh_pubkey = f.read()
229                f.close()
230            except IOError:
231                raise service_error(service_error.internal,
232                        "Cannot read sshpubkey")
233
234        set_log_level(config, "experiment_control", self.log)
235
236        # Grab saved state.  OK to do this w/o locking because it's read only
237        # and only one thread should be in existence that can see self.state at
238        # this point.
239        if self.state_filename:
240            self.read_state()
241
242        # Dispatch tables
243        self.soap_services = {\
244                'Create': make_soap_handler(\
245                        CreateRequestMessage.typecode,
246                        getattr(self, "create_experiment"), 
247                        CreateResponseMessage,
248                        "CreateResponseBody"),
249                'Vtopo': make_soap_handler(\
250                        VtopoRequestMessage.typecode,
251                        getattr(self, "get_vtopo"),
252                        VtopoResponseMessage,
253                        "VtopoResponseBody"),
254                'Vis': make_soap_handler(\
255                        VisRequestMessage.typecode,
256                        getattr(self, "get_vis"),
257                        VisResponseMessage,
258                        "VisResponseBody"),
259                'Info': make_soap_handler(\
260                        InfoRequestMessage.typecode,
261                        getattr(self, "get_info"),
262                        InfoResponseMessage,
263                        "InfoResponseBody"),
264                'Terminate': make_soap_handler(\
265                        TerminateRequestMessage.typecode,
266                        getattr(self, "terminate_experiment"),
267                        TerminateResponseMessage,
268                        "TerminateResponseBody"),
269        }
270
271        self.xmlrpc_services = {\
272                'Create': make_xmlrpc_handler(\
273                        getattr(self, "create_experiment"), 
274                        "CreateResponseBody"),
275                'Vtopo': make_xmlrpc_handler(\
276                        getattr(self, "get_vtopo"),
277                        "VtopoResponseBody"),
278                'Vis': make_xmlrpc_handler(\
279                        getattr(self, "get_vis"),
280                        "VisResponseBody"),
281                'Info': make_xmlrpc_handler(\
282                        getattr(self, "get_info"),
283                        "InfoResponseBody"),
284                'Terminate': make_xmlrpc_handler(\
285                        getattr(self, "terminate_experiment"),
286                        "TerminateResponseBody"),
287        }
288
289    def copy_file(self, src, dest, size=1024):
290        """
291        Exceedingly simple file copy.
292        """
293        s = open(src,'r')
294        d = open(dest, 'w')
295
296        buf = "x"
297        while buf != "":
298            buf = s.read(size)
299            d.write(buf)
300        s.close()
301        d.close()
302
303    # Call while holding self.state_lock
304    def write_state(self):
305        """
306        Write a new copy of experiment state after copying the existing state
307        to a backup.
308
309        State format is a simple pickling of the state dictionary.
310        """
311        if os.access(self.state_filename, os.W_OK):
312            self.copy_file(self.state_filename, \
313                    "%s.bak" % self.state_filename)
314        try:
315            f = open(self.state_filename, 'w')
316            pickle.dump(self.state, f)
317        except IOError, e:
318            self.log.error("Can't write file %s: %s" % \
319                    (self.state_filename, e))
320        except pickle.PicklingError, e:
321            self.log.error("Pickling problem: %s" % e)
322        except TypeError, e:
323            self.log.error("Pickling problem (TypeError): %s" % e)
324
325    # Call while holding self.state_lock
326    def read_state(self):
327        """
328        Read a new copy of experiment state.  Old state is overwritten.
329
330        State format is a simple pickling of the state dictionary.
331        """
332        try:
333            f = open(self.state_filename, "r")
334            self.state = pickle.load(f)
335            self.log.debug("[read_state]: Read state from %s" % \
336                    self.state_filename)
337        except IOError, e:
338            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
339                    % (self.state_filename, e))
340        except pickle.UnpicklingError, e:
341            self.log.warning(("[read_state]: No saved state: " + \
342                    "Unpickling failed: %s") % e)
343
344    def scp_file(self, file, user, host, dest=""):
345        """
346        scp a file to the remote host.  If debug is set the action is only
347        logged.
348        """
349
350        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
351        rv = 0
352
353        try:
354            dnull = open("/dev/null", "r")
355        except IOError:
356            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
357            dnull = Null
358
359        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
360        if not self.debug:
361            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
362            else: rv = call(scp_cmd)
363
364        return rv == 0
365
366    def ssh_cmd(self, user, host, cmd, wname=None):
367        """
368        Run a remote command on host as user.  If debug is set, the action is
369        only logged.
370        """
371        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
372
373        try:
374            dnull = open("/dev/null", "r")
375        except IOError:
376            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
377            dnull = Null
378
379        self.log.debug("[ssh_cmd]: %s" % sh_str)
380        if not self.debug:
381            if dnull:
382                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
383            else:
384                sub = Popen(sh_str, shell=True)
385            return sub.wait() == 0
386        else:
387            return True
388
389    def ship_configs(self, host, user, src_dir, dest_dir):
390        """
391        Copy federant-specific configuration files to the federant.
392        """
393        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
394            return False
395        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
396            return False
397
398        for f in os.listdir(src_dir):
399            if os.path.isdir(f):
400                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
401                        "%s/%s" % (dest_dir, f)):
402                    return False
403            else:
404                if not self.scp_file("%s/%s" % (src_dir, f), 
405                        user, host, dest_dir):
406                    return False
407        return True
408
409    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
410        """
411        Start a sub-experiment on a federant.
412
413        Get the current state, modify or create as appropriate, ship data and
414        configs and start the experiment.  There are small ordering differences
415        based on the initial state of the sub-experiment.
416        """
417        # ops node in the federant
418        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
419        user = tbparams[tb]['user']     # federant user
420        pid = tbparams[tb]['project']   # federant project
421        # XXX
422        base_confs = ( "hosts",)
423        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
424        # command to test experiment state
425        expinfo_exec = "/usr/testbed/bin/expinfo" 
426        # Configuration directories on the remote machine
427        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
428        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
429        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
430        # Regular expressions to parse the expinfo response
431        state_re = re.compile("State:\s+(\w+)")
432        no_exp_re = re.compile("^No\s+such\s+experiment")
433        state = None    # Experiment state parsed from expinfo
434        # The expinfo ssh command
435        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
436
437        # Get status
438        self.log.debug("[start_segment]: %s"% " ".join(cmd))
439        dev_null = None
440        try:
441            dev_null = open("/dev/null", "a")
442        except IOError, e:
443            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
444
445        if self.debug:
446            state = 'swapped'
447            rv = 0
448        else:
449            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
450            for line in status.stdout:
451                m = state_re.match(line)
452                if m: state = m.group(1)
453                else:
454                    m = no_exp_re.match(line)
455                    if m: state = "none"
456            rv = status.wait()
457
458        # If the experiment is not present the subcommand returns a non-zero
459        # return value.  If we successfully parsed a "none" outcome, ignore the
460        # return code.
461        if rv != 0 and state != "none":
462            raise service_error(service_error.internal,
463                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
464
465        self.log.debug("[start_segment]: %s: %s" % (tb, state))
466        self.log.info("[start_segment]:transferring experiment to %s" % tb)
467
468        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
469            return False
470        # Clear the federation files
471        if not self.ssh_cmd(user, host, 
472                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
473            return False
474        if not self.ssh_cmd(user, host, 
475                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
476            return False
477        # Clear and create the tarfiles and rpm directories
478        for d in (tarfiles_dir, rpms_dir):
479            if not self.ssh_cmd(user, host, 
480                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
481                return False
482            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
483                    "create tarfiles"):
484                return False
485       
486        if state == 'active':
487            # Remote experiment is active.  Modify it.
488            for f in base_confs:
489                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
490                        "%s/%s" % (proj_dir, f)):
491                    return False
492            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
493                    proj_dir):
494                return False
495            if os.path.isdir("%s/tarfiles" % tmpdir):
496                if not self.ship_configs(host, user,
497                        "%s/tarfiles" % tmpdir, tarfiles_dir):
498                    return False
499            if os.path.isdir("%s/rpms" % tmpdir):
500                if not self.ship_configs(host, user,
501                        "%s/rpms" % tmpdir, tarfiles_dir):
502                    return False
503            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
504            if not self.ssh_cmd(user, host,
505                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
506                            (pid, eid, tclfile), "modexp"):
507                return False
508            return True
509        elif state == "swapped":
510            # Remote experiment swapped out.  Modify it and swap it in.
511            for f in base_confs:
512                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
513                        "%s/%s" % (proj_dir, f)):
514                    return False
515            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
516                    proj_dir):
517                return False
518            if os.path.isdir("%s/tarfiles" % tmpdir):
519                if not self.ship_configs(host, user,
520                        "%s/tarfiles" % tmpdir, tarfiles_dir):
521                    return False
522            if os.path.isdir("%s/rpms" % tmpdir):
523                if not self.ship_configs(host, user,
524                        "%s/rpms" % tmpdir, tarfiles_dir):
525                    return False
526            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
527            if not self.ssh_cmd(user, host,
528                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
529                    "modexp"):
530                return False
531            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
532            if not self.ssh_cmd(user, host,
533                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
534                    "swapexp"):
535                return False
536            return True
537        elif state == "none":
538            # No remote experiment.  Create one.  We do this in 2 steps so we
539            # can put the configuration files and scripts into the new
540            # experiment directories.
541
542            # Tarfiles must be present for creation to work
543            if os.path.isdir("%s/tarfiles" % tmpdir):
544                if not self.ship_configs(host, user,
545                        "%s/tarfiles" % tmpdir, tarfiles_dir):
546                    return False
547            if os.path.isdir("%s/rpms" % tmpdir):
548                if not self.ship_configs(host, user,
549                        "%s/rpms" % tmpdir, tarfiles_dir):
550                    return False
551            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
552            if not self.ssh_cmd(user, host,
553                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
554                            (pid, eid, tclfile), "startexp"):
555                return False
556            # After startexp the per-experiment directories exist
557            for f in base_confs:
558                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
559                        "%s/%s" % (proj_dir, f)):
560                    return False
561            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
562                    proj_dir):
563                return False
564            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
565            if not self.ssh_cmd(user, host,
566                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
567                    "swapexp"):
568                return False
569            return True
570        else:
571            self.log.debug("[start_segment]:unknown state %s" % state)
572            return False
573
574    def stop_segment(self, tb, eid, tbparams):
575        """
576        Stop a sub experiment by calling swapexp on the federant
577        """
578        user = tbparams[tb]['user']
579        host = tbparams[tb]['host']
580        pid = tbparams[tb]['project']
581
582        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
583        return self.ssh_cmd(user, host,
584                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
585
586       
587    def generate_ssh_keys(self, dest, type="rsa" ):
588        """
589        Generate a set of keys for the gateways to use to talk.
590
591        Keys are of type type and are stored in the required dest file.
592        """
593        valid_types = ("rsa", "dsa")
594        t = type.lower();
595        if t not in valid_types: raise ValueError
596        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
597
598        try:
599            trace = open("/dev/null", "w")
600        except IOError:
601            raise service_error(service_error.internal,
602                    "Cannot open /dev/null??");
603
604        # May raise CalledProcessError
605        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
606        rv = call(cmd, stdout=trace, stderr=trace)
607        if rv != 0:
608            raise service_error(service_error.internal, 
609                    "Cannot generate nonce ssh keys.  %s return code %d" \
610                            % (self.ssh_keygen, rv))
611
612    def gentopo(self, str):
613        """
614        Generate the topology dtat structure from the splitter's XML
615        representation of it.
616
617        The topology XML looks like:
618            <experiment>
619                <nodes>
620                    <node><vname></vname><ips>ip1:ip2</ips></node>
621                </nodes>
622                <lans>
623                    <lan>
624                        <vname></vname><vnode></vnode><ip></ip>
625                        <bandwidth></bandwidth><member>node:port</member>
626                    </lan>
627                </lans>
628        """
629        class topo_parse:
630            """
631            Parse the topology XML and create the dats structure.
632            """
633            def __init__(self):
634                # Typing of the subelements for data conversion
635                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
636                self.int_subelements = ( 'bandwidth',)
637                self.float_subelements = ( 'delay',)
638                # The final data structure
639                self.nodes = [ ]
640                self.lans =  [ ]
641                self.topo = { \
642                        'node': self.nodes,\
643                        'lan' : self.lans,\
644                    }
645                self.element = { }  # Current element being created
646                self.chars = ""     # Last text seen
647
648            def end_element(self, name):
649                # After each sub element the contents is added to the current
650                # element or to the appropriate list.
651                if name == 'node':
652                    self.nodes.append(self.element)
653                    self.element = { }
654                elif name == 'lan':
655                    self.lans.append(self.element)
656                    self.element = { }
657                elif name in self.str_subelements:
658                    self.element[name] = self.chars
659                    self.chars = ""
660                elif name in self.int_subelements:
661                    self.element[name] = int(self.chars)
662                    self.chars = ""
663                elif name in self.float_subelements:
664                    self.element[name] = float(self.chars)
665                    self.chars = ""
666
667            def found_chars(self, data):
668                self.chars += data.rstrip()
669
670
671        tp = topo_parse();
672        parser = xml.parsers.expat.ParserCreate()
673        parser.EndElementHandler = tp.end_element
674        parser.CharacterDataHandler = tp.found_chars
675
676        parser.Parse(str)
677
678        return tp.topo
679       
680
681    def genviz(self, topo):
682        """
683        Generate the visualization the virtual topology
684        """
685
686        neato = "/usr/local/bin/neato"
687        # These are used to parse neato output and to create the visualization
688        # file.
689        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
690        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
691                "%s</type></node>"
692
693        try:
694            # Node names
695            nodes = [ n['vname'] for n in topo['node'] ]
696            topo_lans = topo['lan']
697        except KeyError:
698            raise service_error(service_error.internal, "Bad topology")
699
700        lans = { }
701        links = { }
702
703        # Walk through the virtual topology, organizing the connections into
704        # 2-node connections (links) and more-than-2-node connections (lans).
705        # When a lan is created, it's added to the list of nodes (there's a
706        # node in the visualization for the lan).
707        for l in topo_lans:
708            if links.has_key(l['vname']):
709                if len(links[l['vname']]) < 2:
710                    links[l['vname']].append(l['vnode'])
711                else:
712                    nodes.append(l['vname'])
713                    lans[l['vname']] = links[l['vname']]
714                    del links[l['vname']]
715                    lans[l['vname']].append(l['vnode'])
716            elif lans.has_key(l['vname']):
717                lans[l['vname']].append(l['vnode'])
718            else:
719                links[l['vname']] = [ l['vnode'] ]
720
721
722        # Open up a temporary file for dot to turn into a visualization
723        try:
724            df, dotname = tempfile.mkstemp()
725            dotfile = os.fdopen(df, 'w')
726        except IOError:
727            raise service_error(service_error.internal,
728                    "Failed to open file in genviz")
729
730        # Generate a dot/neato input file from the links, nodes and lans
731        try:
732            print >>dotfile, "graph G {"
733            for n in nodes:
734                print >>dotfile, '\t"%s"' % n
735            for l in links.keys():
736                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
737            for l in lans.keys():
738                for n in lans[l]:
739                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
740            print >>dotfile, "}"
741            dotfile.close()
742        except TypeError:
743            raise service_error(service_error.internal,
744                    "Single endpoint link in vtopo")
745        except IOError:
746            raise service_error(service_error.internal, "Cannot write dot file")
747
748        # Use dot to create a visualization
749        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
750                '-Gpack=true', dotname], stdout=PIPE)
751
752        # Translate dot to vis format
753        vis_nodes = [ ]
754        vis = { 'node': vis_nodes }
755        for line in dot.stdout:
756            m = vis_re.match(line)
757            if m:
758                vn = m.group(1)
759                vis_node = {'name': vn, \
760                        'x': float(m.group(2)),\
761                        'y' : float(m.group(3)),\
762                    }
763                if vn in links.keys() or vn in lans.keys():
764                    vis_node['type'] = 'lan'
765                else:
766                    vis_node['type'] = 'node'
767                vis_nodes.append(vis_node)
768        rv = dot.wait()
769
770        os.remove(dotname)
771        if rv == 0 : return vis
772        else: return None
773
774    def get_access(self, tb, nodes, user, tbparam):
775        """
776        Get access to testbed through fedd and set the parameters for that tb
777        """
778
779        translate_attr = {
780            'slavenodestartcmd': 'expstart',
781            'slaveconnectorstartcmd': 'gwstart',
782            'masternodestartcmd': 'mexpstart',
783            'masterconnectorstartcmd': 'mgwstart',
784            'connectorimage': 'gwimage',
785            'connectortype': 'gwtype',
786            'tunnelcfg': 'tun',
787            'smbshare': 'smbshare',
788        }
789
790        uri = self.tbmap.get(tb, None)
791        if not uri:
792            raise service_error(serice_error.server_config, 
793                    "Unknown testbed: %s" % tb)
794
795        # The basic request
796        req = {\
797                'destinationTestbed' : { 'uri' : uri },
798                'user':  user,
799                'allocID' : { 'localname': 'test' },
800                # XXX: need to get service access stright
801                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
802                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
803            }
804       
805        # node resources if any
806        if nodes != None and len(nodes) > 0:
807            rnodes = [ ]
808            for n in nodes:
809                rn = { }
810                image, hw, count = n.split(":")
811                if image: rn['image'] = [ image ]
812                if hw: rn['hardware'] = [ hw ]
813                if count: rn['count'] = int(count)
814                rnodes.append(rn)
815            req['resources']= { }
816            req['resources']['node'] = rnodes
817
818        # No retry loop here.  Proxy servers must correctly authenticate
819        # themselves without help
820
821        try:
822            ctx = fedd_ssl_context(self.cert_file, 
823                    self.trusted_certs, password=self.cert_pwd)
824        except SSL.SSLError:
825            raise service_error(service_error.server_config, 
826                    "Server certificates misconfigured")
827
828        loc = feddServiceLocator();
829        port = loc.getfeddPortType(uri,
830                transport=M2Crypto.httpslib.HTTPSConnection, 
831                transdict={ 'ssl_context' : ctx })
832
833        # Reconstruct the full request message
834        msg = RequestAccessRequestMessage()
835        msg.set_element_RequestAccessRequestBody(
836                pack_soap(msg, "RequestAccessRequestBody", req))
837
838        try:
839            resp = port.RequestAccess(msg)
840        except ZSI.ParseException, e:
841            raise service_error(service_error.req,
842                    "Bad format message (XMLRPC??): %s" % str(e))
843        except ZSI.FaultException, e:
844            resp = e.fault.detail[0]
845
846        # Again, weird incompatibilities rear their head.  userRoles, which are
847        # restricted strings, seem to be encoded by ZSI as non-unicode strings
848        # in a way that confuses the pickling and XMLRPC sending systems.
849        # Explicitly unicoding them seems to fix this, though it concerns me
850        # some.  It may be that these things are actually a ZSI string
851        # subclass, and the character encoding is not the major issue.  In any
852        # case, making all the subclasses of basestring into unicode strings
853        # unifies the response format and solves the problem.
854        r = make_unicode(unpack_soap(resp))
855
856        if r.has_key('RequestAccessResponseBody'):
857            r = r['RequestAccessResponseBody']
858        else:
859            raise service_error(service_error.proxy,
860                    "Bad proxy response")
861
862        e = r['emulab']
863        p = e['project']
864        tbparam[tb] = { 
865                "boss": e['boss'],
866                "host": e['ops'],
867                "domain": e['domain'],
868                "fs": e['fileServer'],
869                "eventserver": e['eventServer'],
870                "project": unpack_id(p['name']),
871                "emulab" : e
872                }
873        # Make the testbed name be the label the user applied
874        p['testbed'] = {'localname': tb }
875
876        for u in p['user']:
877            tbparam[tb]['user'] = unpack_id(u['userID'])
878
879        for a in e['fedAttr']:
880            if a['attribute']:
881                key = translate_attr.get(a['attribute'].lower(), None)
882                if key:
883                    tbparam[tb][key]= a['value']
884       
885    def remote_splitter(self, uri, desc, master):
886
887        req = {
888                'description' : { 'ns2description': desc },
889                'master': master,
890                'include_fedkit': bool(self.fedkit)
891            }
892
893        # No retry loop here.  Proxy servers must correctly authenticate
894        # themselves without help
895        try:
896            ctx = fedd_ssl_context(self.cert_file, 
897                    self.trusted_certs, password=self.cert_pwd)
898        except SSL.SSLError:
899            raise service_error(service_error.server_config, 
900                    "Server certificates misconfigured")
901
902        loc = feddInternalServiceLocator();
903        port = loc.getfeddInternalPortType(uri,
904                transport=M2Crypto.httpslib.HTTPSConnection, 
905                transdict={ 'ssl_context' : ctx })
906
907        # Reconstruct the full request message
908        msg = Ns2SplitRequestMessage()
909        msg.set_element_Ns2SplitRequestBody(
910                pack_soap(msg, "Ns2SplitRequestBody", req))
911
912        try:
913            resp = port.Ns2Split(msg)
914        except ZSI.ParseException, e:
915            raise service_error(service_error.req,
916                    "Bad format message (XMLRPC??): %s" %
917                    str(e))
918        r = unpack_soap(resp)
919        if r.has_key('Ns2SplitResponseBody'):
920            r = r['Ns2SplitResponseBody']
921            if r.has_key('output'):
922                return r['output'].splitlines()
923            else:
924                raise service_error(service_error.proxy, 
925                        "Bad splitter response (no output)")
926        else:
927            raise service_error(service_error.proxy, "Bad splitter response")
928       
929    class current_testbed:
930        """
931        Object for collecting the current testbed description.  The testbed
932        description is saved to a file with the local testbed variables
933        subsittuted line by line.
934        """
935        def __init__(self, eid, tmpdir, fedkit):
936            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
937            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
938            self.current_testbed = None
939            self.testbed_file = None
940
941            self.def_expstart = \
942                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
943            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
944            self.def_gwstart = \
945                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
946            self.def_mgwstart = \
947                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
948            self.def_gwimage = "FBSD61-TUNNEL2";
949            self.def_gwtype = "pc";
950
951            self.eid = eid
952            self.tmpdir = tmpdir
953            self.fedkit = fedkit
954
955        def __call__(self, line, master, allocated, tbparams):
956            # Capture testbed topology descriptions
957            if self.current_testbed == None:
958                m = self.begin_testbed.match(line)
959                if m != None:
960                    self.current_testbed = m.group(1)
961                    if self.current_testbed == None:
962                        raise service_error(service_error.req,
963                                "Bad request format (unnamed testbed)")
964                    allocated[self.current_testbed] = \
965                            allocated.get(self.current_testbed,0) + 1
966                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
967                    if not os.path.exists(tb_dir):
968                        try:
969                            os.mkdir(tb_dir)
970                        except IOError:
971                            raise service_error(service_error.internal,
972                                    "Cannot create %s" % tb_dir)
973                    try:
974                        self.testbed_file = open("%s/%s.%s.tcl" %
975                                (tb_dir, self.eid, self.current_testbed), 'w')
976                    except IOError:
977                        self.testbed_file = None
978                    return True
979                else: return False
980            else:
981                m = self.end_testbed.match(line)
982                if m != None:
983                    if m.group(1) != self.current_testbed:
984                        raise service_error(service_error.internal, 
985                                "Mismatched testbed markers!?")
986                    if self.testbed_file != None: 
987                        self.testbed_file.close()
988                        self.testbed_file = None
989                    self.current_testbed = None
990                elif self.testbed_file:
991                    # Substitute variables and put the line into the local
992                    # testbed file.
993                    gwtype = tbparams[self.current_testbed].get('gwtype', 
994                            self.def_gwtype)
995                    gwimage = tbparams[self.current_testbed].get('gwimage', 
996                            self.def_gwimage)
997                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
998                            self.def_mgwstart)
999                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1000                            self.def_mexpstart)
1001                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1002                            self.def_gwstart)
1003                    expstart = tbparams[self.current_testbed].get('expstart', 
1004                            self.def_expstart)
1005                    project = tbparams[self.current_testbed].get('project')
1006                    line = re.sub("GWTYPE", gwtype, line)
1007                    line = re.sub("GWIMAGE", gwimage, line)
1008                    if self.current_testbed == master:
1009                        line = re.sub("GWSTART", mgwstart, line)
1010                        line = re.sub("EXPSTART", mexpstart, line)
1011                    else:
1012                        line = re.sub("GWSTART", gwstart, line)
1013                        line = re.sub("EXPSTART", expstart, line)
1014                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1015                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1016                    line = re.sub("EID", self.eid, line)
1017                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1018                            (project, self.eid), line)
1019                    if self.fedkit:
1020                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1021                                line)
1022                    print >>self.testbed_file, line
1023                return True
1024
1025    class allbeds:
1026        """
1027        Process the Allbeds section.  Get access to each federant and save the
1028        parameters in tbparams
1029        """
1030        def __init__(self, get_access):
1031            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1032            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1033            self.in_allbeds = False
1034            self.get_access = get_access
1035
1036        def __call__(self, line, user, tbparams):
1037            # Testbed access parameters
1038            if not self.in_allbeds:
1039                if self.begin_allbeds.match(line):
1040                    self.in_allbeds = True
1041                    return True
1042                else:
1043                    return False
1044            else:
1045                if self.end_allbeds.match(line):
1046                    self.in_allbeds = False
1047                else:
1048                    nodes = line.split('|')
1049                    tb = nodes.pop(0)
1050                    self.get_access(tb, nodes, user, tbparams)
1051                return True
1052
1053    class gateways:
1054        def __init__(self, eid, master, tmpdir, gw_pubkey,
1055                gw_secretkey, copy_file, fedkit):
1056            self.begin_gateways = \
1057                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1058            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1059            self.current_gateways = None
1060            self.control_gateway = None
1061            self.active_end = { }
1062
1063            self.eid = eid
1064            self.master = master
1065            self.tmpdir = tmpdir
1066            self.gw_pubkey_base = gw_pubkey
1067            self.gw_secretkey_base = gw_secretkey
1068
1069            self.copy_file = copy_file
1070            self.fedkit = fedkit
1071
1072
1073        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1074                active_end, tbparams, dtb, myname, desthost, type):
1075            """
1076            Produce a gateway configuration file from a gateways line.
1077            """
1078
1079            sproject = tbparams[gw].get('project', 'project')
1080            dproject = tbparams[dtb].get('project', 'project')
1081            sdomain = ".%s.%s%s" % (eid, sproject,
1082                    tbparams[gw].get('domain', ".example.com"))
1083            ddomain = ".%s.%s%s" % (eid, dproject,
1084                    tbparams[dtb].get('domain', ".example.com"))
1085            boss = tbparams[master].get('boss', "boss")
1086            fs = tbparams[master].get('fs', "fs")
1087            event_server = "%s%s" % \
1088                    (tbparams[gw].get('eventserver', "event_server"),
1089                            tbparams[gw].get('domain', "example.com"))
1090            remote_event_server = "%s%s" % \
1091                    (tbparams[dtb].get('eventserver', "event_server"),
1092                            tbparams[dtb].get('domain', "example.com"))
1093            seer_control = "%s%s" % \
1094                    (tbparams[gw].get('control', "control"), sdomain)
1095
1096            if self.fedkit:
1097                remote_script_dir = "/usr/local/federation/bin"
1098                local_script_dir = "/usr/local/federation/bin"
1099            else:
1100                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1101                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1102
1103            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1104            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1105            tunnel_cfg = tbparams[gw].get("tun", "false")
1106
1107            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1108            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1109
1110            # translate to lower case so the `hostname` hack for specifying
1111            # configuration files works.
1112            conf_file = conf_file.lower();
1113            remote_conf_file = remote_conf_file.lower();
1114
1115            if dtb == master:
1116                active = "false"
1117            elif gw == master:
1118                active = "true"
1119            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1120                active = "false"
1121            else:
1122                active_end['%s-%s' % (gw, dtb)] = 1
1123                active = "true"
1124
1125            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1126            print >>gwconfig, "Active: %s" % active
1127            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1128            print >>gwconfig, "BossName: %s" % boss
1129            print >>gwconfig, "FsName: %s" % fs
1130            print >>gwconfig, "EventServerName: %s" % event_server
1131            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1132            print >>gwconfig, "SeerControl: %s" % seer_control
1133            print >>gwconfig, "Type: %s" % type
1134            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1135            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1136                    local_script_dir
1137            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1138            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1139            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1140                    (remote_conf_dir, remote_conf_file)
1141            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1142            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1143            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1144            gwconfig.close()
1145
1146            return active == "true"
1147
1148        def __call__(self, line, allocated, tbparams):
1149            # Process gateways
1150            if not self.current_gateways:
1151                m = self.begin_gateways.match(line)
1152                if m:
1153                    self.current_gateways = m.group(1)
1154                    if allocated.has_key(self.current_gateways):
1155                        # This test should always succeed
1156                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1157                        if not os.path.exists(tb_dir):
1158                            try:
1159                                os.mkdir(tb_dir)
1160                            except IOError:
1161                                raise service_error(service_error.internal,
1162                                        "Cannot create %s" % tb_dir)
1163                    else:
1164                        # XXX
1165                        self.log.error("[gateways]: Ignoring gateways for " + \
1166                                "unknown testbed %s" % self.current_gateways)
1167                        self.current_gateways = None
1168                    return True
1169                else:
1170                    return False
1171            else:
1172                m = self.end_gateways.match(line)
1173                if m :
1174                    if m.group(1) != self.current_gateways:
1175                        raise service_error(service_error.internal,
1176                                "Mismatched gateway markers!?")
1177                    if self.control_gateway:
1178                        try:
1179                            cc = open("%s/%s/client.conf" %
1180                                    (self.tmpdir, self.current_gateways), 'w')
1181                            print >>cc, "ControlGateway: %s" % \
1182                                    self.control_gateway
1183                            if tbparams[self.master].has_key('smbshare'):
1184                                print >>cc, "SMBSHare: %s" % \
1185                                        tbparams[self.master]['smbshare']
1186                            print >>cc, "ProjectUser: %s" % \
1187                                    tbparams[self.master]['user']
1188                            print >>cc, "ProjectName: %s" % \
1189                                    tbparams[self.master]['project']
1190                            cc.close()
1191                        except IOError:
1192                            raise service_error(service_error.internal,
1193                                    "Error creating client config")
1194                        try:
1195                            cc = open("%s/%s/seer.conf" %
1196                                    (self.tmpdir, self.current_gateways),
1197                                    'w')
1198                            if self.current_gateways != self.master:
1199                                print >>cc, "ControlNode: %s" % \
1200                                        self.control_gateway
1201                            print >>cc, "ExperimentID: %s/%s" % \
1202                                    ( tbparams[self.master]['project'], \
1203                                    self.eid )
1204                            cc.close()
1205                        except IOError:
1206                            raise service_error(service_error.internal,
1207                                    "Error creating seer config")
1208                    else:
1209                        debug.error("[gateways]: No control gateway for %s" %\
1210                                    self.current_gateways)
1211                    self.current_gateways = None
1212                else:
1213                    dtb, myname, desthost, type = line.split(" ")
1214
1215                    if type == "control" or type == "both":
1216                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1217                                self.eid, 
1218                                tbparams[self.current_gateways]['project'],
1219                                tbparams[self.current_gateways]['domain'])
1220                    try:
1221                        active = self.gateway_conf_file(self.current_gateways,
1222                                self.master, self.eid, self.gw_pubkey_base,
1223                                self.gw_secretkey_base,
1224                                self.active_end, tbparams, dtb, myname,
1225                                desthost, type)
1226                    except IOError, e:
1227                        raise service_error(service_error.internal,
1228                                "Failed to write config file for %s" % \
1229                                        self.current_gateway)
1230           
1231                    gw_pubkey = "%s/keys/%s" % \
1232                            (self.tmpdir, self.gw_pubkey_base)
1233                    gw_secretkey = "%s/keys/%s" % \
1234                            (self.tmpdir, self.gw_secretkey_base)
1235
1236                    pkfile = "%s/%s/%s" % \
1237                            ( self.tmpdir, self.current_gateways, 
1238                                    self.gw_pubkey_base)
1239                    skfile = "%s/%s/%s" % \
1240                            ( self.tmpdir, self.current_gateways, 
1241                                    self.gw_secretkey_base)
1242
1243                    if not os.path.exists(pkfile):
1244                        try:
1245                            self.copy_file(gw_pubkey, pkfile)
1246                        except IOError:
1247                            service_error(service_error.internal,
1248                                    "Failed to copy pubkey file")
1249
1250                    if active and not os.path.exists(skfile):
1251                        try:
1252                            self.copy_file(gw_secretkey, skfile)
1253                        except IOError:
1254                            service_error(service_error.internal,
1255                                    "Failed to copy secretkey file")
1256                return True
1257
1258    class shunt_to_file:
1259        """
1260        Simple class to write data between two regexps to a file.
1261        """
1262        def __init__(self, begin, end, filename):
1263            """
1264            Begin shunting on a match of begin, stop on end, send data to
1265            filename.
1266            """
1267            self.begin = re.compile(begin)
1268            self.end = re.compile(end)
1269            self.in_shunt = False
1270            self.file = None
1271            self.filename = filename
1272
1273        def __call__(self, line):
1274            """
1275            Call this on each line in the input that may be shunted.
1276            """
1277            if not self.in_shunt:
1278                if self.begin.match(line):
1279                    self.in_shunt = True
1280                    try:
1281                        self.file = open(self.filename, "w")
1282                    except:
1283                        self.file = None
1284                        raise
1285                    return True
1286                else:
1287                    return False
1288            else:
1289                if self.end.match(line):
1290                    if self.file: 
1291                        self.file.close()
1292                        self.file = None
1293                    self.in_shunt = False
1294                else:
1295                    if self.file:
1296                        print >>self.file, line
1297                return True
1298
1299    class shunt_to_list:
1300        """
1301        Same interface as shunt_to_file.  Data collected in self.list, one list
1302        element per line.
1303        """
1304        def __init__(self, begin, end):
1305            self.begin = re.compile(begin)
1306            self.end = re.compile(end)
1307            self.in_shunt = False
1308            self.list = [ ]
1309       
1310        def __call__(self, line):
1311            if not self.in_shunt:
1312                if self.begin.match(line):
1313                    self.in_shunt = True
1314                    return True
1315                else:
1316                    return False
1317            else:
1318                if self.end.match(line):
1319                    self.in_shunt = False
1320                else:
1321                    self.list.append(line)
1322                return True
1323
1324    class shunt_to_string:
1325        """
1326        Same interface as shunt_to_file.  Data collected in self.str, all in
1327        one string.
1328        """
1329        def __init__(self, begin, end):
1330            self.begin = re.compile(begin)
1331            self.end = re.compile(end)
1332            self.in_shunt = False
1333            self.str = ""
1334       
1335        def __call__(self, line):
1336            if not self.in_shunt:
1337                if self.begin.match(line):
1338                    self.in_shunt = True
1339                    return True
1340                else:
1341                    return False
1342            else:
1343                if self.end.match(line):
1344                    self.in_shunt = False
1345                else:
1346                    self.str += line
1347                return True
1348
1349    def create_experiment(self, req, fid):
1350        """
1351        The external interface to experiment creation called from the
1352        dispatcher.
1353
1354        Creates a working directory, splits the incoming description using the
1355        splitter script and parses out the avrious subsections using the
1356        lcasses above.  Once each sub-experiment is created, use pooled threads
1357        to instantiate them and start it all up.
1358        """
1359        try:
1360            tmpdir = tempfile.mkdtemp(prefix="split-")
1361        except IOError:
1362            raise service_error(service_error.internal, "Cannot create tmp dir")
1363
1364        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1365        gw_secretkey_base = "fed.%s" % self.ssh_type
1366        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1367        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1368        tclfile = tmpdir + "/experiment.tcl"
1369        tbparams = { }
1370
1371        pid = "dummy"
1372        gid = "dummy"
1373        # XXX
1374        fail_soft = False
1375
1376        try:
1377            os.mkdir(tmpdir+"/keys")
1378        except OSError:
1379            raise service_error(service_error.internal,
1380                    "Can't make temporary dir")
1381
1382        req = req.get('CreateRequestBody', None)
1383        if not req:
1384            raise service_error(service_error.req,
1385                    "Bad request format (no CreateRequestBody)")
1386        # The tcl parser needs to read a file so put the content into that file
1387        descr=req.get('experimentdescription', None)
1388        if descr:
1389            file_content=descr.get('ns2description', None)
1390            if file_content:
1391                try:
1392                    f = open(tclfile, 'w')
1393                    f.write(file_content)
1394                    f.close()
1395                except IOError:
1396                    raise service_error(service_error.internal,
1397                            "Cannot write temp experiment description")
1398            else:
1399                raise service_error(service_error.req, 
1400                        "Only ns2descriptions supported")
1401        else:
1402            raise service_error(service_error.req, "No experiment description")
1403
1404        if req.has_key('experimentID') and \
1405                req['experimentID'].has_key('localname'):
1406            eid = req['experimentID']['localname']
1407            self.state_lock.acquire()
1408            while (self.state.has_key(eid)):
1409                eid += random.choice(string.ascii_letters)
1410            # To avoid another thread picking this localname
1411            self.state[eid] = "placeholder"
1412            self.state_lock.release()
1413        else:
1414            eid = self.exp_stem
1415            for i in range(0,5):
1416                eid += random.choice(string.ascii_letters)
1417            self.state_lock.acquire()
1418            while (self.state.has_key(eid)):
1419                eid = self.exp_stem
1420                for i in range(0,5):
1421                    eid += random.choice(string.ascii_letters)
1422            # To avoid another thread picking this localname
1423            self.state[eid] = "placeholder"
1424            self.state_lock.release()
1425
1426        try:
1427            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1428        except ValueError:
1429            raise service_error(service_error.server_config, 
1430                    "Bad key type (%s)" % self.ssh_type)
1431
1432        user = req.get('user', None)
1433        if user == None:
1434            raise service_error(service_error.req, "No user")
1435
1436        master = req.get('master', None)
1437        if master == None:
1438            raise service_error(service_error.req, "No master testbed label")
1439       
1440        if self.splitter_url:
1441            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1442            split_data = self.remote_splitter(self.splitter_url, file_content,
1443                    master)
1444        else:
1445            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1446                str(self.muxmax), '-m', master]
1447
1448            if self.fedkit:
1449                tclcmd.append('-k')
1450
1451            tclcmd.extend([pid, gid, eid, tclfile])
1452
1453            self.log.debug("running local splitter %s", " ".join(tclcmd))
1454            tclparser = Popen(tclcmd, stdout=PIPE)
1455            split_data = tclparser.stdout
1456
1457        allocated = { }     # Testbeds we can access
1458        started = { }       # Testbeds where a sub-experiment started
1459                            # successfully
1460
1461        # Objects to parse the splitter output (defined above)
1462        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1463        parse_allbeds = self.allbeds(self.get_access)
1464        parse_gateways = self.gateways(eid, master, tmpdir,
1465                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1466        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1467                    "^#\s+End\s+Vtopo")
1468        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1469                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1470        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1471                "^#\s+End\s+tarfiles")
1472        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1473                "^#\s+End\s+rpms")
1474
1475        # Worling on the split data
1476        for line in split_data:
1477            line = line.rstrip()
1478            if parse_current_testbed(line, master, allocated, tbparams):
1479                continue
1480            elif parse_allbeds(line, user, tbparams):
1481                continue
1482            elif parse_gateways(line, allocated, tbparams):
1483                continue
1484            elif parse_vtopo(line):
1485                continue
1486            elif parse_hostnames(line):
1487                continue
1488            elif parse_tarfiles(line):
1489                continue
1490            elif parse_rpms(line):
1491                continue
1492            else:
1493                raise service_error(service_error.internal, 
1494                        "Bad tcl parse? %s" % line)
1495
1496        # Virtual topology and visualization
1497        vtopo = self.gentopo(parse_vtopo.str)
1498        if not vtopo:
1499            raise service_error(service_error.internal, 
1500                    "Failed to generate virtual topology")
1501
1502        vis = self.genviz(vtopo)
1503        if not vis:
1504            raise service_error(service_error.internal, 
1505                    "Failed to generate visualization")
1506
1507        # save federant information
1508        for k in allocated.keys():
1509            tbparams[k]['federant'] = {\
1510                    'name': [ { 'localname' : eid} ],\
1511                    'emulab': tbparams[k]['emulab'],\
1512                    'master' : k == master,\
1513                }
1514
1515
1516        # Copy tarfiles and rpms needed at remote sites into a staging area
1517        try:
1518            if self.fedkit:
1519                parse_tarfiles.list.append(self.fedkit)
1520            for t in parse_tarfiles.list:
1521                if not os.path.exists("%s/tarfiles" % tmpdir):
1522                    os.mkdir("%s/tarfiles" % tmpdir)
1523                self.copy_file(t, "%s/tarfiles/%s" % \
1524                        (tmpdir, os.path.basename(t)))
1525            for r in parse_rpms.list:
1526                if not os.path.exists("%s/rpms" % tmpdir):
1527                    os.mkdir("%s/rpms" % tmpdir)
1528                self.copy_file(r, "%s/rpms/%s" % \
1529                        (tmpdir, os.path.basename(r)))
1530        except IOError, e:
1531            raise service_error(service_error.internal, 
1532                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1533
1534        thread_pool_info = self.thread_pool()
1535        threads = [ ]
1536
1537        for tb in [ k for k in allocated.keys() if k != master]:
1538            # Wait until we have a free slot to start the next testbed load
1539            thread_pool_info.acquire()
1540            while thread_pool_info.started - \
1541                    thread_pool_info.terminated >= self.nthreads:
1542                thread_pool_info.wait()
1543            thread_pool_info.release()
1544
1545            # Create and start a thread to start the segment, and save it to
1546            # get the return value later
1547            t  = self.pooled_thread(target=self.start_segment, 
1548                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1549                    pdata=thread_pool_info, trace_file=self.trace_file)
1550            threads.append(t)
1551            t.start()
1552
1553        # Wait until all finish (the first clause of the while is to make sure
1554        # one starts)
1555        thread_pool_info.acquire()
1556        while thread_pool_info.started == 0 or \
1557                thread_pool_info.started > thread_pool_info.terminated:
1558            thread_pool_info.wait()
1559        thread_pool_info.release()
1560
1561        # If none failed, start the master
1562        failed = [ t.getName() for t in threads if not t.rv ]
1563
1564        if len(failed) == 0:
1565            if not self.start_segment(master, eid, tbparams, tmpdir):
1566                failed.append(master)
1567
1568        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1569        # If one failed clean up, unless fail_soft is set
1570        if failed:
1571            if not fail_soft:
1572                for tb in succeeded:
1573                    self.stop_segment(tb, eid, tbparams)
1574                # Remove the placeholder
1575                self.state_lock.acquire()
1576                del self.state[eid]
1577                self.state_lock.release()
1578
1579                raise service_error(service_error.federant,
1580                    "Swap in failed on %s" % ",".join(failed))
1581        else:
1582            self.log.info("[start_segment]: Experiment %s started" % eid)
1583
1584        # Generate an ID for the experiment (slice) and a certificate that the
1585        # allocator can use to prove they own it.  We'll ship it back through
1586        # the encrypted connection.
1587        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1588
1589        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1590
1591        # Walk up tmpdir, deleting as we go
1592        for path, dirs, files in os.walk(tmpdir, topdown=False):
1593            for f in files:
1594                os.remove(os.path.join(path, f))
1595            for d in dirs:
1596                os.rmdir(os.path.join(path, d))
1597        os.rmdir(tmpdir)
1598
1599        resp = { 'federant' : [ tbparams[tb]['federant'] \
1600                for tb in tbparams.keys() \
1601                    if tbparams[tb].has_key('federant') ],\
1602                    'vtopo': vtopo,\
1603                    'vis' : vis,
1604                    'experimentID' : [\
1605                            { 'fedid': copy.copy(expid) }, \
1606                            { 'localname': eid },\
1607                        ],\
1608                    'experimentAccess': { 'X509' : expcert },\
1609                }
1610
1611        # Insert the experiment into our state and update the disk copy
1612        self.state_lock.acquire()
1613        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1614                for tb in tbparams.keys() \
1615                    if tbparams[tb].has_key('federant') ],\
1616                    'vtopo': vtopo,\
1617                    'vis' : vis,
1618                    'experimentID' : [\
1619                            { 'fedid': expid }, { 'localname': eid },\
1620                        ],\
1621                }
1622        self.state[eid] = self.state[expid]
1623        if self.state_filename: self.write_state()
1624        self.state_lock.release()
1625
1626        if not failed:
1627            return resp
1628        else:
1629            raise service_error(service_error.partial, \
1630                    "Partial swap in on %s" % ",".join(succeeded))
1631
1632
1633    def get_vtopo(self, req, fid):
1634        """
1635        Return the stored virtual topology for this experiment
1636        """
1637        rv = None
1638
1639        req = req.get('VtopoRequestBody', None)
1640        if not req:
1641            raise service_error(service_error.req,
1642                    "Bad request format (no VtopoRequestBody)")
1643        exp = req.get('experiment', None)
1644        if exp:
1645            if exp.has_key('fedid'):
1646                key = fedid(bits=exp['fedid'])
1647                keytype = "fedid"
1648            elif exp.has_key('localname'):
1649                key = exp['localname']
1650                keytype = "localname"
1651            else:
1652                raise service_error(service_error.req, "Unknown lookup type")
1653        else:
1654            raise service_error(service_error.req, "No request?")
1655
1656        self.state_lock.acquire()
1657        if self.state.has_key(key):
1658            rv = { 'experiment' : {keytype: key },\
1659                    'vtopo': self.state[key]['vtopo'],\
1660                }
1661        self.state_lock.release()
1662
1663        if rv: return rv
1664        else: raise service_error(service_error.req, "No such experiment")
1665
1666    def get_vis(self, req, fid):
1667        """
1668        Return the stored visualization for this experiment
1669        """
1670        rv = None
1671
1672        req = req.get('VisRequestBody', None)
1673        if not req:
1674            raise service_error(service_error.req,
1675                    "Bad request format (no VisRequestBody)")
1676        exp = req.get('experiment', None)
1677        if exp:
1678            if exp.has_key('fedid'):
1679                key = fedid(bits=exp['fedid'])
1680                keytype = "fedid"
1681            elif exp.has_key('localname'):
1682                key = exp['localname']
1683                keytype = "localname"
1684            else:
1685                raise service_error(service_error.req, "Unknown lookup type")
1686        else:
1687            raise service_error(service_error.req, "No request?")
1688
1689        self.state_lock.acquire()
1690        if self.state.has_key(key):
1691            rv =  { 'experiment' : {keytype: key },\
1692                    'vis': self.state[key]['vis'],\
1693                    }
1694        self.state_lock.release()
1695
1696        if rv: return rv
1697        else: raise service_error(service_error.req, "No such experiment")
1698
1699    def get_info(self, req, fid):
1700        """
1701        Return all the stored info about this experiment
1702        """
1703        rv = None
1704
1705        req = req.get('InfoRequestBody', None)
1706        if not req:
1707            raise service_error(service_error.req,
1708                    "Bad request format (no VisRequestBody)")
1709        exp = req.get('experiment', None)
1710        if exp:
1711            if exp.has_key('fedid'):
1712                key = fedid(bits=exp['fedid'])
1713                keytype = "fedid"
1714            elif exp.has_key('localname'):
1715                key = exp['localname']
1716                keytype = "localname"
1717            else:
1718                raise service_error(service_error.req, "Unknown lookup type")
1719        else:
1720            raise service_error(service_error.req, "No request?")
1721
1722        # The state may be massaged by the service function that called
1723        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1724        # state.
1725        self.state_lock.acquire()
1726        if self.state.has_key(key):
1727            rv = copy.deepcopy(self.state[key])
1728        self.state_lock.release()
1729
1730        if rv: return rv
1731        else: raise service_error(service_error.req, "No such experiment")
1732
1733
1734    def terminate_experiment(self, req, fid):
1735        """
1736        Swap this experiment out on the federants and delete the shared
1737        information
1738        """
1739        tbparams = { }
1740        req = req.get('TerminateRequestBody', None)
1741        if not req:
1742            raise service_error(service_error.req,
1743                    "Bad request format (no TerminateRequestBody)")
1744        exp = req.get('experiment', None)
1745        if exp:
1746            if exp.has_key('fedid'):
1747                key = fedid(bits=exp['fedid'])
1748                keytype = "fedid"
1749            elif exp.has_key('localname'):
1750                key = exp['localname']
1751                keytype = "localname"
1752            else:
1753                raise service_error(service_error.req, "Unknown lookup type")
1754        else:
1755            raise service_error(service_error.req, "No request?")
1756
1757        self.state_lock.acquire()
1758        fed_exp = self.state.get(key, None)
1759
1760        if fed_exp:
1761            # This branch of the conditional holds the lock to generate a
1762            # consistent temporary tbparams variable to deallocate experiments.
1763            # It releases the lock to do the deallocations and reacquires it to
1764            # remove the experiment state when the termination is complete.
1765            ids = []
1766            #  experimentID is a list of dicts that are self-describing
1767            #  identifiers.  This finds all the fedids and localnames - the
1768            #  keys of self.state - and puts them into ids.
1769            for id in fed_exp.get('experimentID', []):
1770                if id.has_key('fedid'): ids.append(id['fedid'])
1771                if id.has_key('localname'): ids.append(id['localname'])
1772
1773            # Construct enough of the tbparams to make the stop_segment calls
1774            # work
1775            for fed in fed_exp['federant']:
1776                try:
1777                    for e in fed['name']:
1778                        eid = e.get('localname', None)
1779                        if eid: break
1780                    else:
1781                        continue
1782
1783                    p = fed['emulab']['project']
1784
1785                    project = p['name']['localname']
1786                    tb = p['testbed']['localname']
1787                    user = p['user'][0]['userID']['localname']
1788
1789                    domain = fed['emulab']['domain']
1790                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1791                except KeyError, e:
1792                    continue
1793                tbparams[tb] = {\
1794                        'user': user,\
1795                        'domain': domain,\
1796                        'project': project,\
1797                        'host': host,\
1798                        'eid': eid,\
1799                    }
1800            self.state_lock.release()
1801
1802            # Stop everyone.
1803            for tb in tbparams.keys():
1804                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1805
1806            # Remove teh terminated experiment
1807            self.state_lock.acquire()
1808            for id in ids:
1809                if self.state.has_key(id): del self.state[id]
1810
1811            if self.state_filename: self.write_state()
1812            self.state_lock.release()
1813
1814            return { 'experiment': exp }
1815        else:
1816            # Don't forget to release the lock
1817            self.state_lock.release()
1818            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.