source: fedd/fedd_experiment_control.py @ 0ea11af

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 0ea11af was 0ea11af, checked in by Ted Faber <faber@…>, 16 years ago

clean up and add some docs

  • Property mode set to 100644
File size: 51.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
48        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
49        "fed_evrepeater", "rc.accounts.patch"]
50   
51    class thread_pool:
52        """
53        A class to keep track of a set of threads all invoked for the same
54        task.  Manages the mutual exclusion of the states.
55        """
56        def __init__(self):
57            """
58            Start a pool.
59            """
60            self.changed = Condition()
61            self.started = 0
62            self.terminated = 0
63
64        def acquire(self):
65            """
66            Get the pool's lock.
67            """
68            self.changed.acquire()
69
70        def release(self):
71            """
72            Release the pool's lock.
73            """
74            self.changed.release()
75
76        def wait(self, timeout = None):
77            """
78            Wait for a pool thread to start or stop.
79            """
80            self.changed.wait(timeout)
81
82        def start(self):
83            """
84            Called by a pool thread to report starting.
85            """
86            self.changed.acquire()
87            self.started += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def terminate(self):
92            """
93            Called by a pool thread to report finishing.
94            """
95            self.changed.acquire()
96            self.terminated += 1
97            self.changed.notifyAll()
98            self.changed.release()
99
100        def clear(self):
101            """
102            Clear all pool data.
103            """
104            self.changed.acquire()
105            self.started = 0
106            self.terminated =0
107            self.changed.notifyAll()
108            self.changed.release()
109
110    class pooled_thread(Thread):
111        """
112        One of a set of threads dedicated to a specific task.  Uses the
113        thread_pool class above for coordination.
114        """
115        def __init__(self, group=None, target=None, name=None, args=(), 
116                kwargs={}, pdata=None, trace_file=None):
117            Thread.__init__(self, group, target, name, args, kwargs)
118            self.rv = None          # Return value of the ops in this thread
119            self.exception = None   # Exception that terminated this thread
120            self.target=target      # Target function to run on start()
121            self.args = args        # Args to pass to target
122            self.kwargs = kwargs    # Additional kw args
123            self.pdata = pdata      # thread_pool for this class
124            # Logger for this thread
125            self.log = logging.getLogger("fedd.experiment_control")
126       
127        def run(self):
128            """
129            Emulate Thread.run, except add pool data manipulation and error
130            logging.
131            """
132            if self.pdata:
133                self.pdata.start()
134
135            if self.target:
136                try:
137                    self.rv = self.target(*self.args, **self.kwargs)
138                except service_error, s:
139                    self.exception = s
140                    self.log.error("Thread exception: %s %s" % \
141                            (s.code_string(), s.desc))
142                except:
143                    self.exception = sys.exc_info()[1]
144                    self.log.error(("Unexpected thread exception: %s" +\
145                            "Trace %s") % (self.exception,\
146                                traceback.format_exc()))
147            if self.pdata:
148                self.pdata.terminate()
149
150    def __init__(self, config=None):
151        """
152        Intialize the various attributes, most from the config object
153        """
154        self.scripts = fedd_experiment_control_local.scripts
155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
156        self.thread_pool = fedd_experiment_control_local.thread_pool
157
158        self.cert_file = None
159        self.cert_pwd = None
160        self.trusted_certs = None
161
162        # Walk through the various relevant certificat specifying config
163        # attributes until the local certificate attributes can be resolved.
164        # The walk is from most specific to most general specification.
165        for p in ("create_experiment_", "proxy_", ""):
166            filen = "%scert_file" % p
167            pwn = "%scert_pwd" % p
168            trustn = "%strusted_certs" % p
169
170            if getattr(config, filen, None):
171                if not self.cert_file:
172                    self.cert_file = getattr(config, filen, None)
173                    self.cert_pwd = getattr(config, pwn, None)
174
175            if getattr(config, trustn, None):
176                if not self.trusted_certs:
177                    self.trusted_certs = getattr(config, trustn, None)
178
179        self.exp_stem = "fed-stem"
180        self.debug = config.create_debug
181        self.log = logging.getLogger("fedd.experiment_control")
182        self.muxmax = 2
183        self.nthreads = 2
184        self.randomize_experiments = False
185        self.scp_exec = "/usr/bin/scp"
186        self.scripts_dir = "/users/faber/testbed/federation"
187        self.splitter = None
188        self.ssh_exec="/usr/bin/ssh"
189        self.ssh_keygen = "/usr/bin/ssh-keygen"
190        self.ssh_identity_file = None
191        # XXX
192        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
193        self.ssh_type = "rsa"
194        self.state = { }
195        self.state_filename = config.experiment_state_file
196        self.state_lock = Lock()
197        self.tclsh = "/usr/local/bin/otclsh"
198        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
199        self.tbmap = { 
200                'deter':'https://users.isi.deterlab.net:23235',
201                'emulab':'https://users.isi.deterlab.net:23236',
202                'ucb':'https://users.isi.deterlab.net:23237',
203                }
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
208        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
209        self.def_gwstart = \
210                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
211        self.def_mgwstart = \
212                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
213        self.def_gwimage = "FBSD61-TUNNEL2";
214        self.def_gwtype = "pc";
215
216
217        if self.ssh_pubkey_file:
218            try:
219                f = open(self.ssh_pubkey_file, 'r')
220                self.ssh_pubkey = f.read()
221                f.close()
222            except IOError:
223                raise service_error(service_error.internal,
224                        "Cannot read sshpubkey")
225
226        # Set the logging level to the value passed in.  The getattr slieght of
227        # hand finds the logging level constant corrersponding to the string.
228        # We're a little paranoid to avoid user mayhem.
229        if config.experiment_log:
230            try:
231                level = int(getattr(logging, config.experiment_log.upper(),-1))
232
233                if  logging.DEBUG <= level <= logging.CRITICAL:
234                    self.log.setLevel(level)
235                else:
236                    self.log.error("Bad experiment_log value: %s" % \
237                            config.experiment_log)
238
239            except ValueError:
240                self.log.error("Bad experiment_log value: %s" % \
241                        config.experiment_log)
242
243        # Grab saved state.  OK to do this w/o locking because it's read only
244        # and only one thread should be in existence that can see self.state at
245        # this point.
246        if self.state_filename:
247            self.read_state()
248
249        # Confirm federation scripts in the right place
250        for s in self.scripts:
251            if not os.path.exists(self.scripts_dir + "/" + s):
252                raise service_error(service_error.server_config,
253                        "%s/%s not in local script dir" % (self.scripts_dir, s))
254
255        # Dispatch tables
256        self.soap_services = {\
257                'Create': make_soap_handler(\
258                        CreateRequestMessage.typecode,
259                        getattr(self, "create_experiment"), 
260                        CreateResponseMessage,
261                        "CreateResponseBody"),
262                'Vtopo': make_soap_handler(\
263                        VtopoRequestMessage.typecode,
264                        getattr(self, "get_vtopo"),
265                        VtopoResponseMessage,
266                        "VtopoResponseBody"),
267                'Vis': make_soap_handler(\
268                        VisRequestMessage.typecode,
269                        getattr(self, "get_vis"),
270                        VisResponseMessage,
271                        "VisResponseBody"),
272                'Info': make_soap_handler(\
273                        InfoRequestMessage.typecode,
274                        getattr(self, "get_info"),
275                        InfoResponseMessage,
276                        "InfoResponseBody"),
277                'Terminate': make_soap_handler(\
278                        TerminateRequestMessage.typecode,
279                        getattr(self, "terminate_experiment"),
280                        TerminateResponseMessage,
281                        "TerminateResponseBody"),
282        }
283
284        self.xmlrpc_services = {\
285                'Create': make_xmlrpc_handler(\
286                        getattr(self, "create_experiment"), 
287                        "CreateResponseBody"),
288                'Vtopo': make_xmlrpc_handler(\
289                        getattr(self, "get_vtopo"),
290                        "VtopoResponseBody"),
291                'Vis': make_xmlrpc_handler(\
292                        getattr(self, "get_vis"),
293                        "VisResponseBody"),
294                'Info': make_xmlrpc_handler(\
295                        getattr(self, "get_info"),
296                        "InfoResponseBody"),
297                'Terminate': make_xmlrpc_handler(\
298                        getattr(self, "terminate_experiment"),
299                        "TerminateResponseBody"),
300        }
301
302    def get_soap_services(self):
303        return self.soap_services
304
305    def get_xmlrpc_services(self):
306        return self.xmlrpc_services
307
308    def copy_file(self, src, dest, size=1024):
309        """
310        Exceedingly simple file copy.
311        """
312        s = open(src,'r')
313        d = open(dest, 'w')
314
315        buf = "x"
316        while buf != "":
317            buf = s.read(size)
318            d.write(buf)
319        s.close()
320        d.close()
321
322    # Call while holding self.state_lock
323    def write_state(self):
324        """
325        Write a new copy of experiment state after copying the existing state
326        to a backup.
327
328        State format is a simple pickling of the state dictionary.
329        """
330        if os.access(self.state_filename, os.W_OK):
331            self.copy_file(self.state_filename, \
332                    "%s.bak" % self.state_filename)
333        try:
334            f = open(self.state_filename, 'w')
335            pickle.dump(self.state, f)
336        except IOError, e:
337            self.log.error("Can't write file %s: %s" % \
338                    (self.state_filename, e))
339        except pickle.PicklingError, e:
340            self.log.error("Pickling problem: %s" % e)
341
342    # Call while holding self.state_lock
343    def read_state(self):
344        """
345        Read a new copy of experiment state.  Old state is overwritten.
346
347        State format is a simple pickling of the state dictionary.
348        """
349        try:
350            f = open(self.state_filename, "r")
351            self.state = pickle.load(f)
352        except IOError, e:
353            self.log.warning("No saved state: Can't open %s: %s" % \
354                    (self.state_filename, e))
355        except pickle.UnpicklingError, e:
356            self.log.warning("No saved state: Unpickling failed: %s" % e)
357
358    def scp_file(self, file, user, host, dest=""):
359        """
360        scp a file to the remote host.  If debug is set the action is only
361        logged.
362        """
363
364        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
365        rv = 0
366
367        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
368        if not self.debug:
369            rv = call(scp_cmd, stdout=trace, stderr=trace)
370
371        return rv == 0
372
373    def ssh_cmd(self, user, host, cmd, wname=None):
374        """
375        Run a remote command on host as user.  If debug is set, the action is
376        only logged.
377        """
378        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
379
380        self.log.debug("[ssh_cmd]: %s" % sh_str)
381        if not self.debug:
382            sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace)
383            return sub.wait() == 0
384        else:
385            return True
386
387    def ship_scripts(self, host, user, dest_dir):
388        """
389        Copy the federation scripts (fedkit) to the a federant.
390        """
391        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
392            for s in self.scripts:
393                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
394                        user, host, dest_dir):
395                    return False
396            return True
397        else:
398            return False
399
400    def ship_configs(self, host, user, src_dir, dest_dir):
401        """
402        Copy federant-specific configuration files to the federant.
403        """
404        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
405            return False
406        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
407            return False
408
409        for f in os.listdir(src_dir):
410            if os.path.isdir(f):
411                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
412                        "%s/%s" % (dest_dir, f)):
413                    return False
414            else:
415                if not self.scp_file("%s/%s" % (src_dir, f), 
416                        user, host, dest_dir):
417                    return False
418        return True
419
420    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
421        """
422        Start a sub-experiment on a federant.
423
424        Get the current state, modify or create as appropriate, ship data and
425        configs and start the experiment.  There are small ordering differences
426        based on the initial state of the sub-experiment.
427        """
428        # ops node in the federant
429        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
430        user = tbparams[tb]['user']     # federant user
431        pid = tbparams[tb]['project']   # federant project
432        # XXX
433        base_confs = ( "hosts",)
434        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
435        # command to test experiment state
436        expinfo_exec = "/usr/testbed/bin/expinfo" 
437        # Configuration directories on the remote machine
438        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
439        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
440        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
441        # Regular expressions to parse the expinfo response
442        state_re = re.compile("State:\s+(\w+)")
443        no_exp_re = re.compile("^No\s+such\s+experiment")
444        state = None    # Experiment state parsed from expinfo
445        # The expinfo ssh command
446        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
447
448        # Get status
449        self.log.debug("[start_segment]: %s"% " ".join(cmd))
450        dev_null = None
451        try:
452            dev_null = open("/dev/null", "a")
453        except IOError, e:
454            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
455
456        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
457        for line in status.stdout:
458            m = state_re.match(line)
459            if m: state = m.group(1)
460            else:
461                m = no_exp_re.match(line)
462                if m: state = "none"
463        rv = status.wait()
464
465        # If the experiment is not present the subcommand returns a non-zero
466        # return value.  If we successfully parsed a "none" outcome, ignore the
467        # return code.
468        if rv != 0 and state != "none":
469            raise service_error(service_error.internal,
470                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
471
472        self.log.debug("[start_segment]: %s: %s" % (tb, state))
473        self.log.info("[start_segment]:transferring experiment to %s" % tb)
474
475        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
476            return False
477        # Clear the federation files
478        if not self.ssh_cmd(user, host, 
479                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
480            return False
481        if not self.ssh_cmd(user, host, 
482                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
483            return False
484        # Clear and create the tarfiles and rpm directories
485        for d in (tarfiles_dir, rpms_dir):
486            if not self.ssh_cmd(user, host, 
487                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
488                return False
489            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
490                    "create tarfiles"):
491                return False
492       
493        if state == 'active':
494            # Remote experiment is active.  Modify it.
495            for f in base_confs:
496                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
497                        "%s/%s" % (proj_dir, f)):
498                    return False
499            if not self.ship_scripts(host, user, proj_dir):
500                return False
501            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
502                    proj_dir):
503                return False
504            if os.path.isdir("%s/tarfiles" % tmpdir):
505                if not self.ship_configs(host, user,
506                        "%s/tarfiles" % tmpdir, tarfiles_dir):
507                    return False
508            if os.path.isdir("%s/rpms" % tmpdir):
509                if not self.ship_configs(host, user,
510                        "%s/rpms" % tmpdir, tarfiles_dir):
511                    return False
512            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
513            if not self.ssh_cmd(user, host,
514                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
515                            (pid, eid, tclfile), "modexp"):
516                return False
517            return True
518        elif state == "swapped":
519            # Remote experiment swapped out.  Modify it and swap it in.
520            for f in base_confs:
521                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
522                        "%s/%s" % (proj_dir, f)):
523                    return False
524            if not self.ship_scripts(host, user, proj_dir):
525                return False
526            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
527                    proj_dir):
528                return False
529            if os.path.isdir("%s/tarfiles" % tmpdir):
530                if not self.ship_configs(host, user,
531                        "%s/tarfiles" % tmpdir, tarfiles_dir):
532                    return False
533            if os.path.isdir("%s/rpms" % tmpdir):
534                if not self.ship_configs(host, user,
535                        "%s/rpms" % tmpdir, tarfiles_dir):
536                    return False
537            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
538            if not self.ssh_cmd(user, host,
539                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
540                    "modexp"):
541                return False
542            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
543            if not self.ssh_cmd(user, host,
544                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
545                    "swapexp"):
546                return False
547            return True
548        elif state == "none":
549            # No remote experiment.  Create one.  We do this in 2 steps so we
550            # can put the configuration files and scripts into the new
551            # experiment directories.
552
553            # Tarfiles must be present for creation to work
554            if os.path.isdir("%s/tarfiles" % tmpdir):
555                if not self.ship_configs(host, user,
556                        "%s/tarfiles" % tmpdir, tarfiles_dir):
557                    return False
558            if os.path.isdir("%s/rpms" % tmpdir):
559                if not self.ship_configs(host, user,
560                        "%s/rpms" % tmpdir, tarfiles_dir):
561                    return False
562            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
563            if not self.ssh_cmd(user, host,
564                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
565                            (pid, eid, tclfile), "startexp"):
566                return False
567            # After startexp the per-experiment directories exist
568            for f in base_confs:
569                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
570                        "%s/%s" % (proj_dir, f)):
571                    return False
572            if not self.ship_scripts(host, user, proj_dir):
573                return False
574            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
575                    proj_dir):
576                return False
577            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
578            if not self.ssh_cmd(user, host,
579                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
580                    "swapexp"):
581                return False
582            return True
583        else:
584            self.log.debug("[start_segment]:unknown state %s" % state)
585            return False
586
587    def stop_segment(self, tb, eid, tbparams):
588        """
589        Stop a sub experiment by calling swapexp on the federant
590        """
591        user = tbparams[tb]['user']
592        host = tbparams[tb]['host']
593        pid = tbparams[tb]['project']
594
595        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
596        return self.ssh_cmd(user, host,
597                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
598
599       
600    def generate_ssh_keys(self, dest, type="rsa" ):
601        """
602        Generate a set of keys for the gateways to use to talk.
603
604        Keys are of type type and are stored in the required dest file.
605        """
606        valid_types = ("rsa", "dsa")
607        t = type.lower();
608        if t not in valid_types: raise ValueError
609        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
610
611        try:
612            trace = open("/dev/null", "w")
613        except IOError:
614            raise service_error(service_error.internal,
615                    "Cannot open /dev/null??");
616
617        # May raise CalledProcessError
618        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
619        rv = call(cmd, stdout=trace, stderr=trace)
620        if rv != 0:
621            raise service_error(service_error.internal, 
622                    "Cannot generate nonce ssh keys.  %s return code %d" \
623                            % (self.ssh_keygen, rv))
624
625    def gentopo(self, str):
626        """
627        Generate the topology dtat structure from the splitter's XML
628        representation of it.
629
630        The topology XML looks like:
631            <experiment>
632                <nodes>
633                    <node><vname></vname><ips>ip1:ip2</ips></node>
634                </nodes>
635                <lans>
636                    <lan>
637                        <vname></vname><vnode></vnode><ip></ip>
638                        <bandwidth></bandwidth><member>node:port</member>
639                    </lan>
640                </lans>
641        """
642        class topo_parse:
643            """
644            Parse the topology XML and create the dats structure.
645            """
646            def __init__(self):
647                # Typing of the subelements for data conversion
648                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
649                self.int_subelements = ( 'bandwidth',)
650                self.float_subelements = ( 'delay',)
651                # The final data structure
652                self.nodes = [ ]
653                self.lans =  [ ]
654                self.topo = { \
655                        'node': self.nodes,\
656                        'lan' : self.lans,\
657                    }
658                self.element = { }  # Current element being created
659                self.chars = ""     # Last text seen
660
661            def end_element(self, name):
662                # After each sub element the contents is added to the current
663                # element or to the appropriate list.
664                if name == 'node':
665                    self.nodes.append(self.element)
666                    self.element = { }
667                elif name == 'lan':
668                    self.lans.append(self.element)
669                    self.element = { }
670                elif name in self.str_subelements:
671                    self.element[name] = self.chars
672                    self.chars = ""
673                elif name in self.int_subelements:
674                    self.element[name] = int(self.chars)
675                    self.chars = ""
676                elif name in self.float_subelements:
677                    self.element[name] = float(self.chars)
678                    self.chars = ""
679
680            def found_chars(self, data):
681                self.chars += data.rstrip()
682
683
684        tp = topo_parse();
685        parser = xml.parsers.expat.ParserCreate()
686        parser.EndElementHandler = tp.end_element
687        parser.CharacterDataHandler = tp.found_chars
688
689        parser.Parse(str)
690
691        return tp.topo
692       
693
694    def genviz(self, topo):
695        """
696        Generate the visualization the virtual topology
697        """
698
699        neato = "/usr/local/bin/neato"
700        # These are used to parse neato output and to create the visualization
701        # file.
702        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
703        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
704                "%s</type></node>"
705
706        try:
707            # Node names
708            nodes = [ n['vname'] for n in topo['node'] ]
709            topo_lans = topo['lan']
710        except KeyError:
711            raise service_error(service_error.internal, "Bad topology")
712
713        lans = { }
714        links = { }
715
716        # Walk through the virtual topology, organizing the connections into
717        # 2-node connections (links) and more-than-2-node connections (lans).
718        # When a lan is created, it's added to the list of nodes (there's a
719        # node in the visualization for the lan).
720        for l in topo_lans:
721            if links.has_key(l['vname']):
722                if len(links[l['vname']]) < 2:
723                    links[l['vname']].append(l['vnode'])
724                else:
725                    nodes.append(l['vname'])
726                    lans[l['vname']] = links[l['vname']]
727                    del links[l['vname']]
728                    lans[l['vname']].append(l['vnode'])
729            elif lans.has_key(l['vname']):
730                lans[l['vname']].append(l['vnode'])
731            else:
732                links[l['vname']] = [ l['vnode'] ]
733
734
735        # Open up a temporary file for dot to turn into a visualization
736        try:
737            df, dotname = tempfile.mkstemp()
738            dotfile = os.fdopen(df, 'w')
739        except IOError:
740            raise service_error(service_error.internal,
741                    "Failed to open file in genviz")
742
743        # Generate a dot/neato input file from the links, nodes and lans
744        try:
745            print >>dotfile, "graph G {"
746            for n in nodes:
747                print >>dotfile, '\t"%s"' % n
748            for l in links.keys():
749                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
750            for l in lans.keys():
751                for n in lans[l]:
752                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
753            print >>dotfile, "}"
754            dotfile.close()
755        except TypeError:
756            raise service_error(service_error.internal,
757                    "Single endpoint link in vtopo")
758        except IOError:
759            raise service_error(service_error.internal, "Cannot write dot file")
760
761        # Use dot to create a visualization
762        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
763                '-Gpack=true', dotname], stdout=PIPE)
764
765        # Translate dot to vis format
766        vis_nodes = [ ]
767        vis = { 'node': vis_nodes }
768        for line in dot.stdout:
769            m = vis_re.match(line)
770            if m:
771                vn = m.group(1)
772                vis_node = {'name': vn, \
773                        'x': float(m.group(2)),\
774                        'y' : float(m.group(3)),\
775                    }
776                if vn in links.keys() or vn in lans.keys():
777                    vis_node['type'] = 'lan'
778                else:
779                    vis_node['type'] = 'node'
780                vis_nodes.append(vis_node)
781        rv = dot.wait()
782
783        os.remove(dotname)
784        if rv == 0 : return vis
785        else: return None
786
787
788    def get_access(self, tb, nodes, user, tbparam):
789        """
790        Get access to testbed through fedd and set the parameters for that tb
791        """
792
793        translate_attr = {
794            'slavenodestartcmd': 'expstart',
795            'slaveconnectorstartcmd': 'gwstart',
796            'masternodestartcmd': 'mexpstart',
797            'masterconnectorstartcmd': 'mgwstart',
798            'connectorimage': 'gwimage',
799            'connectortype': 'gwtype',
800            'tunnelcfg': 'tun',
801            'smbshare': 'smbshare',
802        }
803
804        # XXX multi-level access
805        uri = self.tbmap.get(tb, None)
806        if not uri:
807            raise service_error(serice_error.server_config, 
808                    "Unknown testbed: %s" % tb)
809
810        # The basic request
811        req = {\
812                'destinationTestbed' : { 'uri' : uri },
813                'user':  user,
814                'allocID' : { 'localname': 'test' },
815                'access' : [ { 'sshPubkey' : self.ssh_pubkey } ]
816            }
817       
818        # node resources if any
819        if nodes != None and len(nodes) > 0:
820            rnodes = [ ]
821            for n in nodes:
822                rn = { }
823                image, hw, count = n.split(":")
824                if image: rn['image'] = [ image ]
825                if hw: rn['hardware'] = [ hw ]
826                if count: rn['count'] = int(count)
827                rnodes.append(rn)
828            req['resources']= { }
829            req['resources']['node'] = rnodes
830
831        # No retry loop here.  Proxy servers must correctly authenticate
832        # themselves without help
833
834        try:
835            ctx = fedd_ssl_context(self.cert_file, 
836                    self.trusted_certs, password=self.cert_pwd)
837        except SSL.SSLError:
838            raise service_error(service_error.server_config, 
839                    "Server certificates misconfigured")
840
841        loc = feddServiceLocator();
842        port = loc.getfeddPortType(uri,
843                transport=M2Crypto.httpslib.HTTPSConnection, 
844                transdict={ 'ssl_context' : ctx })
845
846        # Reconstruct the full request message
847        msg = RequestAccessRequestMessage()
848        msg.set_element_RequestAccessRequestBody(
849                pack_soap(msg, "RequestAccessRequestBody", req))
850
851        try:
852            resp = port.RequestAccess(msg)
853        except ZSI.ParseException, e:
854            raise service_error(service_error.req,
855                    "Bad format message (XMLRPC??): %s" %
856                    str(e))
857        r = unpack_soap(resp)
858
859        if r.has_key('RequestAccessResponseBody'):
860            r = r['RequestAccessResponseBody']
861        else:
862            raise service_error(service_error.proxy,
863                    "Bad proxy response")
864
865
866        e = r['emulab']
867        p = e['project']
868        tbparam[tb] = { 
869                "boss": e['boss'],
870                "host": e['ops'],
871                "domain": e['domain'],
872                "fs": e['fileServer'],
873                "eventserver": e['eventServer'],
874                "project": unpack_id(p['name']),
875                "emulab" : e
876                }
877        # Make the testbed name be the label the user applied
878        p['testbed'] = {'localname': tb }
879
880        for u in p['user']:
881            tbparam[tb]['user'] = unpack_id(u['userID'])
882
883        for a in e['fedAttr']:
884            if a['attribute']:
885                key = translate_attr.get(a['attribute'].lower(), None)
886                if key:
887                    tbparam[tb][key]= a['value']
888       
889    class current_testbed:
890        """
891        Object for collecting the current testbed description.  The testbed
892        description is saved to a file with the local testbed variables
893        subsittuted line by line.
894        """
895        def __init__(self, eid, tmpdir):
896            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
897            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
898            self.current_testbed = None
899            self.testbed_file = None
900
901            self.def_expstart = \
902                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
903            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
904            self.def_gwstart = \
905                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
906            self.def_mgwstart = \
907                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
908            self.def_gwimage = "FBSD61-TUNNEL2";
909            self.def_gwtype = "pc";
910
911            self.eid = eid
912            self.tmpdir = tmpdir
913
914        def __call__(self, line, master, allocated, tbparams):
915            # Capture testbed topology descriptions
916            if self.current_testbed == None:
917                m = self.begin_testbed.match(line)
918                if m != None:
919                    self.current_testbed = m.group(1)
920                    if self.current_testbed == None:
921                        raise service_error(service_error.req,
922                                "Bad request format (unnamed testbed)")
923                    allocated[self.current_testbed] = \
924                            allocated.get(self.current_testbed,0) + 1
925                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
926                    if not os.path.exists(tb_dir):
927                        try:
928                            os.mkdir(tb_dir)
929                        except IOError:
930                            raise service_error(service_error.internal,
931                                    "Cannot create %s" % tb_dir)
932                    try:
933                        self.testbed_file = open("%s/%s.%s.tcl" %
934                                (tb_dir, self.eid, self.current_testbed), 'w')
935                    except IOError:
936                        self.testbed_file = None
937                    return True
938                else: return False
939            else:
940                m = self.end_testbed.match(line)
941                if m != None:
942                    if m.group(1) != self.current_testbed:
943                        raise service_error(service_error.internal, 
944                                "Mismatched testbed markers!?")
945                    if self.testbed_file != None: 
946                        self.testbed_file.close()
947                        self.testbed_file = None
948                    self.current_testbed = None
949                elif self.testbed_file:
950                    # Substitute variables and put the line into the local
951                    # testbed file.
952                    gwtype = tbparams[self.current_testbed].get('gwtype', 
953                            self.def_gwtype)
954                    gwimage = tbparams[self.current_testbed].get('gwimage', 
955                            self.def_gwimage)
956                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
957                            self.def_mgwstart)
958                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
959                            self.def_mexpstart)
960                    gwstart = tbparams[self.current_testbed].get('gwstart', 
961                            self.def_gwstart)
962                    expstart = tbparams[self.current_testbed].get('expstart', 
963                            self.def_expstart)
964                    project = tbparams[self.current_testbed].get('project')
965                    line = re.sub("GWTYPE", gwtype, line)
966                    line = re.sub("GWIMAGE", gwimage, line)
967                    if self.current_testbed == master:
968                        line = re.sub("GWSTART", mgwstart, line)
969                        line = re.sub("EXPSTART", mexpstart, line)
970                    else:
971                        line = re.sub("GWSTART", gwstart, line)
972                        line = re.sub("EXPSTART", expstart, line)
973                    # XXX: does `` embed without doing enything else?
974                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
975                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
976                    line = re.sub("EID", self.eid, line)
977                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
978                            (project, self.eid), line)
979                    print >>self.testbed_file, line
980                return True
981
982    class allbeds:
983        """
984        Process the Allbeds section.  Get access to each federant and save the
985        parameters in tbparams
986        """
987        def __init__(self, get_access):
988            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
989            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
990            self.in_allbeds = False
991            self.get_access = get_access
992
993        def __call__(self, line, user, tbparams):
994            # Testbed access parameters
995            if not self.in_allbeds:
996                if self.begin_allbeds.match(line):
997                    self.in_allbeds = True
998                    return True
999                else:
1000                    return False
1001            else:
1002                if self.end_allbeds.match(line):
1003                    self.in_allbeds = False
1004                else:
1005                    nodes = line.split('|')
1006                    tb = nodes.pop(0)
1007                    self.get_access(tb, nodes, user, tbparams)
1008                return True
1009
1010    class gateways:
1011        def __init__(self, eid, master, tmpdir, gw_pubkey,
1012                gw_secretkey, copy_file):
1013            self.begin_gateways = \
1014                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1015            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1016            self.current_gateways = None
1017            self.control_gateway = None
1018            self.active_end = { }
1019
1020            self.eid = eid
1021            self.master = master
1022            self.tmpdir = tmpdir
1023            self.gw_pubkey_base = gw_pubkey
1024            self.gw_secretkey_base = gw_secretkey
1025
1026            self.copy_file = copy_file
1027
1028
1029        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1030                active_end, tbparams, dtb, myname, desthost, type):
1031            """
1032            Produce a gateway configuration file from a gateways line.
1033            """
1034
1035            sproject = tbparams[gw].get('project', 'project')
1036            dproject = tbparams[dtb].get('project', 'project')
1037            sdomain = ".%s.%s%s" % (eid, sproject,
1038                    tbparams[gw].get('domain', ".example.com"))
1039            ddomain = ".%s.%s%s" % (eid, dproject,
1040                    tbparams[dtb].get('domain', ".example.com"))
1041            boss = tbparams[master].get('boss', "boss")
1042            fs = tbparams[master].get('fs', "fs")
1043            event_server = "%s%s" % \
1044                    (tbparams[gw].get('eventserver', "event_server"),
1045                            tbparams[gw].get('domain', "example.com"))
1046            remote_event_server = "%s%s" % \
1047                    (tbparams[dtb].get('eventserver', "event_server"),
1048                            tbparams[dtb].get('domain', "example.com"))
1049            seer_control = "%s%s" % \
1050                    (tbparams[gw].get('control', "control"), sdomain)
1051            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1052            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1053            tunnel_cfg = tbparams[gw].get("tun", "false")
1054
1055            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1056            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1057
1058            # translate to lower case so the `hostname` hack for specifying
1059            # configuration files works.
1060            conf_file = conf_file.lower();
1061            remote_conf_file = remote_conf_file.lower();
1062
1063            if dtb == master:
1064                active = "false"
1065            elif gw == master:
1066                active = "true"
1067            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1068                active = "false"
1069            else:
1070                active_end['%s-%s' % (gw, dtb)] = 1
1071                active = "true"
1072
1073            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1074            print >>gwconfig, "Active: %s" % active
1075            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1076            print >>gwconfig, "BossName: %s" % boss
1077            print >>gwconfig, "FsName: %s" % fs
1078            print >>gwconfig, "EventServerName: %s" % event_server
1079            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1080            print >>gwconfig, "SeerControl: %s" % seer_control
1081            print >>gwconfig, "Type: %s" % type
1082            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1083            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1084                    local_script_dir
1085            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1086            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1087            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1088                    (remote_script_dir, remote_conf_file)
1089            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1090            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
1091            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
1092            gwconfig.close()
1093
1094            return active == "true"
1095
1096        def __call__(self, line, allocated, tbparams):
1097            # Process gateways
1098            if not self.current_gateways:
1099                m = self.begin_gateways.match(line)
1100                if m:
1101                    self.current_gateways = m.group(1)
1102                    if allocated.has_key(self.current_gateways):
1103                        # This test should always succeed
1104                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1105                        if not os.path.exists(tb_dir):
1106                            try:
1107                                os.mkdir(tb_dir)
1108                            except IOError:
1109                                raise service_error(service_error.internal,
1110                                        "Cannot create %s" % tb_dir)
1111                    else:
1112                        # XXX
1113                        self.log.error("[gateways]: Ignoring gateways for " + \
1114                                "unknown testbed %s" % self.current_gateways)
1115                        self.current_gateways = None
1116                    return True
1117                else:
1118                    return False
1119            else:
1120                m = self.end_gateways.match(line)
1121                if m :
1122                    if m.group(1) != self.current_gateways:
1123                        raise service_error(service_error.internal,
1124                                "Mismatched gateway markers!?")
1125                    if self.control_gateway:
1126                        try:
1127                            cc = open("%s/%s/client.conf" %
1128                                    (self.tmpdir, self.current_gateways), 'w')
1129                            print >>cc, "ControlGateway: %s" % \
1130                                    self.control_gateway
1131                            if tbparams[self.master].has_key('smbshare'):
1132                                print >>cc, "SMBSHare: %s" % \
1133                                        tbparams[self.master]['smbshare']
1134                            print >>cc, "ProjectUser: %s" % \
1135                                    tbparams[self.master]['user']
1136                            print >>cc, "ProjectName: %s" % \
1137                                    tbparams[self.master]['project']
1138                            cc.close()
1139                        except IOError:
1140                            raise service_error(service_error.internal,
1141                                    "Error creating client config")
1142                        try:
1143                            cc = open("%s/%s/seer.conf" %
1144                                    (self.tmpdir, self.current_gateways),
1145                                    'w')
1146                            if self.current_gateways != self.master:
1147                                print >>cc, "ControlNode: %s" % \
1148                                        self.control_gateway
1149                            print >>cc, "ExperimentID: %s/%s" % \
1150                                    ( tbparams[self.master]['project'], \
1151                                    self.eid )
1152                            cc.close()
1153                        except IOError:
1154                            raise service_error(service_error.internal,
1155                                    "Error creating seer config")
1156                    else:
1157                        debug.error("[gateways]: No control gateway for %s" %\
1158                                    self.current_gateways)
1159                    self.current_gateways = None
1160                else:
1161                    dtb, myname, desthost, type = line.split(" ")
1162
1163                    if type == "control" or type == "both":
1164                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1165                                self.eid, 
1166                                tbparams[self.current_gateways]['project'],
1167                                tbparams[self.current_gateways]['domain'])
1168                    try:
1169                        active = self.gateway_conf_file(self.current_gateways,
1170                                self.master, self.eid, self.gw_pubkey_base,
1171                                self.gw_secretkey_base,
1172                                self.active_end, tbparams, dtb, myname,
1173                                desthost, type)
1174                    except IOError, e:
1175                        raise service_error(service_error.internal,
1176                                "Failed to write config file for %s" % \
1177                                        self.current_gateway)
1178           
1179                    gw_pubkey = "%s/keys/%s" % \
1180                            (self.tmpdir, self.gw_pubkey_base)
1181                    gw_secretkey = "%s/keys/%s" % \
1182                            (self.tmpdir, self.gw_secretkey_base)
1183
1184                    pkfile = "%s/%s/%s" % \
1185                            ( self.tmpdir, self.current_gateways, 
1186                                    self.gw_pubkey_base)
1187                    skfile = "%s/%s/%s" % \
1188                            ( self.tmpdir, self.current_gateways, 
1189                                    self.gw_secretkey_base)
1190
1191                    if not os.path.exists(pkfile):
1192                        try:
1193                            self.copy_file(gw_pubkey, pkfile)
1194                        except IOError:
1195                            service_error(service_error.internal,
1196                                    "Failed to copy pubkey file")
1197
1198                    if active and not os.path.exists(skfile):
1199                        try:
1200                            self.copy_file(gw_secretkey, skfile)
1201                        except IOError:
1202                            service_error(service_error.internal,
1203                                    "Failed to copy secretkey file")
1204                return True
1205
1206    class shunt_to_file:
1207        """
1208        Simple class to write data between two regexps to a file.
1209        """
1210        def __init__(self, begin, end, filename):
1211            """
1212            Begin shunting on a match of begin, stop on end, send data to
1213            filename.
1214            """
1215            self.begin = re.compile(begin)
1216            self.end = re.compile(end)
1217            self.in_shunt = False
1218            self.file = None
1219            self.filename = filename
1220
1221        def __call__(self, line):
1222            """
1223            Call this on each line in the input that may be shunted.
1224            """
1225            if not self.in_shunt:
1226                if self.begin.match(line):
1227                    self.in_shunt = True
1228                    try:
1229                        self.file = open(self.filename, "w")
1230                    except:
1231                        self.file = None
1232                        raise
1233                    return True
1234                else:
1235                    return False
1236            else:
1237                if self.end.match(line):
1238                    if self.file: 
1239                        self.file.close()
1240                        self.file = None
1241                    self.in_shunt = False
1242                else:
1243                    if self.file:
1244                        print >>self.file, line
1245                return True
1246
1247    class shunt_to_list:
1248        """
1249        Same interface as shunt_to_file.  Data collected in self.list, one list
1250        element per line.
1251        """
1252        def __init__(self, begin, end):
1253            self.begin = re.compile(begin)
1254            self.end = re.compile(end)
1255            self.in_shunt = False
1256            self.list = [ ]
1257       
1258        def __call__(self, line):
1259            if not self.in_shunt:
1260                if self.begin.match(line):
1261                    self.in_shunt = True
1262                    return True
1263                else:
1264                    return False
1265            else:
1266                if self.end.match(line):
1267                    self.in_shunt = False
1268                else:
1269                    self.list.append(line)
1270                return True
1271
1272    class shunt_to_string:
1273        """
1274        Same interface as shunt_to_file.  Data collected in self.str, all in
1275        one string.
1276        """
1277        def __init__(self, begin, end):
1278            self.begin = re.compile(begin)
1279            self.end = re.compile(end)
1280            self.in_shunt = False
1281            self.str = ""
1282       
1283        def __call__(self, line):
1284            if not self.in_shunt:
1285                if self.begin.match(line):
1286                    self.in_shunt = True
1287                    return True
1288                else:
1289                    return False
1290            else:
1291                if self.end.match(line):
1292                    self.in_shunt = False
1293                else:
1294                    self.str += line
1295                return True
1296
1297    def create_experiment(self, req, fid):
1298        """
1299        The external interface to experiment creation called from the
1300        dispatcher.
1301
1302        Creates a working directory, splits the incoming description using the
1303        splitter script and parses out the avrious subsections using the
1304        lcasses above.  Once each sub-experiment is created, use pooled threads
1305        to instantiate them and start it all up.
1306        """
1307        try:
1308            tmpdir = tempfile.mkdtemp(prefix="split-")
1309        except IOError:
1310            raise service_error(service_error.internal, "Cannot create tmp dir")
1311
1312        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1313        gw_secretkey_base = "fed.%s" % self.ssh_type
1314        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1315        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1316        tclfile = tmpdir + "/experiment.tcl"
1317        tbparams = { }
1318
1319        pid = "dummy"
1320        gid = "dummy"
1321        # XXX
1322        fail_soft = False
1323
1324        try:
1325            os.mkdir(tmpdir+"/keys")
1326        except OSError:
1327            raise service_error(service_error.internal,
1328                    "Can't make temporary dir")
1329
1330        req = req.get('CreateRequestBody', None)
1331        if not req:
1332            raise service_error(service_error.req,
1333                    "Bad request format (no CreateRequestBody)")
1334        # The tcl parser needs to read a file so put the content into that file
1335        file_content=req.get('experimentdescription', None)
1336        if file_content:
1337            try:
1338                f = open(tclfile, 'w')
1339                f.write(file_content)
1340                f.close()
1341            except IOError:
1342                raise service_error(service_error.internal,
1343                        "Cannot write temp experiment description")
1344        else:
1345            raise service_error(service_error.req, "No experiment description")
1346
1347        if req.has_key('experimentID') and \
1348                req['experimentID'].has_key('localname'):
1349            eid = req['experimentID']['localname']
1350            self.state_lock.acquire()
1351            while (self.state.has_key(eid)):
1352                eid += random.choice(string.ascii_letters)
1353            # To avoid another thread picking this localname
1354            self.state[eid] = "placeholder"
1355            self.state_lock.release()
1356        else:
1357            eid = self.exp_stem
1358            for i in range(0,5):
1359                eid += random.choice(string.ascii_letters)
1360            self.state_lock.acquire()
1361            while (self.state.has_key(eid)):
1362                eid = self.exp_stem
1363                for i in range(0,5):
1364                    eid += random.choice(string.ascii_letters)
1365            # To avoid another thread picking this localname
1366            self.state[eid] = "placeholder"
1367            self.state_lock.release()
1368
1369        try:
1370            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1371        except ValueError:
1372            raise service_error(service_error.server_config, 
1373                    "Bad key type (%s)" % self.ssh_type)
1374
1375        user = req.get('user', None)
1376        if user == None:
1377            raise service_error(service_error.req, "No user")
1378
1379        master = req.get('master', None)
1380        if master == None:
1381            raise service_error(service_error.req, "No master testbed label")
1382       
1383       
1384        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1385            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1386        tclparser = Popen(tclcmd, stdout=PIPE)
1387
1388        allocated = { }     # Testbeds we can access
1389        started = { }       # Testbeds where a sub-experiment started
1390                            # successfully
1391
1392        # Objects to parse the splitter output (defined above)
1393        parse_current_testbed = self.current_testbed(eid, tmpdir)
1394        parse_allbeds = self.allbeds(self.get_access)
1395        parse_gateways = self.gateways(eid, master, tmpdir,
1396                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1397        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1398                    "^#\s+End\s+Vtopo")
1399        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1400                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1401        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1402                "^#\s+End\s+tarfiles")
1403        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1404                "^#\s+End\s+rpms")
1405
1406        # Worling on the split data
1407        for line in tclparser.stdout:
1408            line = line.rstrip()
1409            if parse_current_testbed(line, master, allocated, tbparams):
1410                continue
1411            elif parse_allbeds(line, user, tbparams):
1412                continue
1413            elif parse_gateways(line, allocated, tbparams):
1414                continue
1415            elif parse_vtopo(line):
1416                continue
1417            elif parse_hostnames(line):
1418                continue
1419            elif parse_tarfiles(line):
1420                continue
1421            elif parse_rpms(line):
1422                continue
1423            else:
1424                raise service_error(service_error.internal, 
1425                        "Bad tcl parse? %s" % line)
1426
1427        # Virtual topology and visualization
1428        vtopo = self.gentopo(parse_vtopo.str)
1429        if not vtopo:
1430            raise service_error(service_error.internal, 
1431                    "Failed to generate virtual topology")
1432
1433        vis = self.genviz(vtopo)
1434        if not vis:
1435            raise service_error(service_error.internal, 
1436                    "Failed to generate visualization")
1437
1438        # save federant information
1439        for k in allocated.keys():
1440            tbparams[k]['federant'] = {\
1441                    'name': [ { 'localname' : eid} ],\
1442                    'emulab': tbparams[k]['emulab'],\
1443                    'master' : k == master,\
1444                }
1445
1446
1447        # Copy tarfiles and rpms needed at remote sites into a staging area
1448        try:
1449            for t in parse_tarfiles.list:
1450                if not os.path.exists("%s/tarfiles" % tmpdir):
1451                    os.mkdir("%s/tarfiles" % tmpdir)
1452                self.copy_file(t, "%s/tarfiles/%s" % \
1453                        (tmpdir, os.path.basename(t)))
1454            for r in parse_rpms.list:
1455                if not os.path.exists("%s/rpms" % tmpdir):
1456                    os.mkdir("%s/rpms" % tmpdir)
1457                self.copy_file(r, "%s/rpms/%s" % \
1458                        (tmpdir, os.path.basename(r)))
1459        except IOError, e:
1460            raise service_error(service_error.internal, 
1461                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1462
1463        thread_pool_info = self.thread_pool()
1464        threads = [ ]
1465
1466        for tb in [ k for k in allocated.keys() if k != master]:
1467            # Wait until we have a free slot to start the next testbed load
1468            thread_pool_info.acquire()
1469            while thread_pool_info.started - \
1470                    thread_pool_info.terminated >= self.nthreads:
1471                thread_pool_info.wait()
1472            thread_pool_info.release()
1473
1474            # Create and start a thread to start the segment, and save it to
1475            # get the return value later
1476            t  = self.pooled_thread(target=self.start_segment, 
1477                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1478                    pdata=thread_pool_info, trace_file=self.trace_file)
1479            threads.append(t)
1480            t.start()
1481
1482        # Wait until all finish (the first clause of the while is to make sure
1483        # one starts)
1484        thread_pool_info.acquire()
1485        while thread_pool_info.started == 0 or \
1486                thread_pool_info.started > thread_pool_info.terminated:
1487            thread_pool_info.wait()
1488        thread_pool_info.release()
1489
1490        # If none failed, start the master
1491        failed = [ t.getName() for t in threads if not t.rv ]
1492
1493        if len(failed) == 0:
1494            if not self.start_segment(master, eid, tbparams, tmpdir):
1495                failed.append(master)
1496
1497        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1498        # If one failed clean up, unless fail_soft is set
1499        if failed:
1500            if not fail_soft:
1501                for tb in succeeded:
1502                    self.stop_segment(tb, eid, tbparams)
1503                # Remove the placeholder
1504                self.state_lock.acquire()
1505                del self.state[eid]
1506                self.state_lock.release()
1507
1508                raise service_error(service_error.federant,
1509                    "Swap in failed on %s" % ",".join(failed))
1510        else:
1511            self.log.info("[start_segment]: Experiment %s started" % eid)
1512
1513        # Generate an ID for the experiment (slice) and a certificate that the
1514        # allocator can use to prove they own it.  We'll ship it back through
1515        # the encrypted connection.
1516        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1517
1518        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1519
1520        # Walk up tmpdir, deleting as we go
1521        for path, dirs, files in os.walk(tmpdir, topdown=False):
1522            for f in files:
1523                os.remove(os.path.join(path, f))
1524            for d in dirs:
1525                os.rmdir(os.path.join(path, d))
1526        os.rmdir(tmpdir)
1527
1528        resp = { 'federant' : [ tbparams[tb]['federant'] \
1529                for tb in tbparams.keys() \
1530                    if tbparams[tb].has_key('federant') ],\
1531                    'vtopo': vtopo,\
1532                    'vis' : vis,
1533                    'experimentID' : [\
1534                            { 'fedid': copy.copy(expid) }, \
1535                            { 'localname': eid },\
1536                        ],\
1537                    'experimentAccess': { 'X509' : expcert },\
1538                }
1539
1540        # Insert the experiment into our state and update the disk copy
1541        self.state_lock.acquire()
1542        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1543                for tb in tbparams.keys() \
1544                    if tbparams[tb].has_key('federant') ],\
1545                    'vtopo': vtopo,\
1546                    'vis' : vis,
1547                    'experimentID' : [\
1548                            { 'fedid': expid }, { 'localname': eid },\
1549                        ],\
1550                }
1551        self.state[eid] = self.state[expid]
1552        if self.state_filename: self.write_state()
1553        self.state_lock.release()
1554
1555        if not failed:
1556            return resp
1557        else:
1558            raise service_error(service_error.partial, \
1559                    "Partial swap in on %s" % ",".join(succeeded))
1560
1561
1562    def get_vtopo(self, req, fid):
1563        """
1564        Return the stored virtual topology for this experiment
1565        """
1566        rv = None
1567
1568        req = req.get('VtopoRequestBody', None)
1569        if not req:
1570            raise service_error(service_error.req,
1571                    "Bad request format (no VtopoRequestBody)")
1572        exp = req.get('experiment', None)
1573        if exp:
1574            if exp.has_key('fedid'):
1575                key = fedid(bits=exp['fedid'])
1576                keytype = "fedid"
1577            elif exp.has_key('localname'):
1578                key = exp['localname']
1579                keytype = "localname"
1580            else:
1581                raise service_error(service_error.req, "Unknown lookup type")
1582        else:
1583            raise service_error(service_error.req, "No request?")
1584
1585        self.state_lock.acquire()
1586        if self.state.has_key(key):
1587            rv = { 'experiment' : {keytype: key },\
1588                    'vtopo': self.state[key]['vtopo'],\
1589                }
1590        self.state_lock.release()
1591
1592        if rv: return rv
1593        else: raise service_error(service_error.req, "No such experiment")
1594
1595    def get_vis(self, req, fid):
1596        """
1597        Return the stored visualization for this experiment
1598        """
1599        rv = None
1600
1601        req = req.get('VisRequestBody', None)
1602        if not req:
1603            raise service_error(service_error.req,
1604                    "Bad request format (no VisRequestBody)")
1605        exp = req.get('experiment', None)
1606        if exp:
1607            if exp.has_key('fedid'):
1608                key = fedid(bits=exp['fedid'])
1609                keytype = "fedid"
1610            elif exp.has_key('localname'):
1611                key = exp['localname']
1612                keytype = "localname"
1613            else:
1614                raise service_error(service_error.req, "Unknown lookup type")
1615        else:
1616            raise service_error(service_error.req, "No request?")
1617
1618        self.state_lock.acquire()
1619        if self.state.has_key(key):
1620            rv =  { 'experiment' : {keytype: key },\
1621                    'vis': self.state[key]['vis'],\
1622                    }
1623        self.state_lock.release()
1624
1625        if rv: return rv
1626        else: raise service_error(service_error.req, "No such experiment")
1627
1628    def get_info(self, req, fid):
1629        """
1630        Return all the stored info about this experiment
1631        """
1632        rv = None
1633
1634        req = req.get('InfoRequestBody', None)
1635        if not req:
1636            raise service_error(service_error.req,
1637                    "Bad request format (no VisRequestBody)")
1638        exp = req.get('experiment', None)
1639        if exp:
1640            if exp.has_key('fedid'):
1641                key = fedid(bits=exp['fedid'])
1642                keytype = "fedid"
1643            elif exp.has_key('localname'):
1644                key = exp['localname']
1645                keytype = "localname"
1646            else:
1647                raise service_error(service_error.req, "Unknown lookup type")
1648        else:
1649            raise service_error(service_error.req, "No request?")
1650
1651        # The state may be massaged by the service function that called
1652        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1653        # state.
1654        self.state_lock.acquire()
1655        if self.state.has_key(key):
1656            rv = copy.deepcopy(self.state[key])
1657        self.state_lock.release()
1658
1659        if rv: return rv
1660        else: raise service_error(service_error.req, "No such experiment")
1661
1662
1663    def terminate_experiment(self, req, fid):
1664        """
1665        Swap this experiment out on the federants and delete the shared
1666        information
1667        """
1668        tbparams = { }
1669        req = req.get('TerminateRequestBody', None)
1670        if not req:
1671            raise service_error(service_error.req,
1672                    "Bad request format (no TerminateRequestBody)")
1673        exp = req.get('experiment', None)
1674        if exp:
1675            if exp.has_key('fedid'):
1676                key = fedid(bits=exp['fedid'])
1677                keytype = "fedid"
1678            elif exp.has_key('localname'):
1679                key = exp['localname']
1680                keytype = "localname"
1681            else:
1682                raise service_error(service_error.req, "Unknown lookup type")
1683        else:
1684            raise service_error(service_error.req, "No request?")
1685
1686        self.state_lock.acquire()
1687        fed_exp = self.state.get(key, None)
1688
1689        if fed_exp:
1690            # This branch of the conditional holds the lock to generate a
1691            # consistent temporary tbparams variable to deallocate experiments.
1692            # It releases the lock to do the deallocations and reacquires it to
1693            # remove the experiment state when the termination is complete.
1694            ids = []
1695            #  experimentID is a list of dicts that are self-describing
1696            #  identifiers.  This finds all the fedids and localnames - the
1697            #  keys of self.state - and puts them into ids.
1698            for id in fed_exp.get('experimentID', []):
1699                if id.has_key('fedid'): ids.append(id['fedid'])
1700                if id.has_key('localname'): ids.append(id['localname'])
1701
1702            # Construct enough of the tbparams to make the stop_segment calls
1703            # work
1704            for fed in fed_exp['federant']:
1705                try:
1706                    for e in fed['name']:
1707                        eid = e.get('localname', None)
1708                        if eid: break
1709                    else:
1710                        continue
1711
1712                    p = fed['emulab']['project']
1713
1714                    project = p['name']['localname']
1715                    tb = p['testbed']['localname']
1716                    user = p['user'][0]['userID']['localname']
1717
1718                    domain = fed['emulab']['domain']
1719                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1720                except KeyError, e:
1721                    continue
1722                tbparams[tb] = {\
1723                        'user': user,\
1724                        'domain': domain,\
1725                        'project': project,\
1726                        'host': host,\
1727                        'eid': eid,\
1728                    }
1729            self.state_lock.release()
1730
1731            # Stop everyone.
1732            for tb in tbparams.keys():
1733                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1734
1735            # Remove teh terminated experiment
1736            self.state_lock.acquire()
1737            for id in ids:
1738                if self.state.has_key(id): del self.state[id]
1739
1740            if self.state_filename: self.write_state()
1741            self.state_lock.release()
1742
1743            return { 'experiment': exp }
1744        else:
1745            # Don't forget to release the lock
1746            self.state_lock.release()
1747            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.