source: fedd/fedd_experiment_control.py @ d199ced

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since d199ced was d199ced, checked in by Ted Faber <faber@…>, 16 years ago

upgrade

  • Property mode set to 100644
File size: 51.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
48        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
49        "fed_evrepeater", "rc.accounts.patch"]
50   
51    class thread_pool:
52        """
53        A class to keep track of a set of threads all invoked for the same
54        task.  Manages the mutual exclusion of the states.
55        """
56        def __init__(self):
57            """
58            Start a pool.
59            """
60            self.changed = Condition()
61            self.started = 0
62            self.terminated = 0
63
64        def acquire(self):
65            """
66            Get the pool's lock.
67            """
68            self.changed.acquire()
69
70        def release(self):
71            """
72            Release the pool's lock.
73            """
74            self.changed.release()
75
76        def wait(self, timeout = None):
77            """
78            Wait for a pool thread to start or stop.
79            """
80            self.changed.wait(timeout)
81
82        def start(self):
83            """
84            Called by a pool thread to report starting.
85            """
86            self.changed.acquire()
87            self.started += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def terminate(self):
92            """
93            Called by a pool thread to report finishing.
94            """
95            self.changed.acquire()
96            self.terminated += 1
97            self.changed.notifyAll()
98            self.changed.release()
99
100        def clear(self):
101            """
102            Clear all pool data.
103            """
104            self.changed.acquire()
105            self.started = 0
106            self.terminated =0
107            self.changed.notifyAll()
108            self.changed.release()
109
110    class pooled_thread(Thread):
111        """
112        One of a set of threads dedicated to a specific task.  Uses the
113        thread_pool class above for coordination.
114        """
115        def __init__(self, group=None, target=None, name=None, args=(), 
116                kwargs={}, pdata=None, trace_file=None):
117            Thread.__init__(self, group, target, name, args, kwargs)
118            self.rv = None          # Return value of the ops in this thread
119            self.exception = None   # Exception that terminated this thread
120            self.target=target      # Target function to run on start()
121            self.args = args        # Args to pass to target
122            self.kwargs = kwargs    # Additional kw args
123            self.pdata = pdata      # thread_pool for this class
124            # Logger for this thread
125            self.log = logging.getLogger("fedd.experiment_control")
126       
127        def run(self):
128            """
129            Emulate Thread.run, except add pool data manipulation and error
130            logging.
131            """
132            if self.pdata:
133                self.pdata.start()
134
135            if self.target:
136                try:
137                    self.rv = self.target(*self.args, **self.kwargs)
138                except service_error, s:
139                    self.exception = s
140                    self.log.error("Thread exception: %s %s" % \
141                            (s.code_string(), s.desc))
142                except:
143                    self.exception = sys.exc_info()[1]
144                    self.log.error(("Unexpected thread exception: %s" +\
145                            "Trace %s") % (self.exception,\
146                                traceback.format_exc()))
147            if self.pdata:
148                self.pdata.terminate()
149
150    def __init__(self, config=None):
151        """
152        Intialize the various attributes, most from the config object
153        """
154        self.scripts = fedd_experiment_control_local.scripts
155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
156        self.thread_pool = fedd_experiment_control_local.thread_pool
157
158        self.cert_file = None
159        self.cert_pwd = None
160        self.trusted_certs = None
161
162        # Walk through the various relevant certificat specifying config
163        # attributes until the local certificate attributes can be resolved.
164        # The walk is from most specific to most general specification.
165        for p in ("create_experiment_", "proxy_", ""):
166            filen = "%scert_file" % p
167            pwn = "%scert_pwd" % p
168            trustn = "%strusted_certs" % p
169
170            if getattr(config, filen, None):
171                if not self.cert_file:
172                    self.cert_file = getattr(config, filen, None)
173                    self.cert_pwd = getattr(config, pwn, None)
174
175            if getattr(config, trustn, None):
176                if not self.trusted_certs:
177                    self.trusted_certs = getattr(config, trustn, None)
178
179        self.exp_stem = "fed-stem"
180        self.debug = config.create_debug
181        self.log = logging.getLogger("fedd.experiment_control")
182        self.muxmax = 2
183        self.nthreads = 2
184        self.randomize_experiments = False
185        self.scp_exec = "/usr/bin/scp"
186        self.scripts_dir = "/users/faber/testbed/federation"
187        self.splitter = None
188        self.ssh_exec="/usr/bin/ssh"
189        self.ssh_keygen = "/usr/bin/ssh-keygen"
190        self.ssh_identity_file = None
191        # XXX
192        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
193        self.ssh_type = "rsa"
194        self.state = { }
195        self.state_filename = config.experiment_state_file
196        self.state_lock = Lock()
197        self.tclsh = "/usr/local/bin/otclsh"
198        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
199        self.tbmap = { 
200                'deter':'https://users.isi.deterlab.net:23235',
201                'emulab':'https://users.isi.deterlab.net:23236',
202                'ucb':'https://users.isi.deterlab.net:23237',
203                }
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
208        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
209        self.def_gwstart = \
210                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
211        self.def_mgwstart = \
212                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
213        self.def_gwimage = "FBSD61-TUNNEL2";
214        self.def_gwtype = "pc";
215
216
217        if self.ssh_pubkey_file:
218            try:
219                f = open(self.ssh_pubkey_file, 'r')
220                self.ssh_pubkey = f.read()
221                f.close()
222            except IOError:
223                raise service_error(service_error.internal,
224                        "Cannot read sshpubkey")
225
226        # Set the logging level to the value passed in.  The getattr slieght of
227        # hand finds the logging level constant corrersponding to the string.
228        # We're a little paranoid to avoid user mayhem.
229        if config.experiment_log:
230            try:
231                level = int(getattr(logging, config.experiment_log.upper(),-1))
232
233                if  logging.DEBUG <= level <= logging.CRITICAL:
234                    self.log.setLevel(level)
235                else:
236                    self.log.error("Bad experiment_log value: %s" % \
237                            config.experiment_log)
238
239            except ValueError:
240                self.log.error("Bad experiment_log value: %s" % \
241                        config.experiment_log)
242
243        # Grab saved state.  OK to do this w/o locking because it's read only
244        # and only one thread should be in existence that can see self.state at
245        # this point.
246        if self.state_filename:
247            self.read_state()
248
249        # Confirm federation scripts in the right place
250        for s in self.scripts:
251            if not os.path.exists(self.scripts_dir + "/" + s):
252                raise service_error(service_error.server_config,
253                        "%s/%s not in local script dir" % (self.scripts_dir, s))
254
255        # Dispatch tables
256        self.soap_services = {\
257                'Create': make_soap_handler(\
258                        CreateRequestMessage.typecode,
259                        getattr(self, "create_experiment"), 
260                        CreateResponseMessage,
261                        "CreateResponseBody"),
262                'Vtopo': make_soap_handler(\
263                        VtopoRequestMessage.typecode,
264                        getattr(self, "get_vtopo"),
265                        VtopoResponseMessage,
266                        "VtopoResponseBody"),
267                'Vis': make_soap_handler(\
268                        VisRequestMessage.typecode,
269                        getattr(self, "get_vis"),
270                        VisResponseMessage,
271                        "VisResponseBody"),
272                'Info': make_soap_handler(\
273                        InfoRequestMessage.typecode,
274                        getattr(self, "get_info"),
275                        InfoResponseMessage,
276                        "InfoResponseBody"),
277                'Terminate': make_soap_handler(\
278                        TerminateRequestMessage.typecode,
279                        getattr(self, "terminate_experiment"),
280                        TerminateResponseMessage,
281                        "TerminateResponseBody"),
282        }
283
284        self.xmlrpc_services = {\
285                'Create': make_xmlrpc_handler(\
286                        getattr(self, "create_experiment"), 
287                        "CreateResponseBody"),
288                'Vtopo': make_xmlrpc_handler(\
289                        getattr(self, "get_vtopo"),
290                        "VtopoResponseBody"),
291                'Vis': make_xmlrpc_handler(\
292                        getattr(self, "get_vis"),
293                        "VisResponseBody"),
294                'Info': make_xmlrpc_handler(\
295                        getattr(self, "get_info"),
296                        "InfoResponseBody"),
297                'Terminate': make_xmlrpc_handler(\
298                        getattr(self, "terminate_experiment"),
299                        "TerminateResponseBody"),
300        }
301
302    def copy_file(self, src, dest, size=1024):
303        """
304        Exceedingly simple file copy.
305        """
306        s = open(src,'r')
307        d = open(dest, 'w')
308
309        buf = "x"
310        while buf != "":
311            buf = s.read(size)
312            d.write(buf)
313        s.close()
314        d.close()
315
316    # Call while holding self.state_lock
317    def write_state(self):
318        """
319        Write a new copy of experiment state after copying the existing state
320        to a backup.
321
322        State format is a simple pickling of the state dictionary.
323        """
324        if os.access(self.state_filename, os.W_OK):
325            self.copy_file(self.state_filename, \
326                    "%s.bak" % self.state_filename)
327        try:
328            f = open(self.state_filename, 'w')
329            pickle.dump(self.state, f)
330        except IOError, e:
331            self.log.error("Can't write file %s: %s" % \
332                    (self.state_filename, e))
333        except pickle.PicklingError, e:
334            self.log.error("Pickling problem: %s" % e)
335
336    # Call while holding self.state_lock
337    def read_state(self):
338        """
339        Read a new copy of experiment state.  Old state is overwritten.
340
341        State format is a simple pickling of the state dictionary.
342        """
343        try:
344            f = open(self.state_filename, "r")
345            self.state = pickle.load(f)
346            self.log.debug("[read_state]: Read state from %s" % \
347                    self.state_filename)
348        except IOError, e:
349            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
350                    % (self.state_filename, e))
351        except pickle.UnpicklingError, e:
352            self.log.warning(("[read_state]: No saved state: " + \
353                    "Unpickling failed: %s") % e)
354
355    def scp_file(self, file, user, host, dest=""):
356        """
357        scp a file to the remote host.  If debug is set the action is only
358        logged.
359        """
360
361        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
362        rv = 0
363
364        try:
365            dnull = open("/dev/null", "r")
366        except IOError:
367            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
368            dnull = Null
369
370        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
371        if not self.debug:
372            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
373            else: rv = call(scp_cmd)
374
375        return rv == 0
376
377    def ssh_cmd(self, user, host, cmd, wname=None):
378        """
379        Run a remote command on host as user.  If debug is set, the action is
380        only logged.
381        """
382        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
383
384        try:
385            dnull = open("/dev/null", "r")
386        except IOError:
387            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
388            dnull = Null
389
390        self.log.debug("[ssh_cmd]: %s" % sh_str)
391        if not self.debug:
392            if dnull:
393                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
394            else:
395                sub = Popen(sh_str, shell=True)
396            return sub.wait() == 0
397        else:
398            return True
399
400    def ship_scripts(self, host, user, dest_dir):
401        """
402        Copy the federation scripts (fedkit) to the a federant.
403        """
404        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
405            for s in self.scripts:
406                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
407                        user, host, dest_dir):
408                    return False
409            return True
410        else:
411            return False
412
413    def ship_configs(self, host, user, src_dir, dest_dir):
414        """
415        Copy federant-specific configuration files to the federant.
416        """
417        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
418            return False
419        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
420            return False
421
422        for f in os.listdir(src_dir):
423            if os.path.isdir(f):
424                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
425                        "%s/%s" % (dest_dir, f)):
426                    return False
427            else:
428                if not self.scp_file("%s/%s" % (src_dir, f), 
429                        user, host, dest_dir):
430                    return False
431        return True
432
433    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
434        """
435        Start a sub-experiment on a federant.
436
437        Get the current state, modify or create as appropriate, ship data and
438        configs and start the experiment.  There are small ordering differences
439        based on the initial state of the sub-experiment.
440        """
441        # ops node in the federant
442        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
443        user = tbparams[tb]['user']     # federant user
444        pid = tbparams[tb]['project']   # federant project
445        # XXX
446        base_confs = ( "hosts",)
447        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
448        # command to test experiment state
449        expinfo_exec = "/usr/testbed/bin/expinfo" 
450        # Configuration directories on the remote machine
451        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
452        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
453        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
454        # Regular expressions to parse the expinfo response
455        state_re = re.compile("State:\s+(\w+)")
456        no_exp_re = re.compile("^No\s+such\s+experiment")
457        state = None    # Experiment state parsed from expinfo
458        # The expinfo ssh command
459        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
460
461        # Get status
462        self.log.debug("[start_segment]: %s"% " ".join(cmd))
463        dev_null = None
464        try:
465            dev_null = open("/dev/null", "a")
466        except IOError, e:
467            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
468
469        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
470        for line in status.stdout:
471            m = state_re.match(line)
472            if m: state = m.group(1)
473            else:
474                m = no_exp_re.match(line)
475                if m: state = "none"
476        rv = status.wait()
477
478        # If the experiment is not present the subcommand returns a non-zero
479        # return value.  If we successfully parsed a "none" outcome, ignore the
480        # return code.
481        if rv != 0 and state != "none":
482            raise service_error(service_error.internal,
483                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
484
485        self.log.debug("[start_segment]: %s: %s" % (tb, state))
486        self.log.info("[start_segment]:transferring experiment to %s" % tb)
487
488        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
489            return False
490        # Clear the federation files
491        if not self.ssh_cmd(user, host, 
492                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
493            return False
494        if not self.ssh_cmd(user, host, 
495                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
496            return False
497        # Clear and create the tarfiles and rpm directories
498        for d in (tarfiles_dir, rpms_dir):
499            if not self.ssh_cmd(user, host, 
500                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
501                return False
502            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
503                    "create tarfiles"):
504                return False
505       
506        if state == 'active':
507            # Remote experiment is active.  Modify it.
508            for f in base_confs:
509                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
510                        "%s/%s" % (proj_dir, f)):
511                    return False
512            if not self.ship_scripts(host, user, proj_dir):
513                return False
514            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
515                    proj_dir):
516                return False
517            if os.path.isdir("%s/tarfiles" % tmpdir):
518                if not self.ship_configs(host, user,
519                        "%s/tarfiles" % tmpdir, tarfiles_dir):
520                    return False
521            if os.path.isdir("%s/rpms" % tmpdir):
522                if not self.ship_configs(host, user,
523                        "%s/rpms" % tmpdir, tarfiles_dir):
524                    return False
525            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
526            if not self.ssh_cmd(user, host,
527                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
528                            (pid, eid, tclfile), "modexp"):
529                return False
530            return True
531        elif state == "swapped":
532            # Remote experiment swapped out.  Modify it and swap it in.
533            for f in base_confs:
534                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
535                        "%s/%s" % (proj_dir, f)):
536                    return False
537            if not self.ship_scripts(host, user, proj_dir):
538                return False
539            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
540                    proj_dir):
541                return False
542            if os.path.isdir("%s/tarfiles" % tmpdir):
543                if not self.ship_configs(host, user,
544                        "%s/tarfiles" % tmpdir, tarfiles_dir):
545                    return False
546            if os.path.isdir("%s/rpms" % tmpdir):
547                if not self.ship_configs(host, user,
548                        "%s/rpms" % tmpdir, tarfiles_dir):
549                    return False
550            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
551            if not self.ssh_cmd(user, host,
552                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
553                    "modexp"):
554                return False
555            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
556            if not self.ssh_cmd(user, host,
557                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
558                    "swapexp"):
559                return False
560            return True
561        elif state == "none":
562            # No remote experiment.  Create one.  We do this in 2 steps so we
563            # can put the configuration files and scripts into the new
564            # experiment directories.
565
566            # Tarfiles must be present for creation to work
567            if os.path.isdir("%s/tarfiles" % tmpdir):
568                if not self.ship_configs(host, user,
569                        "%s/tarfiles" % tmpdir, tarfiles_dir):
570                    return False
571            if os.path.isdir("%s/rpms" % tmpdir):
572                if not self.ship_configs(host, user,
573                        "%s/rpms" % tmpdir, tarfiles_dir):
574                    return False
575            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
576            if not self.ssh_cmd(user, host,
577                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
578                            (pid, eid, tclfile), "startexp"):
579                return False
580            # After startexp the per-experiment directories exist
581            for f in base_confs:
582                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
583                        "%s/%s" % (proj_dir, f)):
584                    return False
585            if not self.ship_scripts(host, user, proj_dir):
586                return False
587            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
588                    proj_dir):
589                return False
590            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
591            if not self.ssh_cmd(user, host,
592                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
593                    "swapexp"):
594                return False
595            return True
596        else:
597            self.log.debug("[start_segment]:unknown state %s" % state)
598            return False
599
600    def stop_segment(self, tb, eid, tbparams):
601        """
602        Stop a sub experiment by calling swapexp on the federant
603        """
604        user = tbparams[tb]['user']
605        host = tbparams[tb]['host']
606        pid = tbparams[tb]['project']
607
608        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
609        return self.ssh_cmd(user, host,
610                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
611
612       
613    def generate_ssh_keys(self, dest, type="rsa" ):
614        """
615        Generate a set of keys for the gateways to use to talk.
616
617        Keys are of type type and are stored in the required dest file.
618        """
619        valid_types = ("rsa", "dsa")
620        t = type.lower();
621        if t not in valid_types: raise ValueError
622        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
623
624        try:
625            trace = open("/dev/null", "w")
626        except IOError:
627            raise service_error(service_error.internal,
628                    "Cannot open /dev/null??");
629
630        # May raise CalledProcessError
631        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
632        rv = call(cmd, stdout=trace, stderr=trace)
633        if rv != 0:
634            raise service_error(service_error.internal, 
635                    "Cannot generate nonce ssh keys.  %s return code %d" \
636                            % (self.ssh_keygen, rv))
637
638    def gentopo(self, str):
639        """
640        Generate the topology dtat structure from the splitter's XML
641        representation of it.
642
643        The topology XML looks like:
644            <experiment>
645                <nodes>
646                    <node><vname></vname><ips>ip1:ip2</ips></node>
647                </nodes>
648                <lans>
649                    <lan>
650                        <vname></vname><vnode></vnode><ip></ip>
651                        <bandwidth></bandwidth><member>node:port</member>
652                    </lan>
653                </lans>
654        """
655        class topo_parse:
656            """
657            Parse the topology XML and create the dats structure.
658            """
659            def __init__(self):
660                # Typing of the subelements for data conversion
661                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
662                self.int_subelements = ( 'bandwidth',)
663                self.float_subelements = ( 'delay',)
664                # The final data structure
665                self.nodes = [ ]
666                self.lans =  [ ]
667                self.topo = { \
668                        'node': self.nodes,\
669                        'lan' : self.lans,\
670                    }
671                self.element = { }  # Current element being created
672                self.chars = ""     # Last text seen
673
674            def end_element(self, name):
675                # After each sub element the contents is added to the current
676                # element or to the appropriate list.
677                if name == 'node':
678                    self.nodes.append(self.element)
679                    self.element = { }
680                elif name == 'lan':
681                    self.lans.append(self.element)
682                    self.element = { }
683                elif name in self.str_subelements:
684                    self.element[name] = self.chars
685                    self.chars = ""
686                elif name in self.int_subelements:
687                    self.element[name] = int(self.chars)
688                    self.chars = ""
689                elif name in self.float_subelements:
690                    self.element[name] = float(self.chars)
691                    self.chars = ""
692
693            def found_chars(self, data):
694                self.chars += data.rstrip()
695
696
697        tp = topo_parse();
698        parser = xml.parsers.expat.ParserCreate()
699        parser.EndElementHandler = tp.end_element
700        parser.CharacterDataHandler = tp.found_chars
701
702        parser.Parse(str)
703
704        return tp.topo
705       
706
707    def genviz(self, topo):
708        """
709        Generate the visualization the virtual topology
710        """
711
712        neato = "/usr/local/bin/neato"
713        # These are used to parse neato output and to create the visualization
714        # file.
715        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
716        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
717                "%s</type></node>"
718
719        try:
720            # Node names
721            nodes = [ n['vname'] for n in topo['node'] ]
722            topo_lans = topo['lan']
723        except KeyError:
724            raise service_error(service_error.internal, "Bad topology")
725
726        lans = { }
727        links = { }
728
729        # Walk through the virtual topology, organizing the connections into
730        # 2-node connections (links) and more-than-2-node connections (lans).
731        # When a lan is created, it's added to the list of nodes (there's a
732        # node in the visualization for the lan).
733        for l in topo_lans:
734            if links.has_key(l['vname']):
735                if len(links[l['vname']]) < 2:
736                    links[l['vname']].append(l['vnode'])
737                else:
738                    nodes.append(l['vname'])
739                    lans[l['vname']] = links[l['vname']]
740                    del links[l['vname']]
741                    lans[l['vname']].append(l['vnode'])
742            elif lans.has_key(l['vname']):
743                lans[l['vname']].append(l['vnode'])
744            else:
745                links[l['vname']] = [ l['vnode'] ]
746
747
748        # Open up a temporary file for dot to turn into a visualization
749        try:
750            df, dotname = tempfile.mkstemp()
751            dotfile = os.fdopen(df, 'w')
752        except IOError:
753            raise service_error(service_error.internal,
754                    "Failed to open file in genviz")
755
756        # Generate a dot/neato input file from the links, nodes and lans
757        try:
758            print >>dotfile, "graph G {"
759            for n in nodes:
760                print >>dotfile, '\t"%s"' % n
761            for l in links.keys():
762                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
763            for l in lans.keys():
764                for n in lans[l]:
765                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
766            print >>dotfile, "}"
767            dotfile.close()
768        except TypeError:
769            raise service_error(service_error.internal,
770                    "Single endpoint link in vtopo")
771        except IOError:
772            raise service_error(service_error.internal, "Cannot write dot file")
773
774        # Use dot to create a visualization
775        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
776                '-Gpack=true', dotname], stdout=PIPE)
777
778        # Translate dot to vis format
779        vis_nodes = [ ]
780        vis = { 'node': vis_nodes }
781        for line in dot.stdout:
782            m = vis_re.match(line)
783            if m:
784                vn = m.group(1)
785                vis_node = {'name': vn, \
786                        'x': float(m.group(2)),\
787                        'y' : float(m.group(3)),\
788                    }
789                if vn in links.keys() or vn in lans.keys():
790                    vis_node['type'] = 'lan'
791                else:
792                    vis_node['type'] = 'node'
793                vis_nodes.append(vis_node)
794        rv = dot.wait()
795
796        os.remove(dotname)
797        if rv == 0 : return vis
798        else: return None
799
800
801    def get_access(self, tb, nodes, user, tbparam):
802        """
803        Get access to testbed through fedd and set the parameters for that tb
804        """
805
806        translate_attr = {
807            'slavenodestartcmd': 'expstart',
808            'slaveconnectorstartcmd': 'gwstart',
809            'masternodestartcmd': 'mexpstart',
810            'masterconnectorstartcmd': 'mgwstart',
811            'connectorimage': 'gwimage',
812            'connectortype': 'gwtype',
813            'tunnelcfg': 'tun',
814            'smbshare': 'smbshare',
815        }
816
817        # XXX multi-level access
818        uri = self.tbmap.get(tb, None)
819        if not uri:
820            raise service_error(serice_error.server_config, 
821                    "Unknown testbed: %s" % tb)
822
823        # The basic request
824        req = {\
825                'destinationTestbed' : { 'uri' : uri },
826                'user':  user,
827                'allocID' : { 'localname': 'test' },
828                'access' : [ { 'sshPubkey' : self.ssh_pubkey } ]
829            }
830       
831        # node resources if any
832        if nodes != None and len(nodes) > 0:
833            rnodes = [ ]
834            for n in nodes:
835                rn = { }
836                image, hw, count = n.split(":")
837                if image: rn['image'] = [ image ]
838                if hw: rn['hardware'] = [ hw ]
839                if count: rn['count'] = int(count)
840                rnodes.append(rn)
841            req['resources']= { }
842            req['resources']['node'] = rnodes
843
844        # No retry loop here.  Proxy servers must correctly authenticate
845        # themselves without help
846
847        try:
848            ctx = fedd_ssl_context(self.cert_file, 
849                    self.trusted_certs, password=self.cert_pwd)
850        except SSL.SSLError:
851            raise service_error(service_error.server_config, 
852                    "Server certificates misconfigured")
853
854        loc = feddServiceLocator();
855        port = loc.getfeddPortType(uri,
856                transport=M2Crypto.httpslib.HTTPSConnection, 
857                transdict={ 'ssl_context' : ctx })
858
859        # Reconstruct the full request message
860        msg = RequestAccessRequestMessage()
861        msg.set_element_RequestAccessRequestBody(
862                pack_soap(msg, "RequestAccessRequestBody", req))
863
864        try:
865            resp = port.RequestAccess(msg)
866        except ZSI.ParseException, e:
867            raise service_error(service_error.req,
868                    "Bad format message (XMLRPC??): %s" %
869                    str(e))
870        r = unpack_soap(resp)
871
872        if r.has_key('RequestAccessResponseBody'):
873            r = r['RequestAccessResponseBody']
874        else:
875            raise service_error(service_error.proxy,
876                    "Bad proxy response")
877
878
879        e = r['emulab']
880        p = e['project']
881        tbparam[tb] = { 
882                "boss": e['boss'],
883                "host": e['ops'],
884                "domain": e['domain'],
885                "fs": e['fileServer'],
886                "eventserver": e['eventServer'],
887                "project": unpack_id(p['name']),
888                "emulab" : e
889                }
890        # Make the testbed name be the label the user applied
891        p['testbed'] = {'localname': tb }
892
893        for u in p['user']:
894            tbparam[tb]['user'] = unpack_id(u['userID'])
895
896        for a in e['fedAttr']:
897            if a['attribute']:
898                key = translate_attr.get(a['attribute'].lower(), None)
899                if key:
900                    tbparam[tb][key]= a['value']
901       
902    class current_testbed:
903        """
904        Object for collecting the current testbed description.  The testbed
905        description is saved to a file with the local testbed variables
906        subsittuted line by line.
907        """
908        def __init__(self, eid, tmpdir):
909            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
910            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
911            self.current_testbed = None
912            self.testbed_file = None
913
914            self.def_expstart = \
915                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
916            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
917            self.def_gwstart = \
918                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
919            self.def_mgwstart = \
920                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
921            self.def_gwimage = "FBSD61-TUNNEL2";
922            self.def_gwtype = "pc";
923
924            self.eid = eid
925            self.tmpdir = tmpdir
926
927        def __call__(self, line, master, allocated, tbparams):
928            # Capture testbed topology descriptions
929            if self.current_testbed == None:
930                m = self.begin_testbed.match(line)
931                if m != None:
932                    self.current_testbed = m.group(1)
933                    if self.current_testbed == None:
934                        raise service_error(service_error.req,
935                                "Bad request format (unnamed testbed)")
936                    allocated[self.current_testbed] = \
937                            allocated.get(self.current_testbed,0) + 1
938                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
939                    if not os.path.exists(tb_dir):
940                        try:
941                            os.mkdir(tb_dir)
942                        except IOError:
943                            raise service_error(service_error.internal,
944                                    "Cannot create %s" % tb_dir)
945                    try:
946                        self.testbed_file = open("%s/%s.%s.tcl" %
947                                (tb_dir, self.eid, self.current_testbed), 'w')
948                    except IOError:
949                        self.testbed_file = None
950                    return True
951                else: return False
952            else:
953                m = self.end_testbed.match(line)
954                if m != None:
955                    if m.group(1) != self.current_testbed:
956                        raise service_error(service_error.internal, 
957                                "Mismatched testbed markers!?")
958                    if self.testbed_file != None: 
959                        self.testbed_file.close()
960                        self.testbed_file = None
961                    self.current_testbed = None
962                elif self.testbed_file:
963                    # Substitute variables and put the line into the local
964                    # testbed file.
965                    gwtype = tbparams[self.current_testbed].get('gwtype', 
966                            self.def_gwtype)
967                    gwimage = tbparams[self.current_testbed].get('gwimage', 
968                            self.def_gwimage)
969                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
970                            self.def_mgwstart)
971                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
972                            self.def_mexpstart)
973                    gwstart = tbparams[self.current_testbed].get('gwstart', 
974                            self.def_gwstart)
975                    expstart = tbparams[self.current_testbed].get('expstart', 
976                            self.def_expstart)
977                    project = tbparams[self.current_testbed].get('project')
978                    line = re.sub("GWTYPE", gwtype, line)
979                    line = re.sub("GWIMAGE", gwimage, line)
980                    if self.current_testbed == master:
981                        line = re.sub("GWSTART", mgwstart, line)
982                        line = re.sub("EXPSTART", mexpstart, line)
983                    else:
984                        line = re.sub("GWSTART", gwstart, line)
985                        line = re.sub("EXPSTART", expstart, line)
986                    # XXX: does `` embed without doing enything else?
987                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
988                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
989                    line = re.sub("EID", self.eid, line)
990                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
991                            (project, self.eid), line)
992                    print >>self.testbed_file, line
993                return True
994
995    class allbeds:
996        """
997        Process the Allbeds section.  Get access to each federant and save the
998        parameters in tbparams
999        """
1000        def __init__(self, get_access):
1001            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1002            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1003            self.in_allbeds = False
1004            self.get_access = get_access
1005
1006        def __call__(self, line, user, tbparams):
1007            # Testbed access parameters
1008            if not self.in_allbeds:
1009                if self.begin_allbeds.match(line):
1010                    self.in_allbeds = True
1011                    return True
1012                else:
1013                    return False
1014            else:
1015                if self.end_allbeds.match(line):
1016                    self.in_allbeds = False
1017                else:
1018                    nodes = line.split('|')
1019                    tb = nodes.pop(0)
1020                    self.get_access(tb, nodes, user, tbparams)
1021                return True
1022
1023    class gateways:
1024        def __init__(self, eid, master, tmpdir, gw_pubkey,
1025                gw_secretkey, copy_file):
1026            self.begin_gateways = \
1027                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1028            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1029            self.current_gateways = None
1030            self.control_gateway = None
1031            self.active_end = { }
1032
1033            self.eid = eid
1034            self.master = master
1035            self.tmpdir = tmpdir
1036            self.gw_pubkey_base = gw_pubkey
1037            self.gw_secretkey_base = gw_secretkey
1038
1039            self.copy_file = copy_file
1040
1041
1042        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1043                active_end, tbparams, dtb, myname, desthost, type):
1044            """
1045            Produce a gateway configuration file from a gateways line.
1046            """
1047
1048            sproject = tbparams[gw].get('project', 'project')
1049            dproject = tbparams[dtb].get('project', 'project')
1050            sdomain = ".%s.%s%s" % (eid, sproject,
1051                    tbparams[gw].get('domain', ".example.com"))
1052            ddomain = ".%s.%s%s" % (eid, dproject,
1053                    tbparams[dtb].get('domain', ".example.com"))
1054            boss = tbparams[master].get('boss', "boss")
1055            fs = tbparams[master].get('fs', "fs")
1056            event_server = "%s%s" % \
1057                    (tbparams[gw].get('eventserver', "event_server"),
1058                            tbparams[gw].get('domain', "example.com"))
1059            remote_event_server = "%s%s" % \
1060                    (tbparams[dtb].get('eventserver', "event_server"),
1061                            tbparams[dtb].get('domain', "example.com"))
1062            seer_control = "%s%s" % \
1063                    (tbparams[gw].get('control', "control"), sdomain)
1064            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1065            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1066            tunnel_cfg = tbparams[gw].get("tun", "false")
1067
1068            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1069            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1070
1071            # translate to lower case so the `hostname` hack for specifying
1072            # configuration files works.
1073            conf_file = conf_file.lower();
1074            remote_conf_file = remote_conf_file.lower();
1075
1076            if dtb == master:
1077                active = "false"
1078            elif gw == master:
1079                active = "true"
1080            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1081                active = "false"
1082            else:
1083                active_end['%s-%s' % (gw, dtb)] = 1
1084                active = "true"
1085
1086            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1087            print >>gwconfig, "Active: %s" % active
1088            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1089            print >>gwconfig, "BossName: %s" % boss
1090            print >>gwconfig, "FsName: %s" % fs
1091            print >>gwconfig, "EventServerName: %s" % event_server
1092            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1093            print >>gwconfig, "SeerControl: %s" % seer_control
1094            print >>gwconfig, "Type: %s" % type
1095            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1096            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1097                    local_script_dir
1098            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1099            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1100            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1101                    (remote_script_dir, remote_conf_file)
1102            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1103            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
1104            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
1105            gwconfig.close()
1106
1107            return active == "true"
1108
1109        def __call__(self, line, allocated, tbparams):
1110            # Process gateways
1111            if not self.current_gateways:
1112                m = self.begin_gateways.match(line)
1113                if m:
1114                    self.current_gateways = m.group(1)
1115                    if allocated.has_key(self.current_gateways):
1116                        # This test should always succeed
1117                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1118                        if not os.path.exists(tb_dir):
1119                            try:
1120                                os.mkdir(tb_dir)
1121                            except IOError:
1122                                raise service_error(service_error.internal,
1123                                        "Cannot create %s" % tb_dir)
1124                    else:
1125                        # XXX
1126                        self.log.error("[gateways]: Ignoring gateways for " + \
1127                                "unknown testbed %s" % self.current_gateways)
1128                        self.current_gateways = None
1129                    return True
1130                else:
1131                    return False
1132            else:
1133                m = self.end_gateways.match(line)
1134                if m :
1135                    if m.group(1) != self.current_gateways:
1136                        raise service_error(service_error.internal,
1137                                "Mismatched gateway markers!?")
1138                    if self.control_gateway:
1139                        try:
1140                            cc = open("%s/%s/client.conf" %
1141                                    (self.tmpdir, self.current_gateways), 'w')
1142                            print >>cc, "ControlGateway: %s" % \
1143                                    self.control_gateway
1144                            if tbparams[self.master].has_key('smbshare'):
1145                                print >>cc, "SMBSHare: %s" % \
1146                                        tbparams[self.master]['smbshare']
1147                            print >>cc, "ProjectUser: %s" % \
1148                                    tbparams[self.master]['user']
1149                            print >>cc, "ProjectName: %s" % \
1150                                    tbparams[self.master]['project']
1151                            cc.close()
1152                        except IOError:
1153                            raise service_error(service_error.internal,
1154                                    "Error creating client config")
1155                        try:
1156                            cc = open("%s/%s/seer.conf" %
1157                                    (self.tmpdir, self.current_gateways),
1158                                    'w')
1159                            if self.current_gateways != self.master:
1160                                print >>cc, "ControlNode: %s" % \
1161                                        self.control_gateway
1162                            print >>cc, "ExperimentID: %s/%s" % \
1163                                    ( tbparams[self.master]['project'], \
1164                                    self.eid )
1165                            cc.close()
1166                        except IOError:
1167                            raise service_error(service_error.internal,
1168                                    "Error creating seer config")
1169                    else:
1170                        debug.error("[gateways]: No control gateway for %s" %\
1171                                    self.current_gateways)
1172                    self.current_gateways = None
1173                else:
1174                    dtb, myname, desthost, type = line.split(" ")
1175
1176                    if type == "control" or type == "both":
1177                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1178                                self.eid, 
1179                                tbparams[self.current_gateways]['project'],
1180                                tbparams[self.current_gateways]['domain'])
1181                    try:
1182                        active = self.gateway_conf_file(self.current_gateways,
1183                                self.master, self.eid, self.gw_pubkey_base,
1184                                self.gw_secretkey_base,
1185                                self.active_end, tbparams, dtb, myname,
1186                                desthost, type)
1187                    except IOError, e:
1188                        raise service_error(service_error.internal,
1189                                "Failed to write config file for %s" % \
1190                                        self.current_gateway)
1191           
1192                    gw_pubkey = "%s/keys/%s" % \
1193                            (self.tmpdir, self.gw_pubkey_base)
1194                    gw_secretkey = "%s/keys/%s" % \
1195                            (self.tmpdir, self.gw_secretkey_base)
1196
1197                    pkfile = "%s/%s/%s" % \
1198                            ( self.tmpdir, self.current_gateways, 
1199                                    self.gw_pubkey_base)
1200                    skfile = "%s/%s/%s" % \
1201                            ( self.tmpdir, self.current_gateways, 
1202                                    self.gw_secretkey_base)
1203
1204                    if not os.path.exists(pkfile):
1205                        try:
1206                            self.copy_file(gw_pubkey, pkfile)
1207                        except IOError:
1208                            service_error(service_error.internal,
1209                                    "Failed to copy pubkey file")
1210
1211                    if active and not os.path.exists(skfile):
1212                        try:
1213                            self.copy_file(gw_secretkey, skfile)
1214                        except IOError:
1215                            service_error(service_error.internal,
1216                                    "Failed to copy secretkey file")
1217                return True
1218
1219    class shunt_to_file:
1220        """
1221        Simple class to write data between two regexps to a file.
1222        """
1223        def __init__(self, begin, end, filename):
1224            """
1225            Begin shunting on a match of begin, stop on end, send data to
1226            filename.
1227            """
1228            self.begin = re.compile(begin)
1229            self.end = re.compile(end)
1230            self.in_shunt = False
1231            self.file = None
1232            self.filename = filename
1233
1234        def __call__(self, line):
1235            """
1236            Call this on each line in the input that may be shunted.
1237            """
1238            if not self.in_shunt:
1239                if self.begin.match(line):
1240                    self.in_shunt = True
1241                    try:
1242                        self.file = open(self.filename, "w")
1243                    except:
1244                        self.file = None
1245                        raise
1246                    return True
1247                else:
1248                    return False
1249            else:
1250                if self.end.match(line):
1251                    if self.file: 
1252                        self.file.close()
1253                        self.file = None
1254                    self.in_shunt = False
1255                else:
1256                    if self.file:
1257                        print >>self.file, line
1258                return True
1259
1260    class shunt_to_list:
1261        """
1262        Same interface as shunt_to_file.  Data collected in self.list, one list
1263        element per line.
1264        """
1265        def __init__(self, begin, end):
1266            self.begin = re.compile(begin)
1267            self.end = re.compile(end)
1268            self.in_shunt = False
1269            self.list = [ ]
1270       
1271        def __call__(self, line):
1272            if not self.in_shunt:
1273                if self.begin.match(line):
1274                    self.in_shunt = True
1275                    return True
1276                else:
1277                    return False
1278            else:
1279                if self.end.match(line):
1280                    self.in_shunt = False
1281                else:
1282                    self.list.append(line)
1283                return True
1284
1285    class shunt_to_string:
1286        """
1287        Same interface as shunt_to_file.  Data collected in self.str, all in
1288        one string.
1289        """
1290        def __init__(self, begin, end):
1291            self.begin = re.compile(begin)
1292            self.end = re.compile(end)
1293            self.in_shunt = False
1294            self.str = ""
1295       
1296        def __call__(self, line):
1297            if not self.in_shunt:
1298                if self.begin.match(line):
1299                    self.in_shunt = True
1300                    return True
1301                else:
1302                    return False
1303            else:
1304                if self.end.match(line):
1305                    self.in_shunt = False
1306                else:
1307                    self.str += line
1308                return True
1309
1310    def create_experiment(self, req, fid):
1311        """
1312        The external interface to experiment creation called from the
1313        dispatcher.
1314
1315        Creates a working directory, splits the incoming description using the
1316        splitter script and parses out the avrious subsections using the
1317        lcasses above.  Once each sub-experiment is created, use pooled threads
1318        to instantiate them and start it all up.
1319        """
1320        try:
1321            tmpdir = tempfile.mkdtemp(prefix="split-")
1322        except IOError:
1323            raise service_error(service_error.internal, "Cannot create tmp dir")
1324
1325        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1326        gw_secretkey_base = "fed.%s" % self.ssh_type
1327        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1328        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1329        tclfile = tmpdir + "/experiment.tcl"
1330        tbparams = { }
1331
1332        pid = "dummy"
1333        gid = "dummy"
1334        # XXX
1335        fail_soft = False
1336
1337        try:
1338            os.mkdir(tmpdir+"/keys")
1339        except OSError:
1340            raise service_error(service_error.internal,
1341                    "Can't make temporary dir")
1342
1343        req = req.get('CreateRequestBody', None)
1344        if not req:
1345            raise service_error(service_error.req,
1346                    "Bad request format (no CreateRequestBody)")
1347        # The tcl parser needs to read a file so put the content into that file
1348        file_content=req.get('experimentdescription', None)
1349        if file_content:
1350            try:
1351                f = open(tclfile, 'w')
1352                f.write(file_content)
1353                f.close()
1354            except IOError:
1355                raise service_error(service_error.internal,
1356                        "Cannot write temp experiment description")
1357        else:
1358            raise service_error(service_error.req, "No experiment description")
1359
1360        if req.has_key('experimentID') and \
1361                req['experimentID'].has_key('localname'):
1362            eid = req['experimentID']['localname']
1363            self.state_lock.acquire()
1364            while (self.state.has_key(eid)):
1365                eid += random.choice(string.ascii_letters)
1366            # To avoid another thread picking this localname
1367            self.state[eid] = "placeholder"
1368            self.state_lock.release()
1369        else:
1370            eid = self.exp_stem
1371            for i in range(0,5):
1372                eid += random.choice(string.ascii_letters)
1373            self.state_lock.acquire()
1374            while (self.state.has_key(eid)):
1375                eid = self.exp_stem
1376                for i in range(0,5):
1377                    eid += random.choice(string.ascii_letters)
1378            # To avoid another thread picking this localname
1379            self.state[eid] = "placeholder"
1380            self.state_lock.release()
1381
1382        try:
1383            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1384        except ValueError:
1385            raise service_error(service_error.server_config, 
1386                    "Bad key type (%s)" % self.ssh_type)
1387
1388        user = req.get('user', None)
1389        if user == None:
1390            raise service_error(service_error.req, "No user")
1391
1392        master = req.get('master', None)
1393        if master == None:
1394            raise service_error(service_error.req, "No master testbed label")
1395       
1396       
1397        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1398            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1399        tclparser = Popen(tclcmd, stdout=PIPE)
1400
1401        allocated = { }     # Testbeds we can access
1402        started = { }       # Testbeds where a sub-experiment started
1403                            # successfully
1404
1405        # Objects to parse the splitter output (defined above)
1406        parse_current_testbed = self.current_testbed(eid, tmpdir)
1407        parse_allbeds = self.allbeds(self.get_access)
1408        parse_gateways = self.gateways(eid, master, tmpdir,
1409                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1410        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1411                    "^#\s+End\s+Vtopo")
1412        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1413                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1414        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1415                "^#\s+End\s+tarfiles")
1416        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1417                "^#\s+End\s+rpms")
1418
1419        # Worling on the split data
1420        for line in tclparser.stdout:
1421            line = line.rstrip()
1422            if parse_current_testbed(line, master, allocated, tbparams):
1423                continue
1424            elif parse_allbeds(line, user, tbparams):
1425                continue
1426            elif parse_gateways(line, allocated, tbparams):
1427                continue
1428            elif parse_vtopo(line):
1429                continue
1430            elif parse_hostnames(line):
1431                continue
1432            elif parse_tarfiles(line):
1433                continue
1434            elif parse_rpms(line):
1435                continue
1436            else:
1437                raise service_error(service_error.internal, 
1438                        "Bad tcl parse? %s" % line)
1439
1440        # Virtual topology and visualization
1441        vtopo = self.gentopo(parse_vtopo.str)
1442        if not vtopo:
1443            raise service_error(service_error.internal, 
1444                    "Failed to generate virtual topology")
1445
1446        vis = self.genviz(vtopo)
1447        if not vis:
1448            raise service_error(service_error.internal, 
1449                    "Failed to generate visualization")
1450
1451        # save federant information
1452        for k in allocated.keys():
1453            tbparams[k]['federant'] = {\
1454                    'name': [ { 'localname' : eid} ],\
1455                    'emulab': tbparams[k]['emulab'],\
1456                    'master' : k == master,\
1457                }
1458
1459
1460        # Copy tarfiles and rpms needed at remote sites into a staging area
1461        try:
1462            for t in parse_tarfiles.list:
1463                if not os.path.exists("%s/tarfiles" % tmpdir):
1464                    os.mkdir("%s/tarfiles" % tmpdir)
1465                self.copy_file(t, "%s/tarfiles/%s" % \
1466                        (tmpdir, os.path.basename(t)))
1467            for r in parse_rpms.list:
1468                if not os.path.exists("%s/rpms" % tmpdir):
1469                    os.mkdir("%s/rpms" % tmpdir)
1470                self.copy_file(r, "%s/rpms/%s" % \
1471                        (tmpdir, os.path.basename(r)))
1472        except IOError, e:
1473            raise service_error(service_error.internal, 
1474                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1475
1476        thread_pool_info = self.thread_pool()
1477        threads = [ ]
1478
1479        for tb in [ k for k in allocated.keys() if k != master]:
1480            # Wait until we have a free slot to start the next testbed load
1481            thread_pool_info.acquire()
1482            while thread_pool_info.started - \
1483                    thread_pool_info.terminated >= self.nthreads:
1484                thread_pool_info.wait()
1485            thread_pool_info.release()
1486
1487            # Create and start a thread to start the segment, and save it to
1488            # get the return value later
1489            t  = self.pooled_thread(target=self.start_segment, 
1490                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1491                    pdata=thread_pool_info, trace_file=self.trace_file)
1492            threads.append(t)
1493            t.start()
1494
1495        # Wait until all finish (the first clause of the while is to make sure
1496        # one starts)
1497        thread_pool_info.acquire()
1498        while thread_pool_info.started == 0 or \
1499                thread_pool_info.started > thread_pool_info.terminated:
1500            thread_pool_info.wait()
1501        thread_pool_info.release()
1502
1503        # If none failed, start the master
1504        failed = [ t.getName() for t in threads if not t.rv ]
1505
1506        if len(failed) == 0:
1507            if not self.start_segment(master, eid, tbparams, tmpdir):
1508                failed.append(master)
1509
1510        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1511        # If one failed clean up, unless fail_soft is set
1512        if failed:
1513            if not fail_soft:
1514                for tb in succeeded:
1515                    self.stop_segment(tb, eid, tbparams)
1516                # Remove the placeholder
1517                self.state_lock.acquire()
1518                del self.state[eid]
1519                self.state_lock.release()
1520
1521                raise service_error(service_error.federant,
1522                    "Swap in failed on %s" % ",".join(failed))
1523        else:
1524            self.log.info("[start_segment]: Experiment %s started" % eid)
1525
1526        # Generate an ID for the experiment (slice) and a certificate that the
1527        # allocator can use to prove they own it.  We'll ship it back through
1528        # the encrypted connection.
1529        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1530
1531        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1532
1533        # Walk up tmpdir, deleting as we go
1534        for path, dirs, files in os.walk(tmpdir, topdown=False):
1535            for f in files:
1536                os.remove(os.path.join(path, f))
1537            for d in dirs:
1538                os.rmdir(os.path.join(path, d))
1539        os.rmdir(tmpdir)
1540
1541        resp = { 'federant' : [ tbparams[tb]['federant'] \
1542                for tb in tbparams.keys() \
1543                    if tbparams[tb].has_key('federant') ],\
1544                    'vtopo': vtopo,\
1545                    'vis' : vis,
1546                    'experimentID' : [\
1547                            { 'fedid': copy.copy(expid) }, \
1548                            { 'localname': eid },\
1549                        ],\
1550                    'experimentAccess': { 'X509' : expcert },\
1551                }
1552
1553        # Insert the experiment into our state and update the disk copy
1554        self.state_lock.acquire()
1555        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1556                for tb in tbparams.keys() \
1557                    if tbparams[tb].has_key('federant') ],\
1558                    'vtopo': vtopo,\
1559                    'vis' : vis,
1560                    'experimentID' : [\
1561                            { 'fedid': expid }, { 'localname': eid },\
1562                        ],\
1563                }
1564        self.state[eid] = self.state[expid]
1565        if self.state_filename: self.write_state()
1566        self.state_lock.release()
1567
1568        if not failed:
1569            return resp
1570        else:
1571            raise service_error(service_error.partial, \
1572                    "Partial swap in on %s" % ",".join(succeeded))
1573
1574
1575    def get_vtopo(self, req, fid):
1576        """
1577        Return the stored virtual topology for this experiment
1578        """
1579        rv = None
1580
1581        req = req.get('VtopoRequestBody', None)
1582        if not req:
1583            raise service_error(service_error.req,
1584                    "Bad request format (no VtopoRequestBody)")
1585        exp = req.get('experiment', None)
1586        if exp:
1587            if exp.has_key('fedid'):
1588                key = fedid(bits=exp['fedid'])
1589                keytype = "fedid"
1590            elif exp.has_key('localname'):
1591                key = exp['localname']
1592                keytype = "localname"
1593            else:
1594                raise service_error(service_error.req, "Unknown lookup type")
1595        else:
1596            raise service_error(service_error.req, "No request?")
1597
1598        self.state_lock.acquire()
1599        if self.state.has_key(key):
1600            rv = { 'experiment' : {keytype: key },\
1601                    'vtopo': self.state[key]['vtopo'],\
1602                }
1603        self.state_lock.release()
1604
1605        if rv: return rv
1606        else: raise service_error(service_error.req, "No such experiment")
1607
1608    def get_vis(self, req, fid):
1609        """
1610        Return the stored visualization for this experiment
1611        """
1612        rv = None
1613
1614        req = req.get('VisRequestBody', None)
1615        if not req:
1616            raise service_error(service_error.req,
1617                    "Bad request format (no VisRequestBody)")
1618        exp = req.get('experiment', None)
1619        if exp:
1620            if exp.has_key('fedid'):
1621                key = fedid(bits=exp['fedid'])
1622                keytype = "fedid"
1623            elif exp.has_key('localname'):
1624                key = exp['localname']
1625                keytype = "localname"
1626            else:
1627                raise service_error(service_error.req, "Unknown lookup type")
1628        else:
1629            raise service_error(service_error.req, "No request?")
1630
1631        self.state_lock.acquire()
1632        if self.state.has_key(key):
1633            rv =  { 'experiment' : {keytype: key },\
1634                    'vis': self.state[key]['vis'],\
1635                    }
1636        self.state_lock.release()
1637
1638        if rv: return rv
1639        else: raise service_error(service_error.req, "No such experiment")
1640
1641    def get_info(self, req, fid):
1642        """
1643        Return all the stored info about this experiment
1644        """
1645        rv = None
1646
1647        req = req.get('InfoRequestBody', None)
1648        if not req:
1649            raise service_error(service_error.req,
1650                    "Bad request format (no VisRequestBody)")
1651        exp = req.get('experiment', None)
1652        if exp:
1653            if exp.has_key('fedid'):
1654                key = fedid(bits=exp['fedid'])
1655                keytype = "fedid"
1656            elif exp.has_key('localname'):
1657                key = exp['localname']
1658                keytype = "localname"
1659            else:
1660                raise service_error(service_error.req, "Unknown lookup type")
1661        else:
1662            raise service_error(service_error.req, "No request?")
1663
1664        # The state may be massaged by the service function that called
1665        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1666        # state.
1667        self.state_lock.acquire()
1668        if self.state.has_key(key):
1669            rv = copy.deepcopy(self.state[key])
1670        self.state_lock.release()
1671
1672        if rv: return rv
1673        else: raise service_error(service_error.req, "No such experiment")
1674
1675
1676    def terminate_experiment(self, req, fid):
1677        """
1678        Swap this experiment out on the federants and delete the shared
1679        information
1680        """
1681        tbparams = { }
1682        req = req.get('TerminateRequestBody', None)
1683        if not req:
1684            raise service_error(service_error.req,
1685                    "Bad request format (no TerminateRequestBody)")
1686        exp = req.get('experiment', None)
1687        if exp:
1688            if exp.has_key('fedid'):
1689                key = fedid(bits=exp['fedid'])
1690                keytype = "fedid"
1691            elif exp.has_key('localname'):
1692                key = exp['localname']
1693                keytype = "localname"
1694            else:
1695                raise service_error(service_error.req, "Unknown lookup type")
1696        else:
1697            raise service_error(service_error.req, "No request?")
1698
1699        self.state_lock.acquire()
1700        fed_exp = self.state.get(key, None)
1701
1702        if fed_exp:
1703            # This branch of the conditional holds the lock to generate a
1704            # consistent temporary tbparams variable to deallocate experiments.
1705            # It releases the lock to do the deallocations and reacquires it to
1706            # remove the experiment state when the termination is complete.
1707            ids = []
1708            #  experimentID is a list of dicts that are self-describing
1709            #  identifiers.  This finds all the fedids and localnames - the
1710            #  keys of self.state - and puts them into ids.
1711            for id in fed_exp.get('experimentID', []):
1712                if id.has_key('fedid'): ids.append(id['fedid'])
1713                if id.has_key('localname'): ids.append(id['localname'])
1714
1715            # Construct enough of the tbparams to make the stop_segment calls
1716            # work
1717            for fed in fed_exp['federant']:
1718                try:
1719                    for e in fed['name']:
1720                        eid = e.get('localname', None)
1721                        if eid: break
1722                    else:
1723                        continue
1724
1725                    p = fed['emulab']['project']
1726
1727                    project = p['name']['localname']
1728                    tb = p['testbed']['localname']
1729                    user = p['user'][0]['userID']['localname']
1730
1731                    domain = fed['emulab']['domain']
1732                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1733                except KeyError, e:
1734                    continue
1735                tbparams[tb] = {\
1736                        'user': user,\
1737                        'domain': domain,\
1738                        'project': project,\
1739                        'host': host,\
1740                        'eid': eid,\
1741                    }
1742            self.state_lock.release()
1743
1744            # Stop everyone.
1745            for tb in tbparams.keys():
1746                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1747
1748            # Remove teh terminated experiment
1749            self.state_lock.acquire()
1750            for id in ids:
1751                if self.state.has_key(id): del self.state[id]
1752
1753            if self.state_filename: self.write_state()
1754            self.state_lock.release()
1755
1756            return { 'experiment': exp }
1757        else:
1758            # Don't forget to release the lock
1759            self.state_lock.release()
1760            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.