source: fedd/fedd_experiment_control.py @ 72ed6e4

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 72ed6e4 was 72ed6e4, checked in by Ted Faber <faber@…>, 16 years ago

refactor configuration parsing to make code extensions more modular

  • Property mode set to 100644
File size: 52.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
48        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
49        "fed_evrepeater", "rc.accounts.patch"]
50   
51    class thread_pool:
52        """
53        A class to keep track of a set of threads all invoked for the same
54        task.  Manages the mutual exclusion of the states.
55        """
56        def __init__(self):
57            """
58            Start a pool.
59            """
60            self.changed = Condition()
61            self.started = 0
62            self.terminated = 0
63
64        def acquire(self):
65            """
66            Get the pool's lock.
67            """
68            self.changed.acquire()
69
70        def release(self):
71            """
72            Release the pool's lock.
73            """
74            self.changed.release()
75
76        def wait(self, timeout = None):
77            """
78            Wait for a pool thread to start or stop.
79            """
80            self.changed.wait(timeout)
81
82        def start(self):
83            """
84            Called by a pool thread to report starting.
85            """
86            self.changed.acquire()
87            self.started += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def terminate(self):
92            """
93            Called by a pool thread to report finishing.
94            """
95            self.changed.acquire()
96            self.terminated += 1
97            self.changed.notifyAll()
98            self.changed.release()
99
100        def clear(self):
101            """
102            Clear all pool data.
103            """
104            self.changed.acquire()
105            self.started = 0
106            self.terminated =0
107            self.changed.notifyAll()
108            self.changed.release()
109
110    class pooled_thread(Thread):
111        """
112        One of a set of threads dedicated to a specific task.  Uses the
113        thread_pool class above for coordination.
114        """
115        def __init__(self, group=None, target=None, name=None, args=(), 
116                kwargs={}, pdata=None, trace_file=None):
117            Thread.__init__(self, group, target, name, args, kwargs)
118            self.rv = None          # Return value of the ops in this thread
119            self.exception = None   # Exception that terminated this thread
120            self.target=target      # Target function to run on start()
121            self.args = args        # Args to pass to target
122            self.kwargs = kwargs    # Additional kw args
123            self.pdata = pdata      # thread_pool for this class
124            # Logger for this thread
125            self.log = logging.getLogger("fedd.experiment_control")
126       
127        def run(self):
128            """
129            Emulate Thread.run, except add pool data manipulation and error
130            logging.
131            """
132            if self.pdata:
133                self.pdata.start()
134
135            if self.target:
136                try:
137                    self.rv = self.target(*self.args, **self.kwargs)
138                except service_error, s:
139                    self.exception = s
140                    self.log.error("Thread exception: %s %s" % \
141                            (s.code_string(), s.desc))
142                except:
143                    self.exception = sys.exc_info()[1]
144                    self.log.error(("Unexpected thread exception: %s" +\
145                            "Trace %s") % (self.exception,\
146                                traceback.format_exc()))
147            if self.pdata:
148                self.pdata.terminate()
149
150    def __init__(self, config=None):
151        """
152        Intialize the various attributes, most from the config object
153        """
154        self.scripts = fedd_experiment_control_local.scripts
155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
156        self.thread_pool = fedd_experiment_control_local.thread_pool
157
158        self.cert_file = None
159        self.cert_pwd = None
160        self.trusted_certs = None
161
162        # Walk through the various relevant certificat specifying config
163        # attributes until the local certificate attributes can be resolved.
164        # The walk is from most specific to most general specification.
165        for s in ("experiment_control", "globals"):
166            if config.has_section(s): 
167                if config.has_option(s, "cert_file"):
168                    if not self.cert_file:
169                        self.cert_file = config.get(s, "cert_file")
170                        self.cert_pwd = config.get(s, "cert_pwd")
171
172                if config.has_option(s, "trusted_certs"):
173                    if not self.trusted_certs:
174                        self.trusted_certs = config.get(s, "trusted_certs")
175
176        self.exp_stem = "fed-stem"
177        self.log = logging.getLogger("fedd.experiment_control")
178        self.muxmax = 2
179        self.nthreads = 2
180        self.randomize_experiments = False
181
182        self.scp_exec = "/usr/bin/scp"
183        self.splitter = None
184        self.ssh_exec="/usr/bin/ssh"
185        self.ssh_keygen = "/usr/bin/ssh-keygen"
186        self.ssh_identity_file = None
187
188        if config.has_section("experiment_control"):
189            self.debug = config.get("experiment_control", "create_debug")
190            self.state_filename = config.get("experiment_control", 
191                    "experiment_state_file")
192            self.scripts_dir = config.get("experiment_control",
193                    "federation_script_dir")
194        else:
195            self.debug = False
196            self.state_filename = None
197            self.scripts_dir = "/users/faber/testbed/federation"
198
199        # XXX
200        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
201        self.ssh_type = "rsa"
202        self.state = { }
203        self.state_lock = Lock()
204        self.tclsh = "/usr/local/bin/otclsh"
205        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
206        self.tbmap = { 
207                'deter':'https://users.isi.deterlab.net:23235',
208                'emulab':'https://users.isi.deterlab.net:23236',
209                'ucb':'https://users.isi.deterlab.net:23237',
210                }
211        self.trace_file = sys.stderr
212
213        self.def_expstart = \
214                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
215        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
216        self.def_gwstart = \
217                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
218        self.def_mgwstart = \
219                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
220        self.def_gwimage = "FBSD61-TUNNEL2";
221        self.def_gwtype = "pc";
222
223
224        if self.ssh_pubkey_file:
225            try:
226                f = open(self.ssh_pubkey_file, 'r')
227                self.ssh_pubkey = f.read()
228                f.close()
229            except IOError:
230                raise service_error(service_error.internal,
231                        "Cannot read sshpubkey")
232
233        # Set the logging level to the value passed in.  The getattr slieght of
234        # hand finds the logging level constant corrersponding to the string.
235        # We're a little paranoid to avoid user mayhem.
236        if config.has_option("experiment_control", "log_level"):
237            try:
238                level = int(getattr(logging, 
239                    config.get("experiment_control", "log_level").upper(),-1))
240
241                if  logging.DEBUG <= level <= logging.CRITICAL:
242                    self.log.setLevel(level)
243                else:
244                    self.log.error("Bad experiment_log value: %s" % \
245                            config.get("experiment_control", "log_level"))
246
247            except ValueError:
248                self.log.error("Bad experiment_log value: %s" % \
249                        config.experiment_log)
250
251        # Grab saved state.  OK to do this w/o locking because it's read only
252        # and only one thread should be in existence that can see self.state at
253        # this point.
254        if self.state_filename:
255            self.read_state()
256
257        # Confirm federation scripts in the right place
258        for s in self.scripts:
259            if not os.path.exists(self.scripts_dir + "/" + s):
260                raise service_error(service_error.server_config,
261                        "%s/%s not in local script dir" % (self.scripts_dir, s))
262
263        # Dispatch tables
264        self.soap_services = {\
265                'Create': make_soap_handler(\
266                        CreateRequestMessage.typecode,
267                        getattr(self, "create_experiment"), 
268                        CreateResponseMessage,
269                        "CreateResponseBody"),
270                'Vtopo': make_soap_handler(\
271                        VtopoRequestMessage.typecode,
272                        getattr(self, "get_vtopo"),
273                        VtopoResponseMessage,
274                        "VtopoResponseBody"),
275                'Vis': make_soap_handler(\
276                        VisRequestMessage.typecode,
277                        getattr(self, "get_vis"),
278                        VisResponseMessage,
279                        "VisResponseBody"),
280                'Info': make_soap_handler(\
281                        InfoRequestMessage.typecode,
282                        getattr(self, "get_info"),
283                        InfoResponseMessage,
284                        "InfoResponseBody"),
285                'Terminate': make_soap_handler(\
286                        TerminateRequestMessage.typecode,
287                        getattr(self, "terminate_experiment"),
288                        TerminateResponseMessage,
289                        "TerminateResponseBody"),
290        }
291
292        self.xmlrpc_services = {\
293                'Create': make_xmlrpc_handler(\
294                        getattr(self, "create_experiment"), 
295                        "CreateResponseBody"),
296                'Vtopo': make_xmlrpc_handler(\
297                        getattr(self, "get_vtopo"),
298                        "VtopoResponseBody"),
299                'Vis': make_xmlrpc_handler(\
300                        getattr(self, "get_vis"),
301                        "VisResponseBody"),
302                'Info': make_xmlrpc_handler(\
303                        getattr(self, "get_info"),
304                        "InfoResponseBody"),
305                'Terminate': make_xmlrpc_handler(\
306                        getattr(self, "terminate_experiment"),
307                        "TerminateResponseBody"),
308        }
309
310    def copy_file(self, src, dest, size=1024):
311        """
312        Exceedingly simple file copy.
313        """
314        s = open(src,'r')
315        d = open(dest, 'w')
316
317        buf = "x"
318        while buf != "":
319            buf = s.read(size)
320            d.write(buf)
321        s.close()
322        d.close()
323
324    # Call while holding self.state_lock
325    def write_state(self):
326        """
327        Write a new copy of experiment state after copying the existing state
328        to a backup.
329
330        State format is a simple pickling of the state dictionary.
331        """
332        if os.access(self.state_filename, os.W_OK):
333            self.copy_file(self.state_filename, \
334                    "%s.bak" % self.state_filename)
335        try:
336            f = open(self.state_filename, 'w')
337            pickle.dump(self.state, f)
338        except IOError, e:
339            self.log.error("Can't write file %s: %s" % \
340                    (self.state_filename, e))
341        except pickle.PicklingError, e:
342            self.log.error("Pickling problem: %s" % e)
343
344    # Call while holding self.state_lock
345    def read_state(self):
346        """
347        Read a new copy of experiment state.  Old state is overwritten.
348
349        State format is a simple pickling of the state dictionary.
350        """
351        try:
352            f = open(self.state_filename, "r")
353            self.state = pickle.load(f)
354            self.log.debug("[read_state]: Read state from %s" % \
355                    self.state_filename)
356        except IOError, e:
357            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
358                    % (self.state_filename, e))
359        except pickle.UnpicklingError, e:
360            self.log.warning(("[read_state]: No saved state: " + \
361                    "Unpickling failed: %s") % e)
362
363    def scp_file(self, file, user, host, dest=""):
364        """
365        scp a file to the remote host.  If debug is set the action is only
366        logged.
367        """
368
369        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
370        rv = 0
371
372        try:
373            dnull = open("/dev/null", "r")
374        except IOError:
375            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
376            dnull = Null
377
378        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
379        if not self.debug:
380            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
381            else: rv = call(scp_cmd)
382
383        return rv == 0
384
385    def ssh_cmd(self, user, host, cmd, wname=None):
386        """
387        Run a remote command on host as user.  If debug is set, the action is
388        only logged.
389        """
390        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
391
392        try:
393            dnull = open("/dev/null", "r")
394        except IOError:
395            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
396            dnull = Null
397
398        self.log.debug("[ssh_cmd]: %s" % sh_str)
399        if not self.debug:
400            if dnull:
401                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
402            else:
403                sub = Popen(sh_str, shell=True)
404            return sub.wait() == 0
405        else:
406            return True
407
408    def ship_scripts(self, host, user, dest_dir):
409        """
410        Copy the federation scripts (fedkit) to the a federant.
411        """
412        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
413            for s in self.scripts:
414                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
415                        user, host, dest_dir):
416                    return False
417            return True
418        else:
419            return False
420
421    def ship_configs(self, host, user, src_dir, dest_dir):
422        """
423        Copy federant-specific configuration files to the federant.
424        """
425        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
426            return False
427        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
428            return False
429
430        for f in os.listdir(src_dir):
431            if os.path.isdir(f):
432                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
433                        "%s/%s" % (dest_dir, f)):
434                    return False
435            else:
436                if not self.scp_file("%s/%s" % (src_dir, f), 
437                        user, host, dest_dir):
438                    return False
439        return True
440
441    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
442        """
443        Start a sub-experiment on a federant.
444
445        Get the current state, modify or create as appropriate, ship data and
446        configs and start the experiment.  There are small ordering differences
447        based on the initial state of the sub-experiment.
448        """
449        # ops node in the federant
450        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
451        user = tbparams[tb]['user']     # federant user
452        pid = tbparams[tb]['project']   # federant project
453        # XXX
454        base_confs = ( "hosts",)
455        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
456        # command to test experiment state
457        expinfo_exec = "/usr/testbed/bin/expinfo" 
458        # Configuration directories on the remote machine
459        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
460        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
461        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
462        # Regular expressions to parse the expinfo response
463        state_re = re.compile("State:\s+(\w+)")
464        no_exp_re = re.compile("^No\s+such\s+experiment")
465        state = None    # Experiment state parsed from expinfo
466        # The expinfo ssh command
467        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
468
469        # Get status
470        self.log.debug("[start_segment]: %s"% " ".join(cmd))
471        dev_null = None
472        try:
473            dev_null = open("/dev/null", "a")
474        except IOError, e:
475            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
476
477        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
478        for line in status.stdout:
479            m = state_re.match(line)
480            if m: state = m.group(1)
481            else:
482                m = no_exp_re.match(line)
483                if m: state = "none"
484        rv = status.wait()
485
486        # If the experiment is not present the subcommand returns a non-zero
487        # return value.  If we successfully parsed a "none" outcome, ignore the
488        # return code.
489        if rv != 0 and state != "none":
490            raise service_error(service_error.internal,
491                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
492
493        self.log.debug("[start_segment]: %s: %s" % (tb, state))
494        self.log.info("[start_segment]:transferring experiment to %s" % tb)
495
496        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
497            return False
498        # Clear the federation files
499        if not self.ssh_cmd(user, host, 
500                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
501            return False
502        if not self.ssh_cmd(user, host, 
503                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
504            return False
505        # Clear and create the tarfiles and rpm directories
506        for d in (tarfiles_dir, rpms_dir):
507            if not self.ssh_cmd(user, host, 
508                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
509                return False
510            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
511                    "create tarfiles"):
512                return False
513       
514        if state == 'active':
515            # Remote experiment is active.  Modify it.
516            for f in base_confs:
517                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
518                        "%s/%s" % (proj_dir, f)):
519                    return False
520            if not self.ship_scripts(host, user, proj_dir):
521                return False
522            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
523                    proj_dir):
524                return False
525            if os.path.isdir("%s/tarfiles" % tmpdir):
526                if not self.ship_configs(host, user,
527                        "%s/tarfiles" % tmpdir, tarfiles_dir):
528                    return False
529            if os.path.isdir("%s/rpms" % tmpdir):
530                if not self.ship_configs(host, user,
531                        "%s/rpms" % tmpdir, tarfiles_dir):
532                    return False
533            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
534            if not self.ssh_cmd(user, host,
535                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
536                            (pid, eid, tclfile), "modexp"):
537                return False
538            return True
539        elif state == "swapped":
540            # Remote experiment swapped out.  Modify it and swap it in.
541            for f in base_confs:
542                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
543                        "%s/%s" % (proj_dir, f)):
544                    return False
545            if not self.ship_scripts(host, user, proj_dir):
546                return False
547            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
548                    proj_dir):
549                return False
550            if os.path.isdir("%s/tarfiles" % tmpdir):
551                if not self.ship_configs(host, user,
552                        "%s/tarfiles" % tmpdir, tarfiles_dir):
553                    return False
554            if os.path.isdir("%s/rpms" % tmpdir):
555                if not self.ship_configs(host, user,
556                        "%s/rpms" % tmpdir, tarfiles_dir):
557                    return False
558            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
559            if not self.ssh_cmd(user, host,
560                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
561                    "modexp"):
562                return False
563            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
564            if not self.ssh_cmd(user, host,
565                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
566                    "swapexp"):
567                return False
568            return True
569        elif state == "none":
570            # No remote experiment.  Create one.  We do this in 2 steps so we
571            # can put the configuration files and scripts into the new
572            # experiment directories.
573
574            # Tarfiles must be present for creation to work
575            if os.path.isdir("%s/tarfiles" % tmpdir):
576                if not self.ship_configs(host, user,
577                        "%s/tarfiles" % tmpdir, tarfiles_dir):
578                    return False
579            if os.path.isdir("%s/rpms" % tmpdir):
580                if not self.ship_configs(host, user,
581                        "%s/rpms" % tmpdir, tarfiles_dir):
582                    return False
583            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
584            if not self.ssh_cmd(user, host,
585                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
586                            (pid, eid, tclfile), "startexp"):
587                return False
588            # After startexp the per-experiment directories exist
589            for f in base_confs:
590                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
591                        "%s/%s" % (proj_dir, f)):
592                    return False
593            if not self.ship_scripts(host, user, proj_dir):
594                return False
595            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
596                    proj_dir):
597                return False
598            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
599            if not self.ssh_cmd(user, host,
600                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
601                    "swapexp"):
602                return False
603            return True
604        else:
605            self.log.debug("[start_segment]:unknown state %s" % state)
606            return False
607
608    def stop_segment(self, tb, eid, tbparams):
609        """
610        Stop a sub experiment by calling swapexp on the federant
611        """
612        user = tbparams[tb]['user']
613        host = tbparams[tb]['host']
614        pid = tbparams[tb]['project']
615
616        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
617        return self.ssh_cmd(user, host,
618                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
619
620       
621    def generate_ssh_keys(self, dest, type="rsa" ):
622        """
623        Generate a set of keys for the gateways to use to talk.
624
625        Keys are of type type and are stored in the required dest file.
626        """
627        valid_types = ("rsa", "dsa")
628        t = type.lower();
629        if t not in valid_types: raise ValueError
630        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
631
632        try:
633            trace = open("/dev/null", "w")
634        except IOError:
635            raise service_error(service_error.internal,
636                    "Cannot open /dev/null??");
637
638        # May raise CalledProcessError
639        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
640        rv = call(cmd, stdout=trace, stderr=trace)
641        if rv != 0:
642            raise service_error(service_error.internal, 
643                    "Cannot generate nonce ssh keys.  %s return code %d" \
644                            % (self.ssh_keygen, rv))
645
646    def gentopo(self, str):
647        """
648        Generate the topology dtat structure from the splitter's XML
649        representation of it.
650
651        The topology XML looks like:
652            <experiment>
653                <nodes>
654                    <node><vname></vname><ips>ip1:ip2</ips></node>
655                </nodes>
656                <lans>
657                    <lan>
658                        <vname></vname><vnode></vnode><ip></ip>
659                        <bandwidth></bandwidth><member>node:port</member>
660                    </lan>
661                </lans>
662        """
663        class topo_parse:
664            """
665            Parse the topology XML and create the dats structure.
666            """
667            def __init__(self):
668                # Typing of the subelements for data conversion
669                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
670                self.int_subelements = ( 'bandwidth',)
671                self.float_subelements = ( 'delay',)
672                # The final data structure
673                self.nodes = [ ]
674                self.lans =  [ ]
675                self.topo = { \
676                        'node': self.nodes,\
677                        'lan' : self.lans,\
678                    }
679                self.element = { }  # Current element being created
680                self.chars = ""     # Last text seen
681
682            def end_element(self, name):
683                # After each sub element the contents is added to the current
684                # element or to the appropriate list.
685                if name == 'node':
686                    self.nodes.append(self.element)
687                    self.element = { }
688                elif name == 'lan':
689                    self.lans.append(self.element)
690                    self.element = { }
691                elif name in self.str_subelements:
692                    self.element[name] = self.chars
693                    self.chars = ""
694                elif name in self.int_subelements:
695                    self.element[name] = int(self.chars)
696                    self.chars = ""
697                elif name in self.float_subelements:
698                    self.element[name] = float(self.chars)
699                    self.chars = ""
700
701            def found_chars(self, data):
702                self.chars += data.rstrip()
703
704
705        tp = topo_parse();
706        parser = xml.parsers.expat.ParserCreate()
707        parser.EndElementHandler = tp.end_element
708        parser.CharacterDataHandler = tp.found_chars
709
710        parser.Parse(str)
711
712        return tp.topo
713       
714
715    def genviz(self, topo):
716        """
717        Generate the visualization the virtual topology
718        """
719
720        neato = "/usr/local/bin/neato"
721        # These are used to parse neato output and to create the visualization
722        # file.
723        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
724        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
725                "%s</type></node>"
726
727        try:
728            # Node names
729            nodes = [ n['vname'] for n in topo['node'] ]
730            topo_lans = topo['lan']
731        except KeyError:
732            raise service_error(service_error.internal, "Bad topology")
733
734        lans = { }
735        links = { }
736
737        # Walk through the virtual topology, organizing the connections into
738        # 2-node connections (links) and more-than-2-node connections (lans).
739        # When a lan is created, it's added to the list of nodes (there's a
740        # node in the visualization for the lan).
741        for l in topo_lans:
742            if links.has_key(l['vname']):
743                if len(links[l['vname']]) < 2:
744                    links[l['vname']].append(l['vnode'])
745                else:
746                    nodes.append(l['vname'])
747                    lans[l['vname']] = links[l['vname']]
748                    del links[l['vname']]
749                    lans[l['vname']].append(l['vnode'])
750            elif lans.has_key(l['vname']):
751                lans[l['vname']].append(l['vnode'])
752            else:
753                links[l['vname']] = [ l['vnode'] ]
754
755
756        # Open up a temporary file for dot to turn into a visualization
757        try:
758            df, dotname = tempfile.mkstemp()
759            dotfile = os.fdopen(df, 'w')
760        except IOError:
761            raise service_error(service_error.internal,
762                    "Failed to open file in genviz")
763
764        # Generate a dot/neato input file from the links, nodes and lans
765        try:
766            print >>dotfile, "graph G {"
767            for n in nodes:
768                print >>dotfile, '\t"%s"' % n
769            for l in links.keys():
770                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
771            for l in lans.keys():
772                for n in lans[l]:
773                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
774            print >>dotfile, "}"
775            dotfile.close()
776        except TypeError:
777            raise service_error(service_error.internal,
778                    "Single endpoint link in vtopo")
779        except IOError:
780            raise service_error(service_error.internal, "Cannot write dot file")
781
782        # Use dot to create a visualization
783        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
784                '-Gpack=true', dotname], stdout=PIPE)
785
786        # Translate dot to vis format
787        vis_nodes = [ ]
788        vis = { 'node': vis_nodes }
789        for line in dot.stdout:
790            m = vis_re.match(line)
791            if m:
792                vn = m.group(1)
793                vis_node = {'name': vn, \
794                        'x': float(m.group(2)),\
795                        'y' : float(m.group(3)),\
796                    }
797                if vn in links.keys() or vn in lans.keys():
798                    vis_node['type'] = 'lan'
799                else:
800                    vis_node['type'] = 'node'
801                vis_nodes.append(vis_node)
802        rv = dot.wait()
803
804        os.remove(dotname)
805        if rv == 0 : return vis
806        else: return None
807
808
809    def get_access(self, tb, nodes, user, tbparam):
810        """
811        Get access to testbed through fedd and set the parameters for that tb
812        """
813
814        translate_attr = {
815            'slavenodestartcmd': 'expstart',
816            'slaveconnectorstartcmd': 'gwstart',
817            'masternodestartcmd': 'mexpstart',
818            'masterconnectorstartcmd': 'mgwstart',
819            'connectorimage': 'gwimage',
820            'connectortype': 'gwtype',
821            'tunnelcfg': 'tun',
822            'smbshare': 'smbshare',
823        }
824
825        uri = self.tbmap.get(tb, None)
826        if not uri:
827            raise service_error(serice_error.server_config, 
828                    "Unknown testbed: %s" % tb)
829
830        # The basic request
831        req = {\
832                'destinationTestbed' : { 'uri' : uri },
833                'user':  user,
834                'allocID' : { 'localname': 'test' },
835                # XXX: need to get service access stright
836                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
837                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
838            }
839       
840        # node resources if any
841        if nodes != None and len(nodes) > 0:
842            rnodes = [ ]
843            for n in nodes:
844                rn = { }
845                image, hw, count = n.split(":")
846                if image: rn['image'] = [ image ]
847                if hw: rn['hardware'] = [ hw ]
848                if count: rn['count'] = int(count)
849                rnodes.append(rn)
850            req['resources']= { }
851            req['resources']['node'] = rnodes
852
853        # No retry loop here.  Proxy servers must correctly authenticate
854        # themselves without help
855
856        try:
857            ctx = fedd_ssl_context(self.cert_file, 
858                    self.trusted_certs, password=self.cert_pwd)
859        except SSL.SSLError:
860            raise service_error(service_error.server_config, 
861                    "Server certificates misconfigured")
862
863        loc = feddServiceLocator();
864        port = loc.getfeddPortType(uri,
865                transport=M2Crypto.httpslib.HTTPSConnection, 
866                transdict={ 'ssl_context' : ctx })
867
868        # Reconstruct the full request message
869        msg = RequestAccessRequestMessage()
870        msg.set_element_RequestAccessRequestBody(
871                pack_soap(msg, "RequestAccessRequestBody", req))
872
873        try:
874            resp = port.RequestAccess(msg)
875        except ZSI.ParseException, e:
876            raise service_error(service_error.req,
877                    "Bad format message (XMLRPC??): %s" %
878                    str(e))
879        r = unpack_soap(resp)
880
881        if r.has_key('RequestAccessResponseBody'):
882            r = r['RequestAccessResponseBody']
883        else:
884            raise service_error(service_error.proxy,
885                    "Bad proxy response")
886
887
888        e = r['emulab']
889        p = e['project']
890        tbparam[tb] = { 
891                "boss": e['boss'],
892                "host": e['ops'],
893                "domain": e['domain'],
894                "fs": e['fileServer'],
895                "eventserver": e['eventServer'],
896                "project": unpack_id(p['name']),
897                "emulab" : e
898                }
899        # Make the testbed name be the label the user applied
900        p['testbed'] = {'localname': tb }
901
902        for u in p['user']:
903            tbparam[tb]['user'] = unpack_id(u['userID'])
904
905        for a in e['fedAttr']:
906            if a['attribute']:
907                key = translate_attr.get(a['attribute'].lower(), None)
908                if key:
909                    tbparam[tb][key]= a['value']
910       
911    class current_testbed:
912        """
913        Object for collecting the current testbed description.  The testbed
914        description is saved to a file with the local testbed variables
915        subsittuted line by line.
916        """
917        def __init__(self, eid, tmpdir):
918            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
919            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
920            self.current_testbed = None
921            self.testbed_file = None
922
923            self.def_expstart = \
924                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
925            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
926            self.def_gwstart = \
927                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
928            self.def_mgwstart = \
929                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
930            self.def_gwimage = "FBSD61-TUNNEL2";
931            self.def_gwtype = "pc";
932
933            self.eid = eid
934            self.tmpdir = tmpdir
935
936        def __call__(self, line, master, allocated, tbparams):
937            # Capture testbed topology descriptions
938            if self.current_testbed == None:
939                m = self.begin_testbed.match(line)
940                if m != None:
941                    self.current_testbed = m.group(1)
942                    if self.current_testbed == None:
943                        raise service_error(service_error.req,
944                                "Bad request format (unnamed testbed)")
945                    allocated[self.current_testbed] = \
946                            allocated.get(self.current_testbed,0) + 1
947                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
948                    if not os.path.exists(tb_dir):
949                        try:
950                            os.mkdir(tb_dir)
951                        except IOError:
952                            raise service_error(service_error.internal,
953                                    "Cannot create %s" % tb_dir)
954                    try:
955                        self.testbed_file = open("%s/%s.%s.tcl" %
956                                (tb_dir, self.eid, self.current_testbed), 'w')
957                    except IOError:
958                        self.testbed_file = None
959                    return True
960                else: return False
961            else:
962                m = self.end_testbed.match(line)
963                if m != None:
964                    if m.group(1) != self.current_testbed:
965                        raise service_error(service_error.internal, 
966                                "Mismatched testbed markers!?")
967                    if self.testbed_file != None: 
968                        self.testbed_file.close()
969                        self.testbed_file = None
970                    self.current_testbed = None
971                elif self.testbed_file:
972                    # Substitute variables and put the line into the local
973                    # testbed file.
974                    gwtype = tbparams[self.current_testbed].get('gwtype', 
975                            self.def_gwtype)
976                    gwimage = tbparams[self.current_testbed].get('gwimage', 
977                            self.def_gwimage)
978                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
979                            self.def_mgwstart)
980                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
981                            self.def_mexpstart)
982                    gwstart = tbparams[self.current_testbed].get('gwstart', 
983                            self.def_gwstart)
984                    expstart = tbparams[self.current_testbed].get('expstart', 
985                            self.def_expstart)
986                    project = tbparams[self.current_testbed].get('project')
987                    line = re.sub("GWTYPE", gwtype, line)
988                    line = re.sub("GWIMAGE", gwimage, line)
989                    if self.current_testbed == master:
990                        line = re.sub("GWSTART", mgwstart, line)
991                        line = re.sub("EXPSTART", mexpstart, line)
992                    else:
993                        line = re.sub("GWSTART", gwstart, line)
994                        line = re.sub("EXPSTART", expstart, line)
995                    # XXX: does `` embed without doing enything else?
996                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
997                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
998                    line = re.sub("EID", self.eid, line)
999                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1000                            (project, self.eid), line)
1001                    print >>self.testbed_file, line
1002                return True
1003
1004    class allbeds:
1005        """
1006        Process the Allbeds section.  Get access to each federant and save the
1007        parameters in tbparams
1008        """
1009        def __init__(self, get_access):
1010            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1011            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1012            self.in_allbeds = False
1013            self.get_access = get_access
1014
1015        def __call__(self, line, user, tbparams):
1016            # Testbed access parameters
1017            if not self.in_allbeds:
1018                if self.begin_allbeds.match(line):
1019                    self.in_allbeds = True
1020                    return True
1021                else:
1022                    return False
1023            else:
1024                if self.end_allbeds.match(line):
1025                    self.in_allbeds = False
1026                else:
1027                    nodes = line.split('|')
1028                    tb = nodes.pop(0)
1029                    self.get_access(tb, nodes, user, tbparams)
1030                return True
1031
1032    class gateways:
1033        def __init__(self, eid, master, tmpdir, gw_pubkey,
1034                gw_secretkey, copy_file):
1035            self.begin_gateways = \
1036                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1037            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1038            self.current_gateways = None
1039            self.control_gateway = None
1040            self.active_end = { }
1041
1042            self.eid = eid
1043            self.master = master
1044            self.tmpdir = tmpdir
1045            self.gw_pubkey_base = gw_pubkey
1046            self.gw_secretkey_base = gw_secretkey
1047
1048            self.copy_file = copy_file
1049
1050
1051        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1052                active_end, tbparams, dtb, myname, desthost, type):
1053            """
1054            Produce a gateway configuration file from a gateways line.
1055            """
1056
1057            sproject = tbparams[gw].get('project', 'project')
1058            dproject = tbparams[dtb].get('project', 'project')
1059            sdomain = ".%s.%s%s" % (eid, sproject,
1060                    tbparams[gw].get('domain', ".example.com"))
1061            ddomain = ".%s.%s%s" % (eid, dproject,
1062                    tbparams[dtb].get('domain', ".example.com"))
1063            boss = tbparams[master].get('boss', "boss")
1064            fs = tbparams[master].get('fs', "fs")
1065            event_server = "%s%s" % \
1066                    (tbparams[gw].get('eventserver', "event_server"),
1067                            tbparams[gw].get('domain', "example.com"))
1068            remote_event_server = "%s%s" % \
1069                    (tbparams[dtb].get('eventserver', "event_server"),
1070                            tbparams[dtb].get('domain', "example.com"))
1071            seer_control = "%s%s" % \
1072                    (tbparams[gw].get('control', "control"), sdomain)
1073            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1074            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1075            tunnel_cfg = tbparams[gw].get("tun", "false")
1076
1077            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1078            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1079
1080            # translate to lower case so the `hostname` hack for specifying
1081            # configuration files works.
1082            conf_file = conf_file.lower();
1083            remote_conf_file = remote_conf_file.lower();
1084
1085            if dtb == master:
1086                active = "false"
1087            elif gw == master:
1088                active = "true"
1089            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1090                active = "false"
1091            else:
1092                active_end['%s-%s' % (gw, dtb)] = 1
1093                active = "true"
1094
1095            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1096            print >>gwconfig, "Active: %s" % active
1097            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1098            print >>gwconfig, "BossName: %s" % boss
1099            print >>gwconfig, "FsName: %s" % fs
1100            print >>gwconfig, "EventServerName: %s" % event_server
1101            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1102            print >>gwconfig, "SeerControl: %s" % seer_control
1103            print >>gwconfig, "Type: %s" % type
1104            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1105            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1106                    local_script_dir
1107            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1108            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1109            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1110                    (remote_script_dir, remote_conf_file)
1111            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1112            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
1113            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
1114            gwconfig.close()
1115
1116            return active == "true"
1117
1118        def __call__(self, line, allocated, tbparams):
1119            # Process gateways
1120            if not self.current_gateways:
1121                m = self.begin_gateways.match(line)
1122                if m:
1123                    self.current_gateways = m.group(1)
1124                    if allocated.has_key(self.current_gateways):
1125                        # This test should always succeed
1126                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1127                        if not os.path.exists(tb_dir):
1128                            try:
1129                                os.mkdir(tb_dir)
1130                            except IOError:
1131                                raise service_error(service_error.internal,
1132                                        "Cannot create %s" % tb_dir)
1133                    else:
1134                        # XXX
1135                        self.log.error("[gateways]: Ignoring gateways for " + \
1136                                "unknown testbed %s" % self.current_gateways)
1137                        self.current_gateways = None
1138                    return True
1139                else:
1140                    return False
1141            else:
1142                m = self.end_gateways.match(line)
1143                if m :
1144                    if m.group(1) != self.current_gateways:
1145                        raise service_error(service_error.internal,
1146                                "Mismatched gateway markers!?")
1147                    if self.control_gateway:
1148                        try:
1149                            cc = open("%s/%s/client.conf" %
1150                                    (self.tmpdir, self.current_gateways), 'w')
1151                            print >>cc, "ControlGateway: %s" % \
1152                                    self.control_gateway
1153                            if tbparams[self.master].has_key('smbshare'):
1154                                print >>cc, "SMBSHare: %s" % \
1155                                        tbparams[self.master]['smbshare']
1156                            print >>cc, "ProjectUser: %s" % \
1157                                    tbparams[self.master]['user']
1158                            print >>cc, "ProjectName: %s" % \
1159                                    tbparams[self.master]['project']
1160                            cc.close()
1161                        except IOError:
1162                            raise service_error(service_error.internal,
1163                                    "Error creating client config")
1164                        try:
1165                            cc = open("%s/%s/seer.conf" %
1166                                    (self.tmpdir, self.current_gateways),
1167                                    'w')
1168                            if self.current_gateways != self.master:
1169                                print >>cc, "ControlNode: %s" % \
1170                                        self.control_gateway
1171                            print >>cc, "ExperimentID: %s/%s" % \
1172                                    ( tbparams[self.master]['project'], \
1173                                    self.eid )
1174                            cc.close()
1175                        except IOError:
1176                            raise service_error(service_error.internal,
1177                                    "Error creating seer config")
1178                    else:
1179                        debug.error("[gateways]: No control gateway for %s" %\
1180                                    self.current_gateways)
1181                    self.current_gateways = None
1182                else:
1183                    dtb, myname, desthost, type = line.split(" ")
1184
1185                    if type == "control" or type == "both":
1186                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1187                                self.eid, 
1188                                tbparams[self.current_gateways]['project'],
1189                                tbparams[self.current_gateways]['domain'])
1190                    try:
1191                        active = self.gateway_conf_file(self.current_gateways,
1192                                self.master, self.eid, self.gw_pubkey_base,
1193                                self.gw_secretkey_base,
1194                                self.active_end, tbparams, dtb, myname,
1195                                desthost, type)
1196                    except IOError, e:
1197                        raise service_error(service_error.internal,
1198                                "Failed to write config file for %s" % \
1199                                        self.current_gateway)
1200           
1201                    gw_pubkey = "%s/keys/%s" % \
1202                            (self.tmpdir, self.gw_pubkey_base)
1203                    gw_secretkey = "%s/keys/%s" % \
1204                            (self.tmpdir, self.gw_secretkey_base)
1205
1206                    pkfile = "%s/%s/%s" % \
1207                            ( self.tmpdir, self.current_gateways, 
1208                                    self.gw_pubkey_base)
1209                    skfile = "%s/%s/%s" % \
1210                            ( self.tmpdir, self.current_gateways, 
1211                                    self.gw_secretkey_base)
1212
1213                    if not os.path.exists(pkfile):
1214                        try:
1215                            self.copy_file(gw_pubkey, pkfile)
1216                        except IOError:
1217                            service_error(service_error.internal,
1218                                    "Failed to copy pubkey file")
1219
1220                    if active and not os.path.exists(skfile):
1221                        try:
1222                            self.copy_file(gw_secretkey, skfile)
1223                        except IOError:
1224                            service_error(service_error.internal,
1225                                    "Failed to copy secretkey file")
1226                return True
1227
1228    class shunt_to_file:
1229        """
1230        Simple class to write data between two regexps to a file.
1231        """
1232        def __init__(self, begin, end, filename):
1233            """
1234            Begin shunting on a match of begin, stop on end, send data to
1235            filename.
1236            """
1237            self.begin = re.compile(begin)
1238            self.end = re.compile(end)
1239            self.in_shunt = False
1240            self.file = None
1241            self.filename = filename
1242
1243        def __call__(self, line):
1244            """
1245            Call this on each line in the input that may be shunted.
1246            """
1247            if not self.in_shunt:
1248                if self.begin.match(line):
1249                    self.in_shunt = True
1250                    try:
1251                        self.file = open(self.filename, "w")
1252                    except:
1253                        self.file = None
1254                        raise
1255                    return True
1256                else:
1257                    return False
1258            else:
1259                if self.end.match(line):
1260                    if self.file: 
1261                        self.file.close()
1262                        self.file = None
1263                    self.in_shunt = False
1264                else:
1265                    if self.file:
1266                        print >>self.file, line
1267                return True
1268
1269    class shunt_to_list:
1270        """
1271        Same interface as shunt_to_file.  Data collected in self.list, one list
1272        element per line.
1273        """
1274        def __init__(self, begin, end):
1275            self.begin = re.compile(begin)
1276            self.end = re.compile(end)
1277            self.in_shunt = False
1278            self.list = [ ]
1279       
1280        def __call__(self, line):
1281            if not self.in_shunt:
1282                if self.begin.match(line):
1283                    self.in_shunt = True
1284                    return True
1285                else:
1286                    return False
1287            else:
1288                if self.end.match(line):
1289                    self.in_shunt = False
1290                else:
1291                    self.list.append(line)
1292                return True
1293
1294    class shunt_to_string:
1295        """
1296        Same interface as shunt_to_file.  Data collected in self.str, all in
1297        one string.
1298        """
1299        def __init__(self, begin, end):
1300            self.begin = re.compile(begin)
1301            self.end = re.compile(end)
1302            self.in_shunt = False
1303            self.str = ""
1304       
1305        def __call__(self, line):
1306            if not self.in_shunt:
1307                if self.begin.match(line):
1308                    self.in_shunt = True
1309                    return True
1310                else:
1311                    return False
1312            else:
1313                if self.end.match(line):
1314                    self.in_shunt = False
1315                else:
1316                    self.str += line
1317                return True
1318
1319    def create_experiment(self, req, fid):
1320        """
1321        The external interface to experiment creation called from the
1322        dispatcher.
1323
1324        Creates a working directory, splits the incoming description using the
1325        splitter script and parses out the avrious subsections using the
1326        lcasses above.  Once each sub-experiment is created, use pooled threads
1327        to instantiate them and start it all up.
1328        """
1329        try:
1330            tmpdir = tempfile.mkdtemp(prefix="split-")
1331        except IOError:
1332            raise service_error(service_error.internal, "Cannot create tmp dir")
1333
1334        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1335        gw_secretkey_base = "fed.%s" % self.ssh_type
1336        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1337        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1338        tclfile = tmpdir + "/experiment.tcl"
1339        tbparams = { }
1340
1341        pid = "dummy"
1342        gid = "dummy"
1343        # XXX
1344        fail_soft = False
1345
1346        try:
1347            os.mkdir(tmpdir+"/keys")
1348        except OSError:
1349            raise service_error(service_error.internal,
1350                    "Can't make temporary dir")
1351
1352        req = req.get('CreateRequestBody', None)
1353        if not req:
1354            raise service_error(service_error.req,
1355                    "Bad request format (no CreateRequestBody)")
1356        # The tcl parser needs to read a file so put the content into that file
1357        descr=req.get('experimentdescription', None)
1358        if descr:
1359            file_content=descr.get('ns2description', None)
1360            if file_content:
1361                try:
1362                    f = open(tclfile, 'w')
1363                    f.write(file_content)
1364                    f.close()
1365                except IOError:
1366                    raise service_error(service_error.internal,
1367                            "Cannot write temp experiment description")
1368            else:
1369                raise service_error(service_error.req, 
1370                        "Only ns2descriptions supported")
1371        else:
1372            raise service_error(service_error.req, "No experiment description")
1373
1374        if req.has_key('experimentID') and \
1375                req['experimentID'].has_key('localname'):
1376            eid = req['experimentID']['localname']
1377            self.state_lock.acquire()
1378            while (self.state.has_key(eid)):
1379                eid += random.choice(string.ascii_letters)
1380            # To avoid another thread picking this localname
1381            self.state[eid] = "placeholder"
1382            self.state_lock.release()
1383        else:
1384            eid = self.exp_stem
1385            for i in range(0,5):
1386                eid += random.choice(string.ascii_letters)
1387            self.state_lock.acquire()
1388            while (self.state.has_key(eid)):
1389                eid = self.exp_stem
1390                for i in range(0,5):
1391                    eid += random.choice(string.ascii_letters)
1392            # To avoid another thread picking this localname
1393            self.state[eid] = "placeholder"
1394            self.state_lock.release()
1395
1396        try:
1397            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1398        except ValueError:
1399            raise service_error(service_error.server_config, 
1400                    "Bad key type (%s)" % self.ssh_type)
1401
1402        user = req.get('user', None)
1403        if user == None:
1404            raise service_error(service_error.req, "No user")
1405
1406        master = req.get('master', None)
1407        if master == None:
1408            raise service_error(service_error.req, "No master testbed label")
1409       
1410       
1411        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1412            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1413        tclparser = Popen(tclcmd, stdout=PIPE)
1414
1415        allocated = { }     # Testbeds we can access
1416        started = { }       # Testbeds where a sub-experiment started
1417                            # successfully
1418
1419        # Objects to parse the splitter output (defined above)
1420        parse_current_testbed = self.current_testbed(eid, tmpdir)
1421        parse_allbeds = self.allbeds(self.get_access)
1422        parse_gateways = self.gateways(eid, master, tmpdir,
1423                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1424        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1425                    "^#\s+End\s+Vtopo")
1426        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1427                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1428        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1429                "^#\s+End\s+tarfiles")
1430        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1431                "^#\s+End\s+rpms")
1432
1433        # Worling on the split data
1434        for line in tclparser.stdout:
1435            line = line.rstrip()
1436            if parse_current_testbed(line, master, allocated, tbparams):
1437                continue
1438            elif parse_allbeds(line, user, tbparams):
1439                continue
1440            elif parse_gateways(line, allocated, tbparams):
1441                continue
1442            elif parse_vtopo(line):
1443                continue
1444            elif parse_hostnames(line):
1445                continue
1446            elif parse_tarfiles(line):
1447                continue
1448            elif parse_rpms(line):
1449                continue
1450            else:
1451                raise service_error(service_error.internal, 
1452                        "Bad tcl parse? %s" % line)
1453
1454        # Virtual topology and visualization
1455        vtopo = self.gentopo(parse_vtopo.str)
1456        if not vtopo:
1457            raise service_error(service_error.internal, 
1458                    "Failed to generate virtual topology")
1459
1460        vis = self.genviz(vtopo)
1461        if not vis:
1462            raise service_error(service_error.internal, 
1463                    "Failed to generate visualization")
1464
1465        # save federant information
1466        for k in allocated.keys():
1467            tbparams[k]['federant'] = {\
1468                    'name': [ { 'localname' : eid} ],\
1469                    'emulab': tbparams[k]['emulab'],\
1470                    'master' : k == master,\
1471                }
1472
1473
1474        # Copy tarfiles and rpms needed at remote sites into a staging area
1475        try:
1476            for t in parse_tarfiles.list:
1477                if not os.path.exists("%s/tarfiles" % tmpdir):
1478                    os.mkdir("%s/tarfiles" % tmpdir)
1479                self.copy_file(t, "%s/tarfiles/%s" % \
1480                        (tmpdir, os.path.basename(t)))
1481            for r in parse_rpms.list:
1482                if not os.path.exists("%s/rpms" % tmpdir):
1483                    os.mkdir("%s/rpms" % tmpdir)
1484                self.copy_file(r, "%s/rpms/%s" % \
1485                        (tmpdir, os.path.basename(r)))
1486        except IOError, e:
1487            raise service_error(service_error.internal, 
1488                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1489
1490        thread_pool_info = self.thread_pool()
1491        threads = [ ]
1492
1493        for tb in [ k for k in allocated.keys() if k != master]:
1494            # Wait until we have a free slot to start the next testbed load
1495            thread_pool_info.acquire()
1496            while thread_pool_info.started - \
1497                    thread_pool_info.terminated >= self.nthreads:
1498                thread_pool_info.wait()
1499            thread_pool_info.release()
1500
1501            # Create and start a thread to start the segment, and save it to
1502            # get the return value later
1503            t  = self.pooled_thread(target=self.start_segment, 
1504                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1505                    pdata=thread_pool_info, trace_file=self.trace_file)
1506            threads.append(t)
1507            t.start()
1508
1509        # Wait until all finish (the first clause of the while is to make sure
1510        # one starts)
1511        thread_pool_info.acquire()
1512        while thread_pool_info.started == 0 or \
1513                thread_pool_info.started > thread_pool_info.terminated:
1514            thread_pool_info.wait()
1515        thread_pool_info.release()
1516
1517        # If none failed, start the master
1518        failed = [ t.getName() for t in threads if not t.rv ]
1519
1520        if len(failed) == 0:
1521            if not self.start_segment(master, eid, tbparams, tmpdir):
1522                failed.append(master)
1523
1524        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1525        # If one failed clean up, unless fail_soft is set
1526        if failed:
1527            if not fail_soft:
1528                for tb in succeeded:
1529                    self.stop_segment(tb, eid, tbparams)
1530                # Remove the placeholder
1531                self.state_lock.acquire()
1532                del self.state[eid]
1533                self.state_lock.release()
1534
1535                raise service_error(service_error.federant,
1536                    "Swap in failed on %s" % ",".join(failed))
1537        else:
1538            self.log.info("[start_segment]: Experiment %s started" % eid)
1539
1540        # Generate an ID for the experiment (slice) and a certificate that the
1541        # allocator can use to prove they own it.  We'll ship it back through
1542        # the encrypted connection.
1543        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1544
1545        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1546
1547        # Walk up tmpdir, deleting as we go
1548        for path, dirs, files in os.walk(tmpdir, topdown=False):
1549            for f in files:
1550                os.remove(os.path.join(path, f))
1551            for d in dirs:
1552                os.rmdir(os.path.join(path, d))
1553        os.rmdir(tmpdir)
1554
1555        resp = { 'federant' : [ tbparams[tb]['federant'] \
1556                for tb in tbparams.keys() \
1557                    if tbparams[tb].has_key('federant') ],\
1558                    'vtopo': vtopo,\
1559                    'vis' : vis,
1560                    'experimentID' : [\
1561                            { 'fedid': copy.copy(expid) }, \
1562                            { 'localname': eid },\
1563                        ],\
1564                    'experimentAccess': { 'X509' : expcert },\
1565                }
1566
1567        # Insert the experiment into our state and update the disk copy
1568        self.state_lock.acquire()
1569        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1570                for tb in tbparams.keys() \
1571                    if tbparams[tb].has_key('federant') ],\
1572                    'vtopo': vtopo,\
1573                    'vis' : vis,
1574                    'experimentID' : [\
1575                            { 'fedid': expid }, { 'localname': eid },\
1576                        ],\
1577                }
1578        self.state[eid] = self.state[expid]
1579        if self.state_filename: self.write_state()
1580        self.state_lock.release()
1581
1582        if not failed:
1583            return resp
1584        else:
1585            raise service_error(service_error.partial, \
1586                    "Partial swap in on %s" % ",".join(succeeded))
1587
1588
1589    def get_vtopo(self, req, fid):
1590        """
1591        Return the stored virtual topology for this experiment
1592        """
1593        rv = None
1594
1595        req = req.get('VtopoRequestBody', None)
1596        if not req:
1597            raise service_error(service_error.req,
1598                    "Bad request format (no VtopoRequestBody)")
1599        exp = req.get('experiment', None)
1600        if exp:
1601            if exp.has_key('fedid'):
1602                key = fedid(bits=exp['fedid'])
1603                keytype = "fedid"
1604            elif exp.has_key('localname'):
1605                key = exp['localname']
1606                keytype = "localname"
1607            else:
1608                raise service_error(service_error.req, "Unknown lookup type")
1609        else:
1610            raise service_error(service_error.req, "No request?")
1611
1612        self.state_lock.acquire()
1613        if self.state.has_key(key):
1614            rv = { 'experiment' : {keytype: key },\
1615                    'vtopo': self.state[key]['vtopo'],\
1616                }
1617        self.state_lock.release()
1618
1619        if rv: return rv
1620        else: raise service_error(service_error.req, "No such experiment")
1621
1622    def get_vis(self, req, fid):
1623        """
1624        Return the stored visualization for this experiment
1625        """
1626        rv = None
1627
1628        req = req.get('VisRequestBody', None)
1629        if not req:
1630            raise service_error(service_error.req,
1631                    "Bad request format (no VisRequestBody)")
1632        exp = req.get('experiment', None)
1633        if exp:
1634            if exp.has_key('fedid'):
1635                key = fedid(bits=exp['fedid'])
1636                keytype = "fedid"
1637            elif exp.has_key('localname'):
1638                key = exp['localname']
1639                keytype = "localname"
1640            else:
1641                raise service_error(service_error.req, "Unknown lookup type")
1642        else:
1643            raise service_error(service_error.req, "No request?")
1644
1645        self.state_lock.acquire()
1646        if self.state.has_key(key):
1647            rv =  { 'experiment' : {keytype: key },\
1648                    'vis': self.state[key]['vis'],\
1649                    }
1650        self.state_lock.release()
1651
1652        if rv: return rv
1653        else: raise service_error(service_error.req, "No such experiment")
1654
1655    def get_info(self, req, fid):
1656        """
1657        Return all the stored info about this experiment
1658        """
1659        rv = None
1660
1661        req = req.get('InfoRequestBody', None)
1662        if not req:
1663            raise service_error(service_error.req,
1664                    "Bad request format (no VisRequestBody)")
1665        exp = req.get('experiment', None)
1666        if exp:
1667            if exp.has_key('fedid'):
1668                key = fedid(bits=exp['fedid'])
1669                keytype = "fedid"
1670            elif exp.has_key('localname'):
1671                key = exp['localname']
1672                keytype = "localname"
1673            else:
1674                raise service_error(service_error.req, "Unknown lookup type")
1675        else:
1676            raise service_error(service_error.req, "No request?")
1677
1678        # The state may be massaged by the service function that called
1679        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1680        # state.
1681        self.state_lock.acquire()
1682        if self.state.has_key(key):
1683            rv = copy.deepcopy(self.state[key])
1684        self.state_lock.release()
1685
1686        if rv: return rv
1687        else: raise service_error(service_error.req, "No such experiment")
1688
1689
1690    def terminate_experiment(self, req, fid):
1691        """
1692        Swap this experiment out on the federants and delete the shared
1693        information
1694        """
1695        tbparams = { }
1696        req = req.get('TerminateRequestBody', None)
1697        if not req:
1698            raise service_error(service_error.req,
1699                    "Bad request format (no TerminateRequestBody)")
1700        exp = req.get('experiment', None)
1701        if exp:
1702            if exp.has_key('fedid'):
1703                key = fedid(bits=exp['fedid'])
1704                keytype = "fedid"
1705            elif exp.has_key('localname'):
1706                key = exp['localname']
1707                keytype = "localname"
1708            else:
1709                raise service_error(service_error.req, "Unknown lookup type")
1710        else:
1711            raise service_error(service_error.req, "No request?")
1712
1713        self.state_lock.acquire()
1714        fed_exp = self.state.get(key, None)
1715
1716        if fed_exp:
1717            # This branch of the conditional holds the lock to generate a
1718            # consistent temporary tbparams variable to deallocate experiments.
1719            # It releases the lock to do the deallocations and reacquires it to
1720            # remove the experiment state when the termination is complete.
1721            ids = []
1722            #  experimentID is a list of dicts that are self-describing
1723            #  identifiers.  This finds all the fedids and localnames - the
1724            #  keys of self.state - and puts them into ids.
1725            for id in fed_exp.get('experimentID', []):
1726                if id.has_key('fedid'): ids.append(id['fedid'])
1727                if id.has_key('localname'): ids.append(id['localname'])
1728
1729            # Construct enough of the tbparams to make the stop_segment calls
1730            # work
1731            for fed in fed_exp['federant']:
1732                try:
1733                    for e in fed['name']:
1734                        eid = e.get('localname', None)
1735                        if eid: break
1736                    else:
1737                        continue
1738
1739                    p = fed['emulab']['project']
1740
1741                    project = p['name']['localname']
1742                    tb = p['testbed']['localname']
1743                    user = p['user'][0]['userID']['localname']
1744
1745                    domain = fed['emulab']['domain']
1746                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1747                except KeyError, e:
1748                    continue
1749                tbparams[tb] = {\
1750                        'user': user,\
1751                        'domain': domain,\
1752                        'project': project,\
1753                        'host': host,\
1754                        'eid': eid,\
1755                    }
1756            self.state_lock.release()
1757
1758            # Stop everyone.
1759            for tb in tbparams.keys():
1760                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1761
1762            # Remove teh terminated experiment
1763            self.state_lock.acquire()
1764            for id in ids:
1765                if self.state.has_key(id): del self.state[id]
1766
1767            if self.state_filename: self.write_state()
1768            self.state_lock.release()
1769
1770            return { 'experiment': exp }
1771        else:
1772            # Don't forget to release the lock
1773            self.state_lock.release()
1774            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.