source: fedd/fedd_experiment_control.py @ 0b466d1

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 0b466d1 was 0b466d1, checked in by Ted Faber <faber@…>, 16 years ago

logging

  • Property mode set to 100644
File size: 51.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
48        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
49        "fed_evrepeater", "rc.accounts.patch"]
50   
51    class thread_pool:
52        """
53        A class to keep track of a set of threads all invoked for the same
54        task.  Manages the mutual exclusion of the states.
55        """
56        def __init__(self):
57            """
58            Start a pool.
59            """
60            self.changed = Condition()
61            self.started = 0
62            self.terminated = 0
63
64        def acquire(self):
65            """
66            Get the pool's lock.
67            """
68            self.changed.acquire()
69
70        def release(self):
71            """
72            Release the pool's lock.
73            """
74            self.changed.release()
75
76        def wait(self, timeout = None):
77            """
78            Wait for a pool thread to start or stop.
79            """
80            self.changed.wait(timeout)
81
82        def start(self):
83            """
84            Called by a pool thread to report starting.
85            """
86            self.changed.acquire()
87            self.started += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def terminate(self):
92            """
93            Called by a pool thread to report finishing.
94            """
95            self.changed.acquire()
96            self.terminated += 1
97            self.changed.notifyAll()
98            self.changed.release()
99
100        def clear(self):
101            """
102            Clear all pool data.
103            """
104            self.changed.acquire()
105            self.started = 0
106            self.terminated =0
107            self.changed.notifyAll()
108            self.changed.release()
109
110    class pooled_thread(Thread):
111        """
112        One of a set of threads dedicated to a specific task.  Uses the
113        thread_pool class above for coordination.
114        """
115        def __init__(self, group=None, target=None, name=None, args=(), 
116                kwargs={}, pdata=None, trace_file=None):
117            Thread.__init__(self, group, target, name, args, kwargs)
118            self.rv = None          # Return value of the ops in this thread
119            self.exception = None   # Exception that terminated this thread
120            self.target=target      # Target function to run on start()
121            self.args = args        # Args to pass to target
122            self.kwargs = kwargs    # Additional kw args
123            self.pdata = pdata      # thread_pool for this class
124            # Logger for this thread
125            self.log = logging.getLogger("fedd.experiment_control")
126       
127        def run(self):
128            """
129            Emulate Thread.run, except add pool data manipulation and error
130            logging.
131            """
132            if self.pdata:
133                self.pdata.start()
134
135            if self.target:
136                try:
137                    self.rv = self.target(*self.args, **self.kwargs)
138                except service_error, s:
139                    self.exception = s
140                    self.log.error("Thread exception: %s %s" % \
141                            (s.code_string(), s.desc))
142                except:
143                    self.exception = sys.exc_info()[1]
144                    self.log.error(("Unexpected thread exception: %s" +\
145                            "Trace %s") % (self.exception,\
146                                traceback.format_exc()))
147            if self.pdata:
148                self.pdata.terminate()
149
150    def __init__(self, config=None):
151        """
152        Intialize the various attributes, most from the config object
153        """
154        self.scripts = fedd_experiment_control_local.scripts
155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
156        self.thread_pool = fedd_experiment_control_local.thread_pool
157
158        self.cert_file = None
159        self.cert_pwd = None
160        self.trusted_certs = None
161
162        # Walk through the various relevant certificat specifying config
163        # attributes until the local certificate attributes can be resolved.
164        # The walk is from most specific to most general specification.
165        for p in ("create_experiment_", "proxy_", ""):
166            filen = "%scert_file" % p
167            pwn = "%scert_pwd" % p
168            trustn = "%strusted_certs" % p
169
170            if getattr(config, filen, None):
171                if not self.cert_file:
172                    self.cert_file = getattr(config, filen, None)
173                    self.cert_pwd = getattr(config, pwn, None)
174
175            if getattr(config, trustn, None):
176                if not self.trusted_certs:
177                    self.trusted_certs = getattr(config, trustn, None)
178
179        self.exp_stem = "fed-stem"
180        self.debug = config.create_debug
181        self.log = logging.getLogger("fedd.experiment_control")
182        self.muxmax = 2
183        self.nthreads = 2
184        self.randomize_experiments = False
185        self.scp_exec = "/usr/bin/scp"
186        self.scripts_dir = "/users/faber/testbed/federation"
187        self.splitter = None
188        self.ssh_exec="/usr/bin/ssh"
189        self.ssh_keygen = "/usr/bin/ssh-keygen"
190        self.ssh_identity_file = None
191        # XXX
192        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
193        self.ssh_type = "rsa"
194        self.state = { }
195        self.state_filename = config.experiment_state_file
196        self.state_lock = Lock()
197        self.tclsh = "/usr/local/bin/otclsh"
198        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
199        self.tbmap = { 
200                'deter':'https://users.isi.deterlab.net:23235',
201                'emulab':'https://users.isi.deterlab.net:23236',
202                'ucb':'https://users.isi.deterlab.net:23237',
203                }
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
208        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
209        self.def_gwstart = \
210                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
211        self.def_mgwstart = \
212                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
213        self.def_gwimage = "FBSD61-TUNNEL2";
214        self.def_gwtype = "pc";
215
216
217        if self.ssh_pubkey_file:
218            try:
219                f = open(self.ssh_pubkey_file, 'r')
220                self.ssh_pubkey = f.read()
221                f.close()
222            except IOError:
223                raise service_error(service_error.internal,
224                        "Cannot read sshpubkey")
225
226        # Set the logging level to the value passed in.  The getattr slieght of
227        # hand finds the logging level constant corrersponding to the string.
228        # We're a little paranoid to avoid user mayhem.
229        if config.experiment_log:
230            try:
231                level = int(getattr(logging, config.experiment_log.upper(),-1))
232
233                if  logging.DEBUG <= level <= logging.CRITICAL:
234                    self.log.setLevel(level)
235                else:
236                    self.log.error("Bad experiment_log value: %s" % \
237                            config.experiment_log)
238
239            except ValueError:
240                self.log.error("Bad experiment_log value: %s" % \
241                        config.experiment_log)
242
243        # Grab saved state.  OK to do this w/o locking because it's read only
244        # and only one thread should be in existence that can see self.state at
245        # this point.
246        if self.state_filename:
247            self.read_state()
248
249        # Confirm federation scripts in the right place
250        for s in self.scripts:
251            if not os.path.exists(self.scripts_dir + "/" + s):
252                raise service_error(service_error.server_config,
253                        "%s/%s not in local script dir" % (self.scripts_dir, s))
254
255        # Dispatch tables
256        self.soap_services = {\
257                'Create': make_soap_handler(\
258                        CreateRequestMessage.typecode,
259                        getattr(self, "create_experiment"), 
260                        CreateResponseMessage,
261                        "CreateResponseBody"),
262                'Vtopo': make_soap_handler(\
263                        VtopoRequestMessage.typecode,
264                        getattr(self, "get_vtopo"),
265                        VtopoResponseMessage,
266                        "VtopoResponseBody"),
267                'Vis': make_soap_handler(\
268                        VisRequestMessage.typecode,
269                        getattr(self, "get_vis"),
270                        VisResponseMessage,
271                        "VisResponseBody"),
272                'Info': make_soap_handler(\
273                        InfoRequestMessage.typecode,
274                        getattr(self, "get_info"),
275                        InfoResponseMessage,
276                        "InfoResponseBody"),
277                'Terminate': make_soap_handler(\
278                        TerminateRequestMessage.typecode,
279                        getattr(self, "terminate_experiment"),
280                        TerminateResponseMessage,
281                        "TerminateResponseBody"),
282        }
283
284        self.xmlrpc_services = {\
285                'Create': make_xmlrpc_handler(\
286                        getattr(self, "create_experiment"), 
287                        "CreateResponseBody"),
288                'Vtopo': make_xmlrpc_handler(\
289                        getattr(self, "get_vtopo"),
290                        "VtopoResponseBody"),
291                'Vis': make_xmlrpc_handler(\
292                        getattr(self, "get_vis"),
293                        "VisResponseBody"),
294                'Info': make_xmlrpc_handler(\
295                        getattr(self, "get_info"),
296                        "InfoResponseBody"),
297                'Terminate': make_xmlrpc_handler(\
298                        getattr(self, "terminate_experiment"),
299                        "TerminateResponseBody"),
300        }
301
302    def copy_file(self, src, dest, size=1024):
303        """
304        Exceedingly simple file copy.
305        """
306        s = open(src,'r')
307        d = open(dest, 'w')
308
309        buf = "x"
310        while buf != "":
311            buf = s.read(size)
312            d.write(buf)
313        s.close()
314        d.close()
315
316    # Call while holding self.state_lock
317    def write_state(self):
318        """
319        Write a new copy of experiment state after copying the existing state
320        to a backup.
321
322        State format is a simple pickling of the state dictionary.
323        """
324        if os.access(self.state_filename, os.W_OK):
325            self.copy_file(self.state_filename, \
326                    "%s.bak" % self.state_filename)
327        try:
328            f = open(self.state_filename, 'w')
329            pickle.dump(self.state, f)
330        except IOError, e:
331            self.log.error("Can't write file %s: %s" % \
332                    (self.state_filename, e))
333        except pickle.PicklingError, e:
334            self.log.error("Pickling problem: %s" % e)
335
336    # Call while holding self.state_lock
337    def read_state(self):
338        """
339        Read a new copy of experiment state.  Old state is overwritten.
340
341        State format is a simple pickling of the state dictionary.
342        """
343        try:
344            f = open(self.state_filename, "r")
345            self.state = pickle.load(f)
346        except IOError, e:
347            self.log.warning("No saved state: Can't open %s: %s" % \
348                    (self.state_filename, e))
349        except pickle.UnpicklingError, e:
350            self.log.warning("No saved state: Unpickling failed: %s" % e)
351
352    def scp_file(self, file, user, host, dest=""):
353        """
354        scp a file to the remote host.  If debug is set the action is only
355        logged.
356        """
357
358        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
359        rv = 0
360
361        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
362        if not self.debug:
363            rv = call(scp_cmd, stdout=trace, stderr=trace)
364
365        return rv == 0
366
367    def ssh_cmd(self, user, host, cmd, wname=None):
368        """
369        Run a remote command on host as user.  If debug is set, the action is
370        only logged.
371        """
372        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
373
374        self.log.debug("[ssh_cmd]: %s" % sh_str)
375        if not self.debug:
376            sub = Popen(sh_str, shell=True, stdout=trace, stderr=trace)
377            return sub.wait() == 0
378        else:
379            return True
380
381    def ship_scripts(self, host, user, dest_dir):
382        """
383        Copy the federation scripts (fedkit) to the a federant.
384        """
385        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
386            for s in self.scripts:
387                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
388                        user, host, dest_dir):
389                    return False
390            return True
391        else:
392            return False
393
394    def ship_configs(self, host, user, src_dir, dest_dir):
395        """
396        Copy federant-specific configuration files to the federant.
397        """
398        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
399            return False
400        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
401            return False
402
403        for f in os.listdir(src_dir):
404            if os.path.isdir(f):
405                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
406                        "%s/%s" % (dest_dir, f)):
407                    return False
408            else:
409                if not self.scp_file("%s/%s" % (src_dir, f), 
410                        user, host, dest_dir):
411                    return False
412        return True
413
414    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
415        """
416        Start a sub-experiment on a federant.
417
418        Get the current state, modify or create as appropriate, ship data and
419        configs and start the experiment.  There are small ordering differences
420        based on the initial state of the sub-experiment.
421        """
422        # ops node in the federant
423        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
424        user = tbparams[tb]['user']     # federant user
425        pid = tbparams[tb]['project']   # federant project
426        # XXX
427        base_confs = ( "hosts",)
428        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
429        # command to test experiment state
430        expinfo_exec = "/usr/testbed/bin/expinfo" 
431        # Configuration directories on the remote machine
432        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
433        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
434        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
435        # Regular expressions to parse the expinfo response
436        state_re = re.compile("State:\s+(\w+)")
437        no_exp_re = re.compile("^No\s+such\s+experiment")
438        state = None    # Experiment state parsed from expinfo
439        # The expinfo ssh command
440        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
441
442        # Get status
443        self.log.debug("[start_segment]: %s"% " ".join(cmd))
444        dev_null = None
445        try:
446            dev_null = open("/dev/null", "a")
447        except IOError, e:
448            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
449
450        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
451        for line in status.stdout:
452            m = state_re.match(line)
453            if m: state = m.group(1)
454            else:
455                m = no_exp_re.match(line)
456                if m: state = "none"
457        rv = status.wait()
458
459        # If the experiment is not present the subcommand returns a non-zero
460        # return value.  If we successfully parsed a "none" outcome, ignore the
461        # return code.
462        if rv != 0 and state != "none":
463            raise service_error(service_error.internal,
464                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
465
466        self.log.debug("[start_segment]: %s: %s" % (tb, state))
467        self.log.info("[start_segment]:transferring experiment to %s" % tb)
468
469        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
470            return False
471        # Clear the federation files
472        if not self.ssh_cmd(user, host, 
473                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
474            return False
475        if not self.ssh_cmd(user, host, 
476                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
477            return False
478        # Clear and create the tarfiles and rpm directories
479        for d in (tarfiles_dir, rpms_dir):
480            if not self.ssh_cmd(user, host, 
481                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
482                return False
483            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
484                    "create tarfiles"):
485                return False
486       
487        if state == 'active':
488            # Remote experiment is active.  Modify it.
489            for f in base_confs:
490                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
491                        "%s/%s" % (proj_dir, f)):
492                    return False
493            if not self.ship_scripts(host, user, proj_dir):
494                return False
495            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
496                    proj_dir):
497                return False
498            if os.path.isdir("%s/tarfiles" % tmpdir):
499                if not self.ship_configs(host, user,
500                        "%s/tarfiles" % tmpdir, tarfiles_dir):
501                    return False
502            if os.path.isdir("%s/rpms" % tmpdir):
503                if not self.ship_configs(host, user,
504                        "%s/rpms" % tmpdir, tarfiles_dir):
505                    return False
506            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
507            if not self.ssh_cmd(user, host,
508                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
509                            (pid, eid, tclfile), "modexp"):
510                return False
511            return True
512        elif state == "swapped":
513            # Remote experiment swapped out.  Modify it and swap it in.
514            for f in base_confs:
515                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
516                        "%s/%s" % (proj_dir, f)):
517                    return False
518            if not self.ship_scripts(host, user, proj_dir):
519                return False
520            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
521                    proj_dir):
522                return False
523            if os.path.isdir("%s/tarfiles" % tmpdir):
524                if not self.ship_configs(host, user,
525                        "%s/tarfiles" % tmpdir, tarfiles_dir):
526                    return False
527            if os.path.isdir("%s/rpms" % tmpdir):
528                if not self.ship_configs(host, user,
529                        "%s/rpms" % tmpdir, tarfiles_dir):
530                    return False
531            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
532            if not self.ssh_cmd(user, host,
533                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
534                    "modexp"):
535                return False
536            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
537            if not self.ssh_cmd(user, host,
538                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
539                    "swapexp"):
540                return False
541            return True
542        elif state == "none":
543            # No remote experiment.  Create one.  We do this in 2 steps so we
544            # can put the configuration files and scripts into the new
545            # experiment directories.
546
547            # Tarfiles must be present for creation to work
548            if os.path.isdir("%s/tarfiles" % tmpdir):
549                if not self.ship_configs(host, user,
550                        "%s/tarfiles" % tmpdir, tarfiles_dir):
551                    return False
552            if os.path.isdir("%s/rpms" % tmpdir):
553                if not self.ship_configs(host, user,
554                        "%s/rpms" % tmpdir, tarfiles_dir):
555                    return False
556            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
557            if not self.ssh_cmd(user, host,
558                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
559                            (pid, eid, tclfile), "startexp"):
560                return False
561            # After startexp the per-experiment directories exist
562            for f in base_confs:
563                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
564                        "%s/%s" % (proj_dir, f)):
565                    return False
566            if not self.ship_scripts(host, user, proj_dir):
567                return False
568            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
569                    proj_dir):
570                return False
571            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
572            if not self.ssh_cmd(user, host,
573                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
574                    "swapexp"):
575                return False
576            return True
577        else:
578            self.log.debug("[start_segment]:unknown state %s" % state)
579            return False
580
581    def stop_segment(self, tb, eid, tbparams):
582        """
583        Stop a sub experiment by calling swapexp on the federant
584        """
585        user = tbparams[tb]['user']
586        host = tbparams[tb]['host']
587        pid = tbparams[tb]['project']
588
589        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
590        return self.ssh_cmd(user, host,
591                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
592
593       
594    def generate_ssh_keys(self, dest, type="rsa" ):
595        """
596        Generate a set of keys for the gateways to use to talk.
597
598        Keys are of type type and are stored in the required dest file.
599        """
600        valid_types = ("rsa", "dsa")
601        t = type.lower();
602        if t not in valid_types: raise ValueError
603        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
604
605        try:
606            trace = open("/dev/null", "w")
607        except IOError:
608            raise service_error(service_error.internal,
609                    "Cannot open /dev/null??");
610
611        # May raise CalledProcessError
612        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
613        rv = call(cmd, stdout=trace, stderr=trace)
614        if rv != 0:
615            raise service_error(service_error.internal, 
616                    "Cannot generate nonce ssh keys.  %s return code %d" \
617                            % (self.ssh_keygen, rv))
618
619    def gentopo(self, str):
620        """
621        Generate the topology dtat structure from the splitter's XML
622        representation of it.
623
624        The topology XML looks like:
625            <experiment>
626                <nodes>
627                    <node><vname></vname><ips>ip1:ip2</ips></node>
628                </nodes>
629                <lans>
630                    <lan>
631                        <vname></vname><vnode></vnode><ip></ip>
632                        <bandwidth></bandwidth><member>node:port</member>
633                    </lan>
634                </lans>
635        """
636        class topo_parse:
637            """
638            Parse the topology XML and create the dats structure.
639            """
640            def __init__(self):
641                # Typing of the subelements for data conversion
642                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
643                self.int_subelements = ( 'bandwidth',)
644                self.float_subelements = ( 'delay',)
645                # The final data structure
646                self.nodes = [ ]
647                self.lans =  [ ]
648                self.topo = { \
649                        'node': self.nodes,\
650                        'lan' : self.lans,\
651                    }
652                self.element = { }  # Current element being created
653                self.chars = ""     # Last text seen
654
655            def end_element(self, name):
656                # After each sub element the contents is added to the current
657                # element or to the appropriate list.
658                if name == 'node':
659                    self.nodes.append(self.element)
660                    self.element = { }
661                elif name == 'lan':
662                    self.lans.append(self.element)
663                    self.element = { }
664                elif name in self.str_subelements:
665                    self.element[name] = self.chars
666                    self.chars = ""
667                elif name in self.int_subelements:
668                    self.element[name] = int(self.chars)
669                    self.chars = ""
670                elif name in self.float_subelements:
671                    self.element[name] = float(self.chars)
672                    self.chars = ""
673
674            def found_chars(self, data):
675                self.chars += data.rstrip()
676
677
678        tp = topo_parse();
679        parser = xml.parsers.expat.ParserCreate()
680        parser.EndElementHandler = tp.end_element
681        parser.CharacterDataHandler = tp.found_chars
682
683        parser.Parse(str)
684
685        return tp.topo
686       
687
688    def genviz(self, topo):
689        """
690        Generate the visualization the virtual topology
691        """
692
693        neato = "/usr/local/bin/neato"
694        # These are used to parse neato output and to create the visualization
695        # file.
696        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
697        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
698                "%s</type></node>"
699
700        try:
701            # Node names
702            nodes = [ n['vname'] for n in topo['node'] ]
703            topo_lans = topo['lan']
704        except KeyError:
705            raise service_error(service_error.internal, "Bad topology")
706
707        lans = { }
708        links = { }
709
710        # Walk through the virtual topology, organizing the connections into
711        # 2-node connections (links) and more-than-2-node connections (lans).
712        # When a lan is created, it's added to the list of nodes (there's a
713        # node in the visualization for the lan).
714        for l in topo_lans:
715            if links.has_key(l['vname']):
716                if len(links[l['vname']]) < 2:
717                    links[l['vname']].append(l['vnode'])
718                else:
719                    nodes.append(l['vname'])
720                    lans[l['vname']] = links[l['vname']]
721                    del links[l['vname']]
722                    lans[l['vname']].append(l['vnode'])
723            elif lans.has_key(l['vname']):
724                lans[l['vname']].append(l['vnode'])
725            else:
726                links[l['vname']] = [ l['vnode'] ]
727
728
729        # Open up a temporary file for dot to turn into a visualization
730        try:
731            df, dotname = tempfile.mkstemp()
732            dotfile = os.fdopen(df, 'w')
733        except IOError:
734            raise service_error(service_error.internal,
735                    "Failed to open file in genviz")
736
737        # Generate a dot/neato input file from the links, nodes and lans
738        try:
739            print >>dotfile, "graph G {"
740            for n in nodes:
741                print >>dotfile, '\t"%s"' % n
742            for l in links.keys():
743                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
744            for l in lans.keys():
745                for n in lans[l]:
746                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
747            print >>dotfile, "}"
748            dotfile.close()
749        except TypeError:
750            raise service_error(service_error.internal,
751                    "Single endpoint link in vtopo")
752        except IOError:
753            raise service_error(service_error.internal, "Cannot write dot file")
754
755        # Use dot to create a visualization
756        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
757                '-Gpack=true', dotname], stdout=PIPE)
758
759        # Translate dot to vis format
760        vis_nodes = [ ]
761        vis = { 'node': vis_nodes }
762        for line in dot.stdout:
763            m = vis_re.match(line)
764            if m:
765                vn = m.group(1)
766                vis_node = {'name': vn, \
767                        'x': float(m.group(2)),\
768                        'y' : float(m.group(3)),\
769                    }
770                if vn in links.keys() or vn in lans.keys():
771                    vis_node['type'] = 'lan'
772                else:
773                    vis_node['type'] = 'node'
774                vis_nodes.append(vis_node)
775        rv = dot.wait()
776
777        os.remove(dotname)
778        if rv == 0 : return vis
779        else: return None
780
781
782    def get_access(self, tb, nodes, user, tbparam):
783        """
784        Get access to testbed through fedd and set the parameters for that tb
785        """
786
787        translate_attr = {
788            'slavenodestartcmd': 'expstart',
789            'slaveconnectorstartcmd': 'gwstart',
790            'masternodestartcmd': 'mexpstart',
791            'masterconnectorstartcmd': 'mgwstart',
792            'connectorimage': 'gwimage',
793            'connectortype': 'gwtype',
794            'tunnelcfg': 'tun',
795            'smbshare': 'smbshare',
796        }
797
798        # XXX multi-level access
799        uri = self.tbmap.get(tb, None)
800        if not uri:
801            raise service_error(serice_error.server_config, 
802                    "Unknown testbed: %s" % tb)
803
804        # The basic request
805        req = {\
806                'destinationTestbed' : { 'uri' : uri },
807                'user':  user,
808                'allocID' : { 'localname': 'test' },
809                'access' : [ { 'sshPubkey' : self.ssh_pubkey } ]
810            }
811       
812        # node resources if any
813        if nodes != None and len(nodes) > 0:
814            rnodes = [ ]
815            for n in nodes:
816                rn = { }
817                image, hw, count = n.split(":")
818                if image: rn['image'] = [ image ]
819                if hw: rn['hardware'] = [ hw ]
820                if count: rn['count'] = int(count)
821                rnodes.append(rn)
822            req['resources']= { }
823            req['resources']['node'] = rnodes
824
825        # No retry loop here.  Proxy servers must correctly authenticate
826        # themselves without help
827
828        try:
829            ctx = fedd_ssl_context(self.cert_file, 
830                    self.trusted_certs, password=self.cert_pwd)
831        except SSL.SSLError:
832            raise service_error(service_error.server_config, 
833                    "Server certificates misconfigured")
834
835        loc = feddServiceLocator();
836        port = loc.getfeddPortType(uri,
837                transport=M2Crypto.httpslib.HTTPSConnection, 
838                transdict={ 'ssl_context' : ctx })
839
840        # Reconstruct the full request message
841        msg = RequestAccessRequestMessage()
842        msg.set_element_RequestAccessRequestBody(
843                pack_soap(msg, "RequestAccessRequestBody", req))
844
845        try:
846            resp = port.RequestAccess(msg)
847        except ZSI.ParseException, e:
848            raise service_error(service_error.req,
849                    "Bad format message (XMLRPC??): %s" %
850                    str(e))
851        r = unpack_soap(resp)
852
853        if r.has_key('RequestAccessResponseBody'):
854            r = r['RequestAccessResponseBody']
855        else:
856            raise service_error(service_error.proxy,
857                    "Bad proxy response")
858
859
860        e = r['emulab']
861        p = e['project']
862        tbparam[tb] = { 
863                "boss": e['boss'],
864                "host": e['ops'],
865                "domain": e['domain'],
866                "fs": e['fileServer'],
867                "eventserver": e['eventServer'],
868                "project": unpack_id(p['name']),
869                "emulab" : e
870                }
871        # Make the testbed name be the label the user applied
872        p['testbed'] = {'localname': tb }
873
874        for u in p['user']:
875            tbparam[tb]['user'] = unpack_id(u['userID'])
876
877        for a in e['fedAttr']:
878            if a['attribute']:
879                key = translate_attr.get(a['attribute'].lower(), None)
880                if key:
881                    tbparam[tb][key]= a['value']
882       
883    class current_testbed:
884        """
885        Object for collecting the current testbed description.  The testbed
886        description is saved to a file with the local testbed variables
887        subsittuted line by line.
888        """
889        def __init__(self, eid, tmpdir):
890            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
891            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
892            self.current_testbed = None
893            self.testbed_file = None
894
895            self.def_expstart = \
896                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
897            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
898            self.def_gwstart = \
899                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
900            self.def_mgwstart = \
901                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
902            self.def_gwimage = "FBSD61-TUNNEL2";
903            self.def_gwtype = "pc";
904
905            self.eid = eid
906            self.tmpdir = tmpdir
907
908        def __call__(self, line, master, allocated, tbparams):
909            # Capture testbed topology descriptions
910            if self.current_testbed == None:
911                m = self.begin_testbed.match(line)
912                if m != None:
913                    self.current_testbed = m.group(1)
914                    if self.current_testbed == None:
915                        raise service_error(service_error.req,
916                                "Bad request format (unnamed testbed)")
917                    allocated[self.current_testbed] = \
918                            allocated.get(self.current_testbed,0) + 1
919                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
920                    if not os.path.exists(tb_dir):
921                        try:
922                            os.mkdir(tb_dir)
923                        except IOError:
924                            raise service_error(service_error.internal,
925                                    "Cannot create %s" % tb_dir)
926                    try:
927                        self.testbed_file = open("%s/%s.%s.tcl" %
928                                (tb_dir, self.eid, self.current_testbed), 'w')
929                    except IOError:
930                        self.testbed_file = None
931                    return True
932                else: return False
933            else:
934                m = self.end_testbed.match(line)
935                if m != None:
936                    if m.group(1) != self.current_testbed:
937                        raise service_error(service_error.internal, 
938                                "Mismatched testbed markers!?")
939                    if self.testbed_file != None: 
940                        self.testbed_file.close()
941                        self.testbed_file = None
942                    self.current_testbed = None
943                elif self.testbed_file:
944                    # Substitute variables and put the line into the local
945                    # testbed file.
946                    gwtype = tbparams[self.current_testbed].get('gwtype', 
947                            self.def_gwtype)
948                    gwimage = tbparams[self.current_testbed].get('gwimage', 
949                            self.def_gwimage)
950                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
951                            self.def_mgwstart)
952                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
953                            self.def_mexpstart)
954                    gwstart = tbparams[self.current_testbed].get('gwstart', 
955                            self.def_gwstart)
956                    expstart = tbparams[self.current_testbed].get('expstart', 
957                            self.def_expstart)
958                    project = tbparams[self.current_testbed].get('project')
959                    line = re.sub("GWTYPE", gwtype, line)
960                    line = re.sub("GWIMAGE", gwimage, line)
961                    if self.current_testbed == master:
962                        line = re.sub("GWSTART", mgwstart, line)
963                        line = re.sub("EXPSTART", mexpstart, line)
964                    else:
965                        line = re.sub("GWSTART", gwstart, line)
966                        line = re.sub("EXPSTART", expstart, line)
967                    # XXX: does `` embed without doing enything else?
968                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
969                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
970                    line = re.sub("EID", self.eid, line)
971                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
972                            (project, self.eid), line)
973                    print >>self.testbed_file, line
974                return True
975
976    class allbeds:
977        """
978        Process the Allbeds section.  Get access to each federant and save the
979        parameters in tbparams
980        """
981        def __init__(self, get_access):
982            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
983            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
984            self.in_allbeds = False
985            self.get_access = get_access
986
987        def __call__(self, line, user, tbparams):
988            # Testbed access parameters
989            if not self.in_allbeds:
990                if self.begin_allbeds.match(line):
991                    self.in_allbeds = True
992                    return True
993                else:
994                    return False
995            else:
996                if self.end_allbeds.match(line):
997                    self.in_allbeds = False
998                else:
999                    nodes = line.split('|')
1000                    tb = nodes.pop(0)
1001                    self.get_access(tb, nodes, user, tbparams)
1002                return True
1003
1004    class gateways:
1005        def __init__(self, eid, master, tmpdir, gw_pubkey,
1006                gw_secretkey, copy_file):
1007            self.begin_gateways = \
1008                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1009            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1010            self.current_gateways = None
1011            self.control_gateway = None
1012            self.active_end = { }
1013
1014            self.eid = eid
1015            self.master = master
1016            self.tmpdir = tmpdir
1017            self.gw_pubkey_base = gw_pubkey
1018            self.gw_secretkey_base = gw_secretkey
1019
1020            self.copy_file = copy_file
1021
1022
1023        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1024                active_end, tbparams, dtb, myname, desthost, type):
1025            """
1026            Produce a gateway configuration file from a gateways line.
1027            """
1028
1029            sproject = tbparams[gw].get('project', 'project')
1030            dproject = tbparams[dtb].get('project', 'project')
1031            sdomain = ".%s.%s%s" % (eid, sproject,
1032                    tbparams[gw].get('domain', ".example.com"))
1033            ddomain = ".%s.%s%s" % (eid, dproject,
1034                    tbparams[dtb].get('domain', ".example.com"))
1035            boss = tbparams[master].get('boss', "boss")
1036            fs = tbparams[master].get('fs', "fs")
1037            event_server = "%s%s" % \
1038                    (tbparams[gw].get('eventserver', "event_server"),
1039                            tbparams[gw].get('domain', "example.com"))
1040            remote_event_server = "%s%s" % \
1041                    (tbparams[dtb].get('eventserver', "event_server"),
1042                            tbparams[dtb].get('domain', "example.com"))
1043            seer_control = "%s%s" % \
1044                    (tbparams[gw].get('control', "control"), sdomain)
1045            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1046            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1047            tunnel_cfg = tbparams[gw].get("tun", "false")
1048
1049            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1050            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1051
1052            # translate to lower case so the `hostname` hack for specifying
1053            # configuration files works.
1054            conf_file = conf_file.lower();
1055            remote_conf_file = remote_conf_file.lower();
1056
1057            if dtb == master:
1058                active = "false"
1059            elif gw == master:
1060                active = "true"
1061            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1062                active = "false"
1063            else:
1064                active_end['%s-%s' % (gw, dtb)] = 1
1065                active = "true"
1066
1067            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1068            print >>gwconfig, "Active: %s" % active
1069            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1070            print >>gwconfig, "BossName: %s" % boss
1071            print >>gwconfig, "FsName: %s" % fs
1072            print >>gwconfig, "EventServerName: %s" % event_server
1073            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1074            print >>gwconfig, "SeerControl: %s" % seer_control
1075            print >>gwconfig, "Type: %s" % type
1076            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1077            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1078                    local_script_dir
1079            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1080            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1081            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1082                    (remote_script_dir, remote_conf_file)
1083            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1084            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
1085            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
1086            gwconfig.close()
1087
1088            return active == "true"
1089
1090        def __call__(self, line, allocated, tbparams):
1091            # Process gateways
1092            if not self.current_gateways:
1093                m = self.begin_gateways.match(line)
1094                if m:
1095                    self.current_gateways = m.group(1)
1096                    if allocated.has_key(self.current_gateways):
1097                        # This test should always succeed
1098                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1099                        if not os.path.exists(tb_dir):
1100                            try:
1101                                os.mkdir(tb_dir)
1102                            except IOError:
1103                                raise service_error(service_error.internal,
1104                                        "Cannot create %s" % tb_dir)
1105                    else:
1106                        # XXX
1107                        self.log.error("[gateways]: Ignoring gateways for " + \
1108                                "unknown testbed %s" % self.current_gateways)
1109                        self.current_gateways = None
1110                    return True
1111                else:
1112                    return False
1113            else:
1114                m = self.end_gateways.match(line)
1115                if m :
1116                    if m.group(1) != self.current_gateways:
1117                        raise service_error(service_error.internal,
1118                                "Mismatched gateway markers!?")
1119                    if self.control_gateway:
1120                        try:
1121                            cc = open("%s/%s/client.conf" %
1122                                    (self.tmpdir, self.current_gateways), 'w')
1123                            print >>cc, "ControlGateway: %s" % \
1124                                    self.control_gateway
1125                            if tbparams[self.master].has_key('smbshare'):
1126                                print >>cc, "SMBSHare: %s" % \
1127                                        tbparams[self.master]['smbshare']
1128                            print >>cc, "ProjectUser: %s" % \
1129                                    tbparams[self.master]['user']
1130                            print >>cc, "ProjectName: %s" % \
1131                                    tbparams[self.master]['project']
1132                            cc.close()
1133                        except IOError:
1134                            raise service_error(service_error.internal,
1135                                    "Error creating client config")
1136                        try:
1137                            cc = open("%s/%s/seer.conf" %
1138                                    (self.tmpdir, self.current_gateways),
1139                                    'w')
1140                            if self.current_gateways != self.master:
1141                                print >>cc, "ControlNode: %s" % \
1142                                        self.control_gateway
1143                            print >>cc, "ExperimentID: %s/%s" % \
1144                                    ( tbparams[self.master]['project'], \
1145                                    self.eid )
1146                            cc.close()
1147                        except IOError:
1148                            raise service_error(service_error.internal,
1149                                    "Error creating seer config")
1150                    else:
1151                        debug.error("[gateways]: No control gateway for %s" %\
1152                                    self.current_gateways)
1153                    self.current_gateways = None
1154                else:
1155                    dtb, myname, desthost, type = line.split(" ")
1156
1157                    if type == "control" or type == "both":
1158                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1159                                self.eid, 
1160                                tbparams[self.current_gateways]['project'],
1161                                tbparams[self.current_gateways]['domain'])
1162                    try:
1163                        active = self.gateway_conf_file(self.current_gateways,
1164                                self.master, self.eid, self.gw_pubkey_base,
1165                                self.gw_secretkey_base,
1166                                self.active_end, tbparams, dtb, myname,
1167                                desthost, type)
1168                    except IOError, e:
1169                        raise service_error(service_error.internal,
1170                                "Failed to write config file for %s" % \
1171                                        self.current_gateway)
1172           
1173                    gw_pubkey = "%s/keys/%s" % \
1174                            (self.tmpdir, self.gw_pubkey_base)
1175                    gw_secretkey = "%s/keys/%s" % \
1176                            (self.tmpdir, self.gw_secretkey_base)
1177
1178                    pkfile = "%s/%s/%s" % \
1179                            ( self.tmpdir, self.current_gateways, 
1180                                    self.gw_pubkey_base)
1181                    skfile = "%s/%s/%s" % \
1182                            ( self.tmpdir, self.current_gateways, 
1183                                    self.gw_secretkey_base)
1184
1185                    if not os.path.exists(pkfile):
1186                        try:
1187                            self.copy_file(gw_pubkey, pkfile)
1188                        except IOError:
1189                            service_error(service_error.internal,
1190                                    "Failed to copy pubkey file")
1191
1192                    if active and not os.path.exists(skfile):
1193                        try:
1194                            self.copy_file(gw_secretkey, skfile)
1195                        except IOError:
1196                            service_error(service_error.internal,
1197                                    "Failed to copy secretkey file")
1198                return True
1199
1200    class shunt_to_file:
1201        """
1202        Simple class to write data between two regexps to a file.
1203        """
1204        def __init__(self, begin, end, filename):
1205            """
1206            Begin shunting on a match of begin, stop on end, send data to
1207            filename.
1208            """
1209            self.begin = re.compile(begin)
1210            self.end = re.compile(end)
1211            self.in_shunt = False
1212            self.file = None
1213            self.filename = filename
1214
1215        def __call__(self, line):
1216            """
1217            Call this on each line in the input that may be shunted.
1218            """
1219            if not self.in_shunt:
1220                if self.begin.match(line):
1221                    self.in_shunt = True
1222                    try:
1223                        self.file = open(self.filename, "w")
1224                    except:
1225                        self.file = None
1226                        raise
1227                    return True
1228                else:
1229                    return False
1230            else:
1231                if self.end.match(line):
1232                    if self.file: 
1233                        self.file.close()
1234                        self.file = None
1235                    self.in_shunt = False
1236                else:
1237                    if self.file:
1238                        print >>self.file, line
1239                return True
1240
1241    class shunt_to_list:
1242        """
1243        Same interface as shunt_to_file.  Data collected in self.list, one list
1244        element per line.
1245        """
1246        def __init__(self, begin, end):
1247            self.begin = re.compile(begin)
1248            self.end = re.compile(end)
1249            self.in_shunt = False
1250            self.list = [ ]
1251       
1252        def __call__(self, line):
1253            if not self.in_shunt:
1254                if self.begin.match(line):
1255                    self.in_shunt = True
1256                    return True
1257                else:
1258                    return False
1259            else:
1260                if self.end.match(line):
1261                    self.in_shunt = False
1262                else:
1263                    self.list.append(line)
1264                return True
1265
1266    class shunt_to_string:
1267        """
1268        Same interface as shunt_to_file.  Data collected in self.str, all in
1269        one string.
1270        """
1271        def __init__(self, begin, end):
1272            self.begin = re.compile(begin)
1273            self.end = re.compile(end)
1274            self.in_shunt = False
1275            self.str = ""
1276       
1277        def __call__(self, line):
1278            if not self.in_shunt:
1279                if self.begin.match(line):
1280                    self.in_shunt = True
1281                    return True
1282                else:
1283                    return False
1284            else:
1285                if self.end.match(line):
1286                    self.in_shunt = False
1287                else:
1288                    self.str += line
1289                return True
1290
1291    def create_experiment(self, req, fid):
1292        """
1293        The external interface to experiment creation called from the
1294        dispatcher.
1295
1296        Creates a working directory, splits the incoming description using the
1297        splitter script and parses out the avrious subsections using the
1298        lcasses above.  Once each sub-experiment is created, use pooled threads
1299        to instantiate them and start it all up.
1300        """
1301        try:
1302            tmpdir = tempfile.mkdtemp(prefix="split-")
1303        except IOError:
1304            raise service_error(service_error.internal, "Cannot create tmp dir")
1305
1306        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1307        gw_secretkey_base = "fed.%s" % self.ssh_type
1308        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1309        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1310        tclfile = tmpdir + "/experiment.tcl"
1311        tbparams = { }
1312
1313        pid = "dummy"
1314        gid = "dummy"
1315        # XXX
1316        fail_soft = False
1317
1318        try:
1319            os.mkdir(tmpdir+"/keys")
1320        except OSError:
1321            raise service_error(service_error.internal,
1322                    "Can't make temporary dir")
1323
1324        req = req.get('CreateRequestBody', None)
1325        if not req:
1326            raise service_error(service_error.req,
1327                    "Bad request format (no CreateRequestBody)")
1328        # The tcl parser needs to read a file so put the content into that file
1329        file_content=req.get('experimentdescription', None)
1330        if file_content:
1331            try:
1332                f = open(tclfile, 'w')
1333                f.write(file_content)
1334                f.close()
1335            except IOError:
1336                raise service_error(service_error.internal,
1337                        "Cannot write temp experiment description")
1338        else:
1339            raise service_error(service_error.req, "No experiment description")
1340
1341        if req.has_key('experimentID') and \
1342                req['experimentID'].has_key('localname'):
1343            eid = req['experimentID']['localname']
1344            self.state_lock.acquire()
1345            while (self.state.has_key(eid)):
1346                eid += random.choice(string.ascii_letters)
1347            # To avoid another thread picking this localname
1348            self.state[eid] = "placeholder"
1349            self.state_lock.release()
1350        else:
1351            eid = self.exp_stem
1352            for i in range(0,5):
1353                eid += random.choice(string.ascii_letters)
1354            self.state_lock.acquire()
1355            while (self.state.has_key(eid)):
1356                eid = self.exp_stem
1357                for i in range(0,5):
1358                    eid += random.choice(string.ascii_letters)
1359            # To avoid another thread picking this localname
1360            self.state[eid] = "placeholder"
1361            self.state_lock.release()
1362
1363        try:
1364            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1365        except ValueError:
1366            raise service_error(service_error.server_config, 
1367                    "Bad key type (%s)" % self.ssh_type)
1368
1369        user = req.get('user', None)
1370        if user == None:
1371            raise service_error(service_error.req, "No user")
1372
1373        master = req.get('master', None)
1374        if master == None:
1375            raise service_error(service_error.req, "No master testbed label")
1376       
1377       
1378        tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1379            str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1380        tclparser = Popen(tclcmd, stdout=PIPE)
1381
1382        allocated = { }     # Testbeds we can access
1383        started = { }       # Testbeds where a sub-experiment started
1384                            # successfully
1385
1386        # Objects to parse the splitter output (defined above)
1387        parse_current_testbed = self.current_testbed(eid, tmpdir)
1388        parse_allbeds = self.allbeds(self.get_access)
1389        parse_gateways = self.gateways(eid, master, tmpdir,
1390                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1391        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1392                    "^#\s+End\s+Vtopo")
1393        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1394                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1395        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1396                "^#\s+End\s+tarfiles")
1397        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1398                "^#\s+End\s+rpms")
1399
1400        # Worling on the split data
1401        for line in tclparser.stdout:
1402            line = line.rstrip()
1403            if parse_current_testbed(line, master, allocated, tbparams):
1404                continue
1405            elif parse_allbeds(line, user, tbparams):
1406                continue
1407            elif parse_gateways(line, allocated, tbparams):
1408                continue
1409            elif parse_vtopo(line):
1410                continue
1411            elif parse_hostnames(line):
1412                continue
1413            elif parse_tarfiles(line):
1414                continue
1415            elif parse_rpms(line):
1416                continue
1417            else:
1418                raise service_error(service_error.internal, 
1419                        "Bad tcl parse? %s" % line)
1420
1421        # Virtual topology and visualization
1422        vtopo = self.gentopo(parse_vtopo.str)
1423        if not vtopo:
1424            raise service_error(service_error.internal, 
1425                    "Failed to generate virtual topology")
1426
1427        vis = self.genviz(vtopo)
1428        if not vis:
1429            raise service_error(service_error.internal, 
1430                    "Failed to generate visualization")
1431
1432        # save federant information
1433        for k in allocated.keys():
1434            tbparams[k]['federant'] = {\
1435                    'name': [ { 'localname' : eid} ],\
1436                    'emulab': tbparams[k]['emulab'],\
1437                    'master' : k == master,\
1438                }
1439
1440
1441        # Copy tarfiles and rpms needed at remote sites into a staging area
1442        try:
1443            for t in parse_tarfiles.list:
1444                if not os.path.exists("%s/tarfiles" % tmpdir):
1445                    os.mkdir("%s/tarfiles" % tmpdir)
1446                self.copy_file(t, "%s/tarfiles/%s" % \
1447                        (tmpdir, os.path.basename(t)))
1448            for r in parse_rpms.list:
1449                if not os.path.exists("%s/rpms" % tmpdir):
1450                    os.mkdir("%s/rpms" % tmpdir)
1451                self.copy_file(r, "%s/rpms/%s" % \
1452                        (tmpdir, os.path.basename(r)))
1453        except IOError, e:
1454            raise service_error(service_error.internal, 
1455                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1456
1457        thread_pool_info = self.thread_pool()
1458        threads = [ ]
1459
1460        for tb in [ k for k in allocated.keys() if k != master]:
1461            # Wait until we have a free slot to start the next testbed load
1462            thread_pool_info.acquire()
1463            while thread_pool_info.started - \
1464                    thread_pool_info.terminated >= self.nthreads:
1465                thread_pool_info.wait()
1466            thread_pool_info.release()
1467
1468            # Create and start a thread to start the segment, and save it to
1469            # get the return value later
1470            t  = self.pooled_thread(target=self.start_segment, 
1471                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1472                    pdata=thread_pool_info, trace_file=self.trace_file)
1473            threads.append(t)
1474            t.start()
1475
1476        # Wait until all finish (the first clause of the while is to make sure
1477        # one starts)
1478        thread_pool_info.acquire()
1479        while thread_pool_info.started == 0 or \
1480                thread_pool_info.started > thread_pool_info.terminated:
1481            thread_pool_info.wait()
1482        thread_pool_info.release()
1483
1484        # If none failed, start the master
1485        failed = [ t.getName() for t in threads if not t.rv ]
1486
1487        if len(failed) == 0:
1488            if not self.start_segment(master, eid, tbparams, tmpdir):
1489                failed.append(master)
1490
1491        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1492        # If one failed clean up, unless fail_soft is set
1493        if failed:
1494            if not fail_soft:
1495                for tb in succeeded:
1496                    self.stop_segment(tb, eid, tbparams)
1497                # Remove the placeholder
1498                self.state_lock.acquire()
1499                del self.state[eid]
1500                self.state_lock.release()
1501
1502                raise service_error(service_error.federant,
1503                    "Swap in failed on %s" % ",".join(failed))
1504        else:
1505            self.log.info("[start_segment]: Experiment %s started" % eid)
1506
1507        # Generate an ID for the experiment (slice) and a certificate that the
1508        # allocator can use to prove they own it.  We'll ship it back through
1509        # the encrypted connection.
1510        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1511
1512        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1513
1514        # Walk up tmpdir, deleting as we go
1515        for path, dirs, files in os.walk(tmpdir, topdown=False):
1516            for f in files:
1517                os.remove(os.path.join(path, f))
1518            for d in dirs:
1519                os.rmdir(os.path.join(path, d))
1520        os.rmdir(tmpdir)
1521
1522        resp = { 'federant' : [ tbparams[tb]['federant'] \
1523                for tb in tbparams.keys() \
1524                    if tbparams[tb].has_key('federant') ],\
1525                    'vtopo': vtopo,\
1526                    'vis' : vis,
1527                    'experimentID' : [\
1528                            { 'fedid': copy.copy(expid) }, \
1529                            { 'localname': eid },\
1530                        ],\
1531                    'experimentAccess': { 'X509' : expcert },\
1532                }
1533
1534        # Insert the experiment into our state and update the disk copy
1535        self.state_lock.acquire()
1536        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1537                for tb in tbparams.keys() \
1538                    if tbparams[tb].has_key('federant') ],\
1539                    'vtopo': vtopo,\
1540                    'vis' : vis,
1541                    'experimentID' : [\
1542                            { 'fedid': expid }, { 'localname': eid },\
1543                        ],\
1544                }
1545        self.state[eid] = self.state[expid]
1546        if self.state_filename: self.write_state()
1547        self.state_lock.release()
1548
1549        if not failed:
1550            return resp
1551        else:
1552            raise service_error(service_error.partial, \
1553                    "Partial swap in on %s" % ",".join(succeeded))
1554
1555
1556    def get_vtopo(self, req, fid):
1557        """
1558        Return the stored virtual topology for this experiment
1559        """
1560        rv = None
1561
1562        req = req.get('VtopoRequestBody', None)
1563        if not req:
1564            raise service_error(service_error.req,
1565                    "Bad request format (no VtopoRequestBody)")
1566        exp = req.get('experiment', None)
1567        if exp:
1568            if exp.has_key('fedid'):
1569                key = fedid(bits=exp['fedid'])
1570                keytype = "fedid"
1571            elif exp.has_key('localname'):
1572                key = exp['localname']
1573                keytype = "localname"
1574            else:
1575                raise service_error(service_error.req, "Unknown lookup type")
1576        else:
1577            raise service_error(service_error.req, "No request?")
1578
1579        self.state_lock.acquire()
1580        if self.state.has_key(key):
1581            rv = { 'experiment' : {keytype: key },\
1582                    'vtopo': self.state[key]['vtopo'],\
1583                }
1584        self.state_lock.release()
1585
1586        if rv: return rv
1587        else: raise service_error(service_error.req, "No such experiment")
1588
1589    def get_vis(self, req, fid):
1590        """
1591        Return the stored visualization for this experiment
1592        """
1593        rv = None
1594
1595        req = req.get('VisRequestBody', None)
1596        if not req:
1597            raise service_error(service_error.req,
1598                    "Bad request format (no VisRequestBody)")
1599        exp = req.get('experiment', None)
1600        if exp:
1601            if exp.has_key('fedid'):
1602                key = fedid(bits=exp['fedid'])
1603                keytype = "fedid"
1604            elif exp.has_key('localname'):
1605                key = exp['localname']
1606                keytype = "localname"
1607            else:
1608                raise service_error(service_error.req, "Unknown lookup type")
1609        else:
1610            raise service_error(service_error.req, "No request?")
1611
1612        self.state_lock.acquire()
1613        if self.state.has_key(key):
1614            rv =  { 'experiment' : {keytype: key },\
1615                    'vis': self.state[key]['vis'],\
1616                    }
1617        self.state_lock.release()
1618
1619        if rv: return rv
1620        else: raise service_error(service_error.req, "No such experiment")
1621
1622    def get_info(self, req, fid):
1623        """
1624        Return all the stored info about this experiment
1625        """
1626        rv = None
1627
1628        req = req.get('InfoRequestBody', None)
1629        if not req:
1630            raise service_error(service_error.req,
1631                    "Bad request format (no VisRequestBody)")
1632        exp = req.get('experiment', None)
1633        if exp:
1634            if exp.has_key('fedid'):
1635                key = fedid(bits=exp['fedid'])
1636                keytype = "fedid"
1637            elif exp.has_key('localname'):
1638                key = exp['localname']
1639                keytype = "localname"
1640            else:
1641                raise service_error(service_error.req, "Unknown lookup type")
1642        else:
1643            raise service_error(service_error.req, "No request?")
1644
1645        # The state may be massaged by the service function that called
1646        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1647        # state.
1648        self.state_lock.acquire()
1649        if self.state.has_key(key):
1650            rv = copy.deepcopy(self.state[key])
1651        self.state_lock.release()
1652
1653        if rv: return rv
1654        else: raise service_error(service_error.req, "No such experiment")
1655
1656
1657    def terminate_experiment(self, req, fid):
1658        """
1659        Swap this experiment out on the federants and delete the shared
1660        information
1661        """
1662        tbparams = { }
1663        req = req.get('TerminateRequestBody', None)
1664        if not req:
1665            raise service_error(service_error.req,
1666                    "Bad request format (no TerminateRequestBody)")
1667        exp = req.get('experiment', None)
1668        if exp:
1669            if exp.has_key('fedid'):
1670                key = fedid(bits=exp['fedid'])
1671                keytype = "fedid"
1672            elif exp.has_key('localname'):
1673                key = exp['localname']
1674                keytype = "localname"
1675            else:
1676                raise service_error(service_error.req, "Unknown lookup type")
1677        else:
1678            raise service_error(service_error.req, "No request?")
1679
1680        self.state_lock.acquire()
1681        fed_exp = self.state.get(key, None)
1682
1683        if fed_exp:
1684            # This branch of the conditional holds the lock to generate a
1685            # consistent temporary tbparams variable to deallocate experiments.
1686            # It releases the lock to do the deallocations and reacquires it to
1687            # remove the experiment state when the termination is complete.
1688            ids = []
1689            #  experimentID is a list of dicts that are self-describing
1690            #  identifiers.  This finds all the fedids and localnames - the
1691            #  keys of self.state - and puts them into ids.
1692            for id in fed_exp.get('experimentID', []):
1693                if id.has_key('fedid'): ids.append(id['fedid'])
1694                if id.has_key('localname'): ids.append(id['localname'])
1695
1696            # Construct enough of the tbparams to make the stop_segment calls
1697            # work
1698            for fed in fed_exp['federant']:
1699                try:
1700                    for e in fed['name']:
1701                        eid = e.get('localname', None)
1702                        if eid: break
1703                    else:
1704                        continue
1705
1706                    p = fed['emulab']['project']
1707
1708                    project = p['name']['localname']
1709                    tb = p['testbed']['localname']
1710                    user = p['user'][0]['userID']['localname']
1711
1712                    domain = fed['emulab']['domain']
1713                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1714                except KeyError, e:
1715                    continue
1716                tbparams[tb] = {\
1717                        'user': user,\
1718                        'domain': domain,\
1719                        'project': project,\
1720                        'host': host,\
1721                        'eid': eid,\
1722                    }
1723            self.state_lock.release()
1724
1725            # Stop everyone.
1726            for tb in tbparams.keys():
1727                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1728
1729            # Remove teh terminated experiment
1730            self.state_lock.acquire()
1731            for id in ids:
1732                if self.state.has_key(id): del self.state[id]
1733
1734            if self.state_filename: self.write_state()
1735            self.state_lock.release()
1736
1737            return { 'experiment': exp }
1738        else:
1739            # Don't forget to release the lock
1740            self.state_lock.release()
1741            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.