source: fedd/fedd_experiment_control.py @ f4f4117

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since f4f4117 was f4f4117, checked in by Ted Faber <faber@…>, 16 years ago

add remote splitter interface

  • Property mode set to 100644
File size: 53.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47    scripts = ["fed_bootstrap", "federate.sh", "smbmount.FreeBSD.pl",
48        "smbmount.Linux.pl", "make_hosts", "fed-tun.pl", "fed-tun.ucb.pl",
49        "fed_evrepeater", "rc.accounts.patch"]
50   
51    class thread_pool:
52        """
53        A class to keep track of a set of threads all invoked for the same
54        task.  Manages the mutual exclusion of the states.
55        """
56        def __init__(self):
57            """
58            Start a pool.
59            """
60            self.changed = Condition()
61            self.started = 0
62            self.terminated = 0
63
64        def acquire(self):
65            """
66            Get the pool's lock.
67            """
68            self.changed.acquire()
69
70        def release(self):
71            """
72            Release the pool's lock.
73            """
74            self.changed.release()
75
76        def wait(self, timeout = None):
77            """
78            Wait for a pool thread to start or stop.
79            """
80            self.changed.wait(timeout)
81
82        def start(self):
83            """
84            Called by a pool thread to report starting.
85            """
86            self.changed.acquire()
87            self.started += 1
88            self.changed.notifyAll()
89            self.changed.release()
90
91        def terminate(self):
92            """
93            Called by a pool thread to report finishing.
94            """
95            self.changed.acquire()
96            self.terminated += 1
97            self.changed.notifyAll()
98            self.changed.release()
99
100        def clear(self):
101            """
102            Clear all pool data.
103            """
104            self.changed.acquire()
105            self.started = 0
106            self.terminated =0
107            self.changed.notifyAll()
108            self.changed.release()
109
110    class pooled_thread(Thread):
111        """
112        One of a set of threads dedicated to a specific task.  Uses the
113        thread_pool class above for coordination.
114        """
115        def __init__(self, group=None, target=None, name=None, args=(), 
116                kwargs={}, pdata=None, trace_file=None):
117            Thread.__init__(self, group, target, name, args, kwargs)
118            self.rv = None          # Return value of the ops in this thread
119            self.exception = None   # Exception that terminated this thread
120            self.target=target      # Target function to run on start()
121            self.args = args        # Args to pass to target
122            self.kwargs = kwargs    # Additional kw args
123            self.pdata = pdata      # thread_pool for this class
124            # Logger for this thread
125            self.log = logging.getLogger("fedd.experiment_control")
126       
127        def run(self):
128            """
129            Emulate Thread.run, except add pool data manipulation and error
130            logging.
131            """
132            if self.pdata:
133                self.pdata.start()
134
135            if self.target:
136                try:
137                    self.rv = self.target(*self.args, **self.kwargs)
138                except service_error, s:
139                    self.exception = s
140                    self.log.error("Thread exception: %s %s" % \
141                            (s.code_string(), s.desc))
142                except:
143                    self.exception = sys.exc_info()[1]
144                    self.log.error(("Unexpected thread exception: %s" +\
145                            "Trace %s") % (self.exception,\
146                                traceback.format_exc()))
147            if self.pdata:
148                self.pdata.terminate()
149
150    def __init__(self, config=None):
151        """
152        Intialize the various attributes, most from the config object
153        """
154        self.scripts = fedd_experiment_control_local.scripts
155        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
156        self.thread_pool = fedd_experiment_control_local.thread_pool
157
158        self.cert_file = None
159        self.cert_pwd = None
160        self.trusted_certs = None
161
162        # Walk through the various relevant certificat specifying config
163        # attributes until the local certificate attributes can be resolved.
164        # The walk is from most specific to most general specification.
165        for s in ("experiment_control", "globals"):
166            if config.has_section(s): 
167                if config.has_option(s, "cert_file"):
168                    if not self.cert_file:
169                        self.cert_file = config.get(s, "cert_file")
170                        self.cert_pwd = config.get(s, "cert_pwd")
171
172                if config.has_option(s, "trusted_certs"):
173                    if not self.trusted_certs:
174                        self.trusted_certs = config.get(s, "trusted_certs")
175
176        self.exp_stem = "fed-stem"
177        self.log = logging.getLogger("fedd.experiment_control")
178        self.muxmax = 2
179        self.nthreads = 2
180        self.randomize_experiments = False
181
182        self.scp_exec = "/usr/bin/scp"
183        self.splitter = None
184        self.ssh_exec="/usr/bin/ssh"
185        self.ssh_keygen = "/usr/bin/ssh-keygen"
186        self.ssh_identity_file = None
187
188        if config.has_section("experiment_control"):
189            self.debug = config.get("experiment_control", "create_debug")
190            self.state_filename = config.get("experiment_control", 
191                    "experiment_state_file")
192            self.scripts_dir = config.get("experiment_control",
193                    "federation_script_dir")
194            self.splitter_url = config.get("experiment_control", "splitter_url")
195        else:
196            self.debug = False
197            self.state_filename = None
198            self.scripts_dir = "/users/faber/testbed/federation"
199            self.splitter_url = None
200
201        # XXX
202        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
203        self.ssh_type = "rsa"
204        self.state = { }
205        self.state_lock = Lock()
206        self.tclsh = "/usr/local/bin/otclsh"
207        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
208        self.tbmap = { 
209                'deter':'https://users.isi.deterlab.net:23235',
210                'emulab':'https://users.isi.deterlab.net:23236',
211                'ucb':'https://users.isi.deterlab.net:23237',
212                }
213        self.trace_file = sys.stderr
214
215        self.def_expstart = \
216                "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
217        self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
218        self.def_gwstart = \
219                "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
220        self.def_mgwstart = \
221                "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
222        self.def_gwimage = "FBSD61-TUNNEL2";
223        self.def_gwtype = "pc";
224
225
226        if self.ssh_pubkey_file:
227            try:
228                f = open(self.ssh_pubkey_file, 'r')
229                self.ssh_pubkey = f.read()
230                f.close()
231            except IOError:
232                raise service_error(service_error.internal,
233                        "Cannot read sshpubkey")
234
235        # Set the logging level to the value passed in.  The getattr slieght of
236        # hand finds the logging level constant corrersponding to the string.
237        # We're a little paranoid to avoid user mayhem.
238        if config.has_option("experiment_control", "log_level"):
239            try:
240                level = int(getattr(logging, 
241                    config.get("experiment_control", "log_level").upper(),-1))
242
243                if  logging.DEBUG <= level <= logging.CRITICAL:
244                    self.log.setLevel(level)
245                else:
246                    self.log.error("Bad experiment_log value: %s" % \
247                            config.get("experiment_control", "log_level"))
248
249            except ValueError:
250                self.log.error("Bad experiment_log value: %s" % \
251                        config.experiment_log)
252
253        # Grab saved state.  OK to do this w/o locking because it's read only
254        # and only one thread should be in existence that can see self.state at
255        # this point.
256        if self.state_filename:
257            self.read_state()
258
259        # Confirm federation scripts in the right place
260        for s in self.scripts:
261            if not os.path.exists(self.scripts_dir + "/" + s):
262                raise service_error(service_error.server_config,
263                        "%s/%s not in local script dir" % (self.scripts_dir, s))
264
265        # Dispatch tables
266        self.soap_services = {\
267                'Create': make_soap_handler(\
268                        CreateRequestMessage.typecode,
269                        getattr(self, "create_experiment"), 
270                        CreateResponseMessage,
271                        "CreateResponseBody"),
272                'Vtopo': make_soap_handler(\
273                        VtopoRequestMessage.typecode,
274                        getattr(self, "get_vtopo"),
275                        VtopoResponseMessage,
276                        "VtopoResponseBody"),
277                'Vis': make_soap_handler(\
278                        VisRequestMessage.typecode,
279                        getattr(self, "get_vis"),
280                        VisResponseMessage,
281                        "VisResponseBody"),
282                'Info': make_soap_handler(\
283                        InfoRequestMessage.typecode,
284                        getattr(self, "get_info"),
285                        InfoResponseMessage,
286                        "InfoResponseBody"),
287                'Terminate': make_soap_handler(\
288                        TerminateRequestMessage.typecode,
289                        getattr(self, "terminate_experiment"),
290                        TerminateResponseMessage,
291                        "TerminateResponseBody"),
292        }
293
294        self.xmlrpc_services = {\
295                'Create': make_xmlrpc_handler(\
296                        getattr(self, "create_experiment"), 
297                        "CreateResponseBody"),
298                'Vtopo': make_xmlrpc_handler(\
299                        getattr(self, "get_vtopo"),
300                        "VtopoResponseBody"),
301                'Vis': make_xmlrpc_handler(\
302                        getattr(self, "get_vis"),
303                        "VisResponseBody"),
304                'Info': make_xmlrpc_handler(\
305                        getattr(self, "get_info"),
306                        "InfoResponseBody"),
307                'Terminate': make_xmlrpc_handler(\
308                        getattr(self, "terminate_experiment"),
309                        "TerminateResponseBody"),
310        }
311
312    def copy_file(self, src, dest, size=1024):
313        """
314        Exceedingly simple file copy.
315        """
316        s = open(src,'r')
317        d = open(dest, 'w')
318
319        buf = "x"
320        while buf != "":
321            buf = s.read(size)
322            d.write(buf)
323        s.close()
324        d.close()
325
326    # Call while holding self.state_lock
327    def write_state(self):
328        """
329        Write a new copy of experiment state after copying the existing state
330        to a backup.
331
332        State format is a simple pickling of the state dictionary.
333        """
334        if os.access(self.state_filename, os.W_OK):
335            self.copy_file(self.state_filename, \
336                    "%s.bak" % self.state_filename)
337        try:
338            f = open(self.state_filename, 'w')
339            pickle.dump(self.state, f)
340        except IOError, e:
341            self.log.error("Can't write file %s: %s" % \
342                    (self.state_filename, e))
343        except pickle.PicklingError, e:
344            self.log.error("Pickling problem: %s" % e)
345
346    # Call while holding self.state_lock
347    def read_state(self):
348        """
349        Read a new copy of experiment state.  Old state is overwritten.
350
351        State format is a simple pickling of the state dictionary.
352        """
353        try:
354            f = open(self.state_filename, "r")
355            self.state = pickle.load(f)
356            self.log.debug("[read_state]: Read state from %s" % \
357                    self.state_filename)
358        except IOError, e:
359            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
360                    % (self.state_filename, e))
361        except pickle.UnpicklingError, e:
362            self.log.warning(("[read_state]: No saved state: " + \
363                    "Unpickling failed: %s") % e)
364
365    def scp_file(self, file, user, host, dest=""):
366        """
367        scp a file to the remote host.  If debug is set the action is only
368        logged.
369        """
370
371        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
372        rv = 0
373
374        try:
375            dnull = open("/dev/null", "r")
376        except IOError:
377            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
378            dnull = Null
379
380        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
381        if not self.debug:
382            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
383            else: rv = call(scp_cmd)
384
385        return rv == 0
386
387    def ssh_cmd(self, user, host, cmd, wname=None):
388        """
389        Run a remote command on host as user.  If debug is set, the action is
390        only logged.
391        """
392        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
393
394        try:
395            dnull = open("/dev/null", "r")
396        except IOError:
397            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
398            dnull = Null
399
400        self.log.debug("[ssh_cmd]: %s" % sh_str)
401        if not self.debug:
402            if dnull:
403                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
404            else:
405                sub = Popen(sh_str, shell=True)
406            return sub.wait() == 0
407        else:
408            return True
409
410    def ship_scripts(self, host, user, dest_dir):
411        """
412        Copy the federation scripts (fedkit) to the a federant.
413        """
414        if self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
415            for s in self.scripts:
416                if not self.scp_file("%s/%s" % (self.scripts_dir, s),
417                        user, host, dest_dir):
418                    return False
419            return True
420        else:
421            return False
422
423    def ship_configs(self, host, user, src_dir, dest_dir):
424        """
425        Copy federant-specific configuration files to the federant.
426        """
427        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
428            return False
429        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
430            return False
431
432        for f in os.listdir(src_dir):
433            if os.path.isdir(f):
434                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
435                        "%s/%s" % (dest_dir, f)):
436                    return False
437            else:
438                if not self.scp_file("%s/%s" % (src_dir, f), 
439                        user, host, dest_dir):
440                    return False
441        return True
442
443    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
444        """
445        Start a sub-experiment on a federant.
446
447        Get the current state, modify or create as appropriate, ship data and
448        configs and start the experiment.  There are small ordering differences
449        based on the initial state of the sub-experiment.
450        """
451        # ops node in the federant
452        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
453        user = tbparams[tb]['user']     # federant user
454        pid = tbparams[tb]['project']   # federant project
455        # XXX
456        base_confs = ( "hosts",)
457        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
458        # command to test experiment state
459        expinfo_exec = "/usr/testbed/bin/expinfo" 
460        # Configuration directories on the remote machine
461        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
462        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
463        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
464        # Regular expressions to parse the expinfo response
465        state_re = re.compile("State:\s+(\w+)")
466        no_exp_re = re.compile("^No\s+such\s+experiment")
467        state = None    # Experiment state parsed from expinfo
468        # The expinfo ssh command
469        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
470
471        # Get status
472        self.log.debug("[start_segment]: %s"% " ".join(cmd))
473        dev_null = None
474        try:
475            dev_null = open("/dev/null", "a")
476        except IOError, e:
477            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
478
479        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
480        for line in status.stdout:
481            m = state_re.match(line)
482            if m: state = m.group(1)
483            else:
484                m = no_exp_re.match(line)
485                if m: state = "none"
486        rv = status.wait()
487
488        # If the experiment is not present the subcommand returns a non-zero
489        # return value.  If we successfully parsed a "none" outcome, ignore the
490        # return code.
491        if rv != 0 and state != "none":
492            raise service_error(service_error.internal,
493                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
494
495        self.log.debug("[start_segment]: %s: %s" % (tb, state))
496        self.log.info("[start_segment]:transferring experiment to %s" % tb)
497
498        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
499            return False
500        # Clear the federation files
501        if not self.ssh_cmd(user, host, 
502                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
503            return False
504        if not self.ssh_cmd(user, host, 
505                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
506            return False
507        # Clear and create the tarfiles and rpm directories
508        for d in (tarfiles_dir, rpms_dir):
509            if not self.ssh_cmd(user, host, 
510                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
511                return False
512            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
513                    "create tarfiles"):
514                return False
515       
516        if state == 'active':
517            # Remote experiment is active.  Modify it.
518            for f in base_confs:
519                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
520                        "%s/%s" % (proj_dir, f)):
521                    return False
522            if not self.ship_scripts(host, user, proj_dir):
523                return False
524            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
525                    proj_dir):
526                return False
527            if os.path.isdir("%s/tarfiles" % tmpdir):
528                if not self.ship_configs(host, user,
529                        "%s/tarfiles" % tmpdir, tarfiles_dir):
530                    return False
531            if os.path.isdir("%s/rpms" % tmpdir):
532                if not self.ship_configs(host, user,
533                        "%s/rpms" % tmpdir, tarfiles_dir):
534                    return False
535            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
536            if not self.ssh_cmd(user, host,
537                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
538                            (pid, eid, tclfile), "modexp"):
539                return False
540            return True
541        elif state == "swapped":
542            # Remote experiment swapped out.  Modify it and swap it in.
543            for f in base_confs:
544                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
545                        "%s/%s" % (proj_dir, f)):
546                    return False
547            if not self.ship_scripts(host, user, proj_dir):
548                return False
549            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
550                    proj_dir):
551                return False
552            if os.path.isdir("%s/tarfiles" % tmpdir):
553                if not self.ship_configs(host, user,
554                        "%s/tarfiles" % tmpdir, tarfiles_dir):
555                    return False
556            if os.path.isdir("%s/rpms" % tmpdir):
557                if not self.ship_configs(host, user,
558                        "%s/rpms" % tmpdir, tarfiles_dir):
559                    return False
560            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
561            if not self.ssh_cmd(user, host,
562                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
563                    "modexp"):
564                return False
565            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
566            if not self.ssh_cmd(user, host,
567                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
568                    "swapexp"):
569                return False
570            return True
571        elif state == "none":
572            # No remote experiment.  Create one.  We do this in 2 steps so we
573            # can put the configuration files and scripts into the new
574            # experiment directories.
575
576            # Tarfiles must be present for creation to work
577            if os.path.isdir("%s/tarfiles" % tmpdir):
578                if not self.ship_configs(host, user,
579                        "%s/tarfiles" % tmpdir, tarfiles_dir):
580                    return False
581            if os.path.isdir("%s/rpms" % tmpdir):
582                if not self.ship_configs(host, user,
583                        "%s/rpms" % tmpdir, tarfiles_dir):
584                    return False
585            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
586            if not self.ssh_cmd(user, host,
587                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
588                            (pid, eid, tclfile), "startexp"):
589                return False
590            # After startexp the per-experiment directories exist
591            for f in base_confs:
592                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
593                        "%s/%s" % (proj_dir, f)):
594                    return False
595            if not self.ship_scripts(host, user, proj_dir):
596                return False
597            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
598                    proj_dir):
599                return False
600            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
601            if not self.ssh_cmd(user, host,
602                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
603                    "swapexp"):
604                return False
605            return True
606        else:
607            self.log.debug("[start_segment]:unknown state %s" % state)
608            return False
609
610    def stop_segment(self, tb, eid, tbparams):
611        """
612        Stop a sub experiment by calling swapexp on the federant
613        """
614        user = tbparams[tb]['user']
615        host = tbparams[tb]['host']
616        pid = tbparams[tb]['project']
617
618        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
619        return self.ssh_cmd(user, host,
620                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
621
622       
623    def generate_ssh_keys(self, dest, type="rsa" ):
624        """
625        Generate a set of keys for the gateways to use to talk.
626
627        Keys are of type type and are stored in the required dest file.
628        """
629        valid_types = ("rsa", "dsa")
630        t = type.lower();
631        if t not in valid_types: raise ValueError
632        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
633
634        try:
635            trace = open("/dev/null", "w")
636        except IOError:
637            raise service_error(service_error.internal,
638                    "Cannot open /dev/null??");
639
640        # May raise CalledProcessError
641        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
642        rv = call(cmd, stdout=trace, stderr=trace)
643        if rv != 0:
644            raise service_error(service_error.internal, 
645                    "Cannot generate nonce ssh keys.  %s return code %d" \
646                            % (self.ssh_keygen, rv))
647
648    def gentopo(self, str):
649        """
650        Generate the topology dtat structure from the splitter's XML
651        representation of it.
652
653        The topology XML looks like:
654            <experiment>
655                <nodes>
656                    <node><vname></vname><ips>ip1:ip2</ips></node>
657                </nodes>
658                <lans>
659                    <lan>
660                        <vname></vname><vnode></vnode><ip></ip>
661                        <bandwidth></bandwidth><member>node:port</member>
662                    </lan>
663                </lans>
664        """
665        class topo_parse:
666            """
667            Parse the topology XML and create the dats structure.
668            """
669            def __init__(self):
670                # Typing of the subelements for data conversion
671                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
672                self.int_subelements = ( 'bandwidth',)
673                self.float_subelements = ( 'delay',)
674                # The final data structure
675                self.nodes = [ ]
676                self.lans =  [ ]
677                self.topo = { \
678                        'node': self.nodes,\
679                        'lan' : self.lans,\
680                    }
681                self.element = { }  # Current element being created
682                self.chars = ""     # Last text seen
683
684            def end_element(self, name):
685                # After each sub element the contents is added to the current
686                # element or to the appropriate list.
687                if name == 'node':
688                    self.nodes.append(self.element)
689                    self.element = { }
690                elif name == 'lan':
691                    self.lans.append(self.element)
692                    self.element = { }
693                elif name in self.str_subelements:
694                    self.element[name] = self.chars
695                    self.chars = ""
696                elif name in self.int_subelements:
697                    self.element[name] = int(self.chars)
698                    self.chars = ""
699                elif name in self.float_subelements:
700                    self.element[name] = float(self.chars)
701                    self.chars = ""
702
703            def found_chars(self, data):
704                self.chars += data.rstrip()
705
706
707        tp = topo_parse();
708        parser = xml.parsers.expat.ParserCreate()
709        parser.EndElementHandler = tp.end_element
710        parser.CharacterDataHandler = tp.found_chars
711
712        parser.Parse(str)
713
714        return tp.topo
715       
716
717    def genviz(self, topo):
718        """
719        Generate the visualization the virtual topology
720        """
721
722        neato = "/usr/local/bin/neato"
723        # These are used to parse neato output and to create the visualization
724        # file.
725        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
726        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
727                "%s</type></node>"
728
729        try:
730            # Node names
731            nodes = [ n['vname'] for n in topo['node'] ]
732            topo_lans = topo['lan']
733        except KeyError:
734            raise service_error(service_error.internal, "Bad topology")
735
736        lans = { }
737        links = { }
738
739        # Walk through the virtual topology, organizing the connections into
740        # 2-node connections (links) and more-than-2-node connections (lans).
741        # When a lan is created, it's added to the list of nodes (there's a
742        # node in the visualization for the lan).
743        for l in topo_lans:
744            if links.has_key(l['vname']):
745                if len(links[l['vname']]) < 2:
746                    links[l['vname']].append(l['vnode'])
747                else:
748                    nodes.append(l['vname'])
749                    lans[l['vname']] = links[l['vname']]
750                    del links[l['vname']]
751                    lans[l['vname']].append(l['vnode'])
752            elif lans.has_key(l['vname']):
753                lans[l['vname']].append(l['vnode'])
754            else:
755                links[l['vname']] = [ l['vnode'] ]
756
757
758        # Open up a temporary file for dot to turn into a visualization
759        try:
760            df, dotname = tempfile.mkstemp()
761            dotfile = os.fdopen(df, 'w')
762        except IOError:
763            raise service_error(service_error.internal,
764                    "Failed to open file in genviz")
765
766        # Generate a dot/neato input file from the links, nodes and lans
767        try:
768            print >>dotfile, "graph G {"
769            for n in nodes:
770                print >>dotfile, '\t"%s"' % n
771            for l in links.keys():
772                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
773            for l in lans.keys():
774                for n in lans[l]:
775                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
776            print >>dotfile, "}"
777            dotfile.close()
778        except TypeError:
779            raise service_error(service_error.internal,
780                    "Single endpoint link in vtopo")
781        except IOError:
782            raise service_error(service_error.internal, "Cannot write dot file")
783
784        # Use dot to create a visualization
785        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
786                '-Gpack=true', dotname], stdout=PIPE)
787
788        # Translate dot to vis format
789        vis_nodes = [ ]
790        vis = { 'node': vis_nodes }
791        for line in dot.stdout:
792            m = vis_re.match(line)
793            if m:
794                vn = m.group(1)
795                vis_node = {'name': vn, \
796                        'x': float(m.group(2)),\
797                        'y' : float(m.group(3)),\
798                    }
799                if vn in links.keys() or vn in lans.keys():
800                    vis_node['type'] = 'lan'
801                else:
802                    vis_node['type'] = 'node'
803                vis_nodes.append(vis_node)
804        rv = dot.wait()
805
806        os.remove(dotname)
807        if rv == 0 : return vis
808        else: return None
809
810
811    def get_access(self, tb, nodes, user, tbparam):
812        """
813        Get access to testbed through fedd and set the parameters for that tb
814        """
815
816        translate_attr = {
817            'slavenodestartcmd': 'expstart',
818            'slaveconnectorstartcmd': 'gwstart',
819            'masternodestartcmd': 'mexpstart',
820            'masterconnectorstartcmd': 'mgwstart',
821            'connectorimage': 'gwimage',
822            'connectortype': 'gwtype',
823            'tunnelcfg': 'tun',
824            'smbshare': 'smbshare',
825        }
826
827        uri = self.tbmap.get(tb, None)
828        if not uri:
829            raise service_error(serice_error.server_config, 
830                    "Unknown testbed: %s" % tb)
831
832        # The basic request
833        req = {\
834                'destinationTestbed' : { 'uri' : uri },
835                'user':  user,
836                'allocID' : { 'localname': 'test' },
837                # XXX: need to get service access stright
838                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
839                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
840            }
841       
842        # node resources if any
843        if nodes != None and len(nodes) > 0:
844            rnodes = [ ]
845            for n in nodes:
846                rn = { }
847                image, hw, count = n.split(":")
848                if image: rn['image'] = [ image ]
849                if hw: rn['hardware'] = [ hw ]
850                if count: rn['count'] = int(count)
851                rnodes.append(rn)
852            req['resources']= { }
853            req['resources']['node'] = rnodes
854
855        # No retry loop here.  Proxy servers must correctly authenticate
856        # themselves without help
857
858        try:
859            ctx = fedd_ssl_context(self.cert_file, 
860                    self.trusted_certs, password=self.cert_pwd)
861        except SSL.SSLError:
862            raise service_error(service_error.server_config, 
863                    "Server certificates misconfigured")
864
865        loc = feddServiceLocator();
866        port = loc.getfeddPortType(uri,
867                transport=M2Crypto.httpslib.HTTPSConnection, 
868                transdict={ 'ssl_context' : ctx })
869
870        # Reconstruct the full request message
871        msg = RequestAccessRequestMessage()
872        msg.set_element_RequestAccessRequestBody(
873                pack_soap(msg, "RequestAccessRequestBody", req))
874
875        try:
876            resp = port.RequestAccess(msg)
877        except ZSI.ParseException, e:
878            raise service_error(service_error.req,
879                    "Bad format message (XMLRPC??): %s" %
880                    str(e))
881        r = unpack_soap(resp)
882
883        if r.has_key('RequestAccessResponseBody'):
884            r = r['RequestAccessResponseBody']
885        else:
886            raise service_error(service_error.proxy,
887                    "Bad proxy response")
888
889
890        e = r['emulab']
891        p = e['project']
892        tbparam[tb] = { 
893                "boss": e['boss'],
894                "host": e['ops'],
895                "domain": e['domain'],
896                "fs": e['fileServer'],
897                "eventserver": e['eventServer'],
898                "project": unpack_id(p['name']),
899                "emulab" : e
900                }
901        # Make the testbed name be the label the user applied
902        p['testbed'] = {'localname': tb }
903
904        for u in p['user']:
905            tbparam[tb]['user'] = unpack_id(u['userID'])
906
907        for a in e['fedAttr']:
908            if a['attribute']:
909                key = translate_attr.get(a['attribute'].lower(), None)
910                if key:
911                    tbparam[tb][key]= a['value']
912
913    def remote_splitter(self, uri, desc, master):
914
915        req = {
916                'description' : { 'ns2description': desc },
917                'master': master,
918            }
919
920        # No retry loop here.  Proxy servers must correctly authenticate
921        # themselves without help
922        try:
923            ctx = fedd_ssl_context(self.cert_file, 
924                    self.trusted_certs, password=self.cert_pwd)
925        except SSL.SSLError:
926            raise service_error(service_error.server_config, 
927                    "Server certificates misconfigured")
928
929        loc = feddInternalServiceLocator();
930        port = loc.getfeddInternalPortType(uri,
931                transport=M2Crypto.httpslib.HTTPSConnection, 
932                transdict={ 'ssl_context' : ctx })
933
934        # Reconstruct the full request message
935        msg = Ns2SplitRequestMessage()
936        msg.set_element_Ns2SplitRequestBody(
937                pack_soap(msg, "Ns2SplitRequestBody", req))
938
939        try:
940            resp = port.Ns2Split(msg)
941        except ZSI.ParseException, e:
942            raise service_error(service_error.req,
943                    "Bad format message (XMLRPC??): %s" %
944                    str(e))
945        r = unpack_soap(resp)
946        if r.has_key('Ns2SplitResponseBody'):
947            r = r['Ns2SplitResponseBody']
948            if r.has_key('output'):
949                return r['output'].splitlines()
950            else:
951                raise service_error(service_error.proxy, 
952                        "Bad splitter response (no output)")
953        else:
954            raise service_error(service_error.proxy, "Bad splitter response")
955       
956    class current_testbed:
957        """
958        Object for collecting the current testbed description.  The testbed
959        description is saved to a file with the local testbed variables
960        subsittuted line by line.
961        """
962        def __init__(self, eid, tmpdir):
963            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
964            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
965            self.current_testbed = None
966            self.testbed_file = None
967
968            self.def_expstart = \
969                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
970            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
971            self.def_gwstart = \
972                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
973            self.def_mgwstart = \
974                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
975            self.def_gwimage = "FBSD61-TUNNEL2";
976            self.def_gwtype = "pc";
977
978            self.eid = eid
979            self.tmpdir = tmpdir
980
981        def __call__(self, line, master, allocated, tbparams):
982            # Capture testbed topology descriptions
983            if self.current_testbed == None:
984                m = self.begin_testbed.match(line)
985                if m != None:
986                    self.current_testbed = m.group(1)
987                    if self.current_testbed == None:
988                        raise service_error(service_error.req,
989                                "Bad request format (unnamed testbed)")
990                    allocated[self.current_testbed] = \
991                            allocated.get(self.current_testbed,0) + 1
992                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
993                    if not os.path.exists(tb_dir):
994                        try:
995                            os.mkdir(tb_dir)
996                        except IOError:
997                            raise service_error(service_error.internal,
998                                    "Cannot create %s" % tb_dir)
999                    try:
1000                        self.testbed_file = open("%s/%s.%s.tcl" %
1001                                (tb_dir, self.eid, self.current_testbed), 'w')
1002                    except IOError:
1003                        self.testbed_file = None
1004                    return True
1005                else: return False
1006            else:
1007                m = self.end_testbed.match(line)
1008                if m != None:
1009                    if m.group(1) != self.current_testbed:
1010                        raise service_error(service_error.internal, 
1011                                "Mismatched testbed markers!?")
1012                    if self.testbed_file != None: 
1013                        self.testbed_file.close()
1014                        self.testbed_file = None
1015                    self.current_testbed = None
1016                elif self.testbed_file:
1017                    # Substitute variables and put the line into the local
1018                    # testbed file.
1019                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1020                            self.def_gwtype)
1021                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1022                            self.def_gwimage)
1023                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1024                            self.def_mgwstart)
1025                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1026                            self.def_mexpstart)
1027                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1028                            self.def_gwstart)
1029                    expstart = tbparams[self.current_testbed].get('expstart', 
1030                            self.def_expstart)
1031                    project = tbparams[self.current_testbed].get('project')
1032                    line = re.sub("GWTYPE", gwtype, line)
1033                    line = re.sub("GWIMAGE", gwimage, line)
1034                    if self.current_testbed == master:
1035                        line = re.sub("GWSTART", mgwstart, line)
1036                        line = re.sub("EXPSTART", mexpstart, line)
1037                    else:
1038                        line = re.sub("GWSTART", gwstart, line)
1039                        line = re.sub("EXPSTART", expstart, line)
1040                    # XXX: does `` embed without doing enything else?
1041                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1042                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1043                    line = re.sub("EID", self.eid, line)
1044                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1045                            (project, self.eid), line)
1046                    print >>self.testbed_file, line
1047                return True
1048
1049    class allbeds:
1050        """
1051        Process the Allbeds section.  Get access to each federant and save the
1052        parameters in tbparams
1053        """
1054        def __init__(self, get_access):
1055            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1056            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1057            self.in_allbeds = False
1058            self.get_access = get_access
1059
1060        def __call__(self, line, user, tbparams):
1061            # Testbed access parameters
1062            if not self.in_allbeds:
1063                if self.begin_allbeds.match(line):
1064                    self.in_allbeds = True
1065                    return True
1066                else:
1067                    return False
1068            else:
1069                if self.end_allbeds.match(line):
1070                    self.in_allbeds = False
1071                else:
1072                    nodes = line.split('|')
1073                    tb = nodes.pop(0)
1074                    self.get_access(tb, nodes, user, tbparams)
1075                return True
1076
1077    class gateways:
1078        def __init__(self, eid, master, tmpdir, gw_pubkey,
1079                gw_secretkey, copy_file):
1080            self.begin_gateways = \
1081                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1082            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1083            self.current_gateways = None
1084            self.control_gateway = None
1085            self.active_end = { }
1086
1087            self.eid = eid
1088            self.master = master
1089            self.tmpdir = tmpdir
1090            self.gw_pubkey_base = gw_pubkey
1091            self.gw_secretkey_base = gw_secretkey
1092
1093            self.copy_file = copy_file
1094
1095
1096        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1097                active_end, tbparams, dtb, myname, desthost, type):
1098            """
1099            Produce a gateway configuration file from a gateways line.
1100            """
1101
1102            sproject = tbparams[gw].get('project', 'project')
1103            dproject = tbparams[dtb].get('project', 'project')
1104            sdomain = ".%s.%s%s" % (eid, sproject,
1105                    tbparams[gw].get('domain', ".example.com"))
1106            ddomain = ".%s.%s%s" % (eid, dproject,
1107                    tbparams[dtb].get('domain', ".example.com"))
1108            boss = tbparams[master].get('boss', "boss")
1109            fs = tbparams[master].get('fs', "fs")
1110            event_server = "%s%s" % \
1111                    (tbparams[gw].get('eventserver', "event_server"),
1112                            tbparams[gw].get('domain', "example.com"))
1113            remote_event_server = "%s%s" % \
1114                    (tbparams[dtb].get('eventserver', "event_server"),
1115                            tbparams[dtb].get('domain', "example.com"))
1116            seer_control = "%s%s" % \
1117                    (tbparams[gw].get('control', "control"), sdomain)
1118            remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1119            local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1120            tunnel_cfg = tbparams[gw].get("tun", "false")
1121
1122            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1123            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1124
1125            # translate to lower case so the `hostname` hack for specifying
1126            # configuration files works.
1127            conf_file = conf_file.lower();
1128            remote_conf_file = remote_conf_file.lower();
1129
1130            if dtb == master:
1131                active = "false"
1132            elif gw == master:
1133                active = "true"
1134            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1135                active = "false"
1136            else:
1137                active_end['%s-%s' % (gw, dtb)] = 1
1138                active = "true"
1139
1140            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1141            print >>gwconfig, "Active: %s" % active
1142            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1143            print >>gwconfig, "BossName: %s" % boss
1144            print >>gwconfig, "FsName: %s" % fs
1145            print >>gwconfig, "EventServerName: %s" % event_server
1146            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1147            print >>gwconfig, "SeerControl: %s" % seer_control
1148            print >>gwconfig, "Type: %s" % type
1149            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1150            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1151                    local_script_dir
1152            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1153            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1154            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1155                    (remote_script_dir, remote_conf_file)
1156            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1157            print >>gwconfig, "Pubkeys: %s/%s" % (local_script_dir, pubkey)
1158            print >>gwconfig, "Privkeys: %s/%s" % (local_script_dir, privkey)
1159            gwconfig.close()
1160
1161            return active == "true"
1162
1163        def __call__(self, line, allocated, tbparams):
1164            # Process gateways
1165            if not self.current_gateways:
1166                m = self.begin_gateways.match(line)
1167                if m:
1168                    self.current_gateways = m.group(1)
1169                    if allocated.has_key(self.current_gateways):
1170                        # This test should always succeed
1171                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1172                        if not os.path.exists(tb_dir):
1173                            try:
1174                                os.mkdir(tb_dir)
1175                            except IOError:
1176                                raise service_error(service_error.internal,
1177                                        "Cannot create %s" % tb_dir)
1178                    else:
1179                        # XXX
1180                        self.log.error("[gateways]: Ignoring gateways for " + \
1181                                "unknown testbed %s" % self.current_gateways)
1182                        self.current_gateways = None
1183                    return True
1184                else:
1185                    return False
1186            else:
1187                m = self.end_gateways.match(line)
1188                if m :
1189                    if m.group(1) != self.current_gateways:
1190                        raise service_error(service_error.internal,
1191                                "Mismatched gateway markers!?")
1192                    if self.control_gateway:
1193                        try:
1194                            cc = open("%s/%s/client.conf" %
1195                                    (self.tmpdir, self.current_gateways), 'w')
1196                            print >>cc, "ControlGateway: %s" % \
1197                                    self.control_gateway
1198                            if tbparams[self.master].has_key('smbshare'):
1199                                print >>cc, "SMBSHare: %s" % \
1200                                        tbparams[self.master]['smbshare']
1201                            print >>cc, "ProjectUser: %s" % \
1202                                    tbparams[self.master]['user']
1203                            print >>cc, "ProjectName: %s" % \
1204                                    tbparams[self.master]['project']
1205                            cc.close()
1206                        except IOError:
1207                            raise service_error(service_error.internal,
1208                                    "Error creating client config")
1209                        try:
1210                            cc = open("%s/%s/seer.conf" %
1211                                    (self.tmpdir, self.current_gateways),
1212                                    'w')
1213                            if self.current_gateways != self.master:
1214                                print >>cc, "ControlNode: %s" % \
1215                                        self.control_gateway
1216                            print >>cc, "ExperimentID: %s/%s" % \
1217                                    ( tbparams[self.master]['project'], \
1218                                    self.eid )
1219                            cc.close()
1220                        except IOError:
1221                            raise service_error(service_error.internal,
1222                                    "Error creating seer config")
1223                    else:
1224                        debug.error("[gateways]: No control gateway for %s" %\
1225                                    self.current_gateways)
1226                    self.current_gateways = None
1227                else:
1228                    dtb, myname, desthost, type = line.split(" ")
1229
1230                    if type == "control" or type == "both":
1231                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1232                                self.eid, 
1233                                tbparams[self.current_gateways]['project'],
1234                                tbparams[self.current_gateways]['domain'])
1235                    try:
1236                        active = self.gateway_conf_file(self.current_gateways,
1237                                self.master, self.eid, self.gw_pubkey_base,
1238                                self.gw_secretkey_base,
1239                                self.active_end, tbparams, dtb, myname,
1240                                desthost, type)
1241                    except IOError, e:
1242                        raise service_error(service_error.internal,
1243                                "Failed to write config file for %s" % \
1244                                        self.current_gateway)
1245           
1246                    gw_pubkey = "%s/keys/%s" % \
1247                            (self.tmpdir, self.gw_pubkey_base)
1248                    gw_secretkey = "%s/keys/%s" % \
1249                            (self.tmpdir, self.gw_secretkey_base)
1250
1251                    pkfile = "%s/%s/%s" % \
1252                            ( self.tmpdir, self.current_gateways, 
1253                                    self.gw_pubkey_base)
1254                    skfile = "%s/%s/%s" % \
1255                            ( self.tmpdir, self.current_gateways, 
1256                                    self.gw_secretkey_base)
1257
1258                    if not os.path.exists(pkfile):
1259                        try:
1260                            self.copy_file(gw_pubkey, pkfile)
1261                        except IOError:
1262                            service_error(service_error.internal,
1263                                    "Failed to copy pubkey file")
1264
1265                    if active and not os.path.exists(skfile):
1266                        try:
1267                            self.copy_file(gw_secretkey, skfile)
1268                        except IOError:
1269                            service_error(service_error.internal,
1270                                    "Failed to copy secretkey file")
1271                return True
1272
1273    class shunt_to_file:
1274        """
1275        Simple class to write data between two regexps to a file.
1276        """
1277        def __init__(self, begin, end, filename):
1278            """
1279            Begin shunting on a match of begin, stop on end, send data to
1280            filename.
1281            """
1282            self.begin = re.compile(begin)
1283            self.end = re.compile(end)
1284            self.in_shunt = False
1285            self.file = None
1286            self.filename = filename
1287
1288        def __call__(self, line):
1289            """
1290            Call this on each line in the input that may be shunted.
1291            """
1292            if not self.in_shunt:
1293                if self.begin.match(line):
1294                    self.in_shunt = True
1295                    try:
1296                        self.file = open(self.filename, "w")
1297                    except:
1298                        self.file = None
1299                        raise
1300                    return True
1301                else:
1302                    return False
1303            else:
1304                if self.end.match(line):
1305                    if self.file: 
1306                        self.file.close()
1307                        self.file = None
1308                    self.in_shunt = False
1309                else:
1310                    if self.file:
1311                        print >>self.file, line
1312                return True
1313
1314    class shunt_to_list:
1315        """
1316        Same interface as shunt_to_file.  Data collected in self.list, one list
1317        element per line.
1318        """
1319        def __init__(self, begin, end):
1320            self.begin = re.compile(begin)
1321            self.end = re.compile(end)
1322            self.in_shunt = False
1323            self.list = [ ]
1324       
1325        def __call__(self, line):
1326            if not self.in_shunt:
1327                if self.begin.match(line):
1328                    self.in_shunt = True
1329                    return True
1330                else:
1331                    return False
1332            else:
1333                if self.end.match(line):
1334                    self.in_shunt = False
1335                else:
1336                    self.list.append(line)
1337                return True
1338
1339    class shunt_to_string:
1340        """
1341        Same interface as shunt_to_file.  Data collected in self.str, all in
1342        one string.
1343        """
1344        def __init__(self, begin, end):
1345            self.begin = re.compile(begin)
1346            self.end = re.compile(end)
1347            self.in_shunt = False
1348            self.str = ""
1349       
1350        def __call__(self, line):
1351            if not self.in_shunt:
1352                if self.begin.match(line):
1353                    self.in_shunt = True
1354                    return True
1355                else:
1356                    return False
1357            else:
1358                if self.end.match(line):
1359                    self.in_shunt = False
1360                else:
1361                    self.str += line
1362                return True
1363
1364    def create_experiment(self, req, fid):
1365        """
1366        The external interface to experiment creation called from the
1367        dispatcher.
1368
1369        Creates a working directory, splits the incoming description using the
1370        splitter script and parses out the avrious subsections using the
1371        lcasses above.  Once each sub-experiment is created, use pooled threads
1372        to instantiate them and start it all up.
1373        """
1374        try:
1375            tmpdir = tempfile.mkdtemp(prefix="split-")
1376        except IOError:
1377            raise service_error(service_error.internal, "Cannot create tmp dir")
1378
1379        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1380        gw_secretkey_base = "fed.%s" % self.ssh_type
1381        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1382        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1383        tclfile = tmpdir + "/experiment.tcl"
1384        tbparams = { }
1385
1386        pid = "dummy"
1387        gid = "dummy"
1388        # XXX
1389        fail_soft = False
1390
1391        try:
1392            os.mkdir(tmpdir+"/keys")
1393        except OSError:
1394            raise service_error(service_error.internal,
1395                    "Can't make temporary dir")
1396
1397        req = req.get('CreateRequestBody', None)
1398        if not req:
1399            raise service_error(service_error.req,
1400                    "Bad request format (no CreateRequestBody)")
1401        # The tcl parser needs to read a file so put the content into that file
1402        descr=req.get('experimentdescription', None)
1403        if descr:
1404            file_content=descr.get('ns2description', None)
1405            if file_content:
1406                try:
1407                    f = open(tclfile, 'w')
1408                    f.write(file_content)
1409                    f.close()
1410                except IOError:
1411                    raise service_error(service_error.internal,
1412                            "Cannot write temp experiment description")
1413            else:
1414                raise service_error(service_error.req, 
1415                        "Only ns2descriptions supported")
1416        else:
1417            raise service_error(service_error.req, "No experiment description")
1418
1419        if req.has_key('experimentID') and \
1420                req['experimentID'].has_key('localname'):
1421            eid = req['experimentID']['localname']
1422            self.state_lock.acquire()
1423            while (self.state.has_key(eid)):
1424                eid += random.choice(string.ascii_letters)
1425            # To avoid another thread picking this localname
1426            self.state[eid] = "placeholder"
1427            self.state_lock.release()
1428        else:
1429            eid = self.exp_stem
1430            for i in range(0,5):
1431                eid += random.choice(string.ascii_letters)
1432            self.state_lock.acquire()
1433            while (self.state.has_key(eid)):
1434                eid = self.exp_stem
1435                for i in range(0,5):
1436                    eid += random.choice(string.ascii_letters)
1437            # To avoid another thread picking this localname
1438            self.state[eid] = "placeholder"
1439            self.state_lock.release()
1440
1441        try:
1442            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1443        except ValueError:
1444            raise service_error(service_error.server_config, 
1445                    "Bad key type (%s)" % self.ssh_type)
1446
1447        user = req.get('user', None)
1448        if user == None:
1449            raise service_error(service_error.req, "No user")
1450
1451        master = req.get('master', None)
1452        if master == None:
1453            raise service_error(service_error.req, "No master testbed label")
1454       
1455        if self.splitter_url:
1456            split_data = self.remote_splitter(self.splitter_url, file_content,
1457                    master)
1458        else:
1459            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1460                str(self.muxmax), '-m', master, pid, gid, eid, tclfile]
1461            tclparser = Popen(tclcmd, stdout=PIPE)
1462            split_data = tclparser.stdout
1463
1464        allocated = { }     # Testbeds we can access
1465        started = { }       # Testbeds where a sub-experiment started
1466                            # successfully
1467
1468        # Objects to parse the splitter output (defined above)
1469        parse_current_testbed = self.current_testbed(eid, tmpdir)
1470        parse_allbeds = self.allbeds(self.get_access)
1471        parse_gateways = self.gateways(eid, master, tmpdir,
1472                gw_pubkey_base, gw_secretkey_base, self.copy_file)
1473        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1474                    "^#\s+End\s+Vtopo")
1475        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1476                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1477        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1478                "^#\s+End\s+tarfiles")
1479        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1480                "^#\s+End\s+rpms")
1481
1482        # Worling on the split data
1483        for line in split_data:
1484            line = line.rstrip()
1485            if parse_current_testbed(line, master, allocated, tbparams):
1486                continue
1487            elif parse_allbeds(line, user, tbparams):
1488                continue
1489            elif parse_gateways(line, allocated, tbparams):
1490                continue
1491            elif parse_vtopo(line):
1492                continue
1493            elif parse_hostnames(line):
1494                continue
1495            elif parse_tarfiles(line):
1496                continue
1497            elif parse_rpms(line):
1498                continue
1499            else:
1500                raise service_error(service_error.internal, 
1501                        "Bad tcl parse? %s" % line)
1502
1503        # Virtual topology and visualization
1504        vtopo = self.gentopo(parse_vtopo.str)
1505        if not vtopo:
1506            raise service_error(service_error.internal, 
1507                    "Failed to generate virtual topology")
1508
1509        vis = self.genviz(vtopo)
1510        if not vis:
1511            raise service_error(service_error.internal, 
1512                    "Failed to generate visualization")
1513
1514        # save federant information
1515        for k in allocated.keys():
1516            tbparams[k]['federant'] = {\
1517                    'name': [ { 'localname' : eid} ],\
1518                    'emulab': tbparams[k]['emulab'],\
1519                    'master' : k == master,\
1520                }
1521
1522
1523        # Copy tarfiles and rpms needed at remote sites into a staging area
1524        try:
1525            for t in parse_tarfiles.list:
1526                if not os.path.exists("%s/tarfiles" % tmpdir):
1527                    os.mkdir("%s/tarfiles" % tmpdir)
1528                self.copy_file(t, "%s/tarfiles/%s" % \
1529                        (tmpdir, os.path.basename(t)))
1530            for r in parse_rpms.list:
1531                if not os.path.exists("%s/rpms" % tmpdir):
1532                    os.mkdir("%s/rpms" % tmpdir)
1533                self.copy_file(r, "%s/rpms/%s" % \
1534                        (tmpdir, os.path.basename(r)))
1535        except IOError, e:
1536            raise service_error(service_error.internal, 
1537                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1538
1539        thread_pool_info = self.thread_pool()
1540        threads = [ ]
1541
1542        for tb in [ k for k in allocated.keys() if k != master]:
1543            # Wait until we have a free slot to start the next testbed load
1544            thread_pool_info.acquire()
1545            while thread_pool_info.started - \
1546                    thread_pool_info.terminated >= self.nthreads:
1547                thread_pool_info.wait()
1548            thread_pool_info.release()
1549
1550            # Create and start a thread to start the segment, and save it to
1551            # get the return value later
1552            t  = self.pooled_thread(target=self.start_segment, 
1553                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1554                    pdata=thread_pool_info, trace_file=self.trace_file)
1555            threads.append(t)
1556            t.start()
1557
1558        # Wait until all finish (the first clause of the while is to make sure
1559        # one starts)
1560        thread_pool_info.acquire()
1561        while thread_pool_info.started == 0 or \
1562                thread_pool_info.started > thread_pool_info.terminated:
1563            thread_pool_info.wait()
1564        thread_pool_info.release()
1565
1566        # If none failed, start the master
1567        failed = [ t.getName() for t in threads if not t.rv ]
1568
1569        if len(failed) == 0:
1570            if not self.start_segment(master, eid, tbparams, tmpdir):
1571                failed.append(master)
1572
1573        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1574        # If one failed clean up, unless fail_soft is set
1575        if failed:
1576            if not fail_soft:
1577                for tb in succeeded:
1578                    self.stop_segment(tb, eid, tbparams)
1579                # Remove the placeholder
1580                self.state_lock.acquire()
1581                del self.state[eid]
1582                self.state_lock.release()
1583
1584                raise service_error(service_error.federant,
1585                    "Swap in failed on %s" % ",".join(failed))
1586        else:
1587            self.log.info("[start_segment]: Experiment %s started" % eid)
1588
1589        # Generate an ID for the experiment (slice) and a certificate that the
1590        # allocator can use to prove they own it.  We'll ship it back through
1591        # the encrypted connection.
1592        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1593
1594        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1595
1596        # Walk up tmpdir, deleting as we go
1597        for path, dirs, files in os.walk(tmpdir, topdown=False):
1598            for f in files:
1599                os.remove(os.path.join(path, f))
1600            for d in dirs:
1601                os.rmdir(os.path.join(path, d))
1602        os.rmdir(tmpdir)
1603
1604        resp = { 'federant' : [ tbparams[tb]['federant'] \
1605                for tb in tbparams.keys() \
1606                    if tbparams[tb].has_key('federant') ],\
1607                    'vtopo': vtopo,\
1608                    'vis' : vis,
1609                    'experimentID' : [\
1610                            { 'fedid': copy.copy(expid) }, \
1611                            { 'localname': eid },\
1612                        ],\
1613                    'experimentAccess': { 'X509' : expcert },\
1614                }
1615
1616        # Insert the experiment into our state and update the disk copy
1617        self.state_lock.acquire()
1618        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1619                for tb in tbparams.keys() \
1620                    if tbparams[tb].has_key('federant') ],\
1621                    'vtopo': vtopo,\
1622                    'vis' : vis,
1623                    'experimentID' : [\
1624                            { 'fedid': expid }, { 'localname': eid },\
1625                        ],\
1626                }
1627        self.state[eid] = self.state[expid]
1628        if self.state_filename: self.write_state()
1629        self.state_lock.release()
1630
1631        if not failed:
1632            return resp
1633        else:
1634            raise service_error(service_error.partial, \
1635                    "Partial swap in on %s" % ",".join(succeeded))
1636
1637
1638    def get_vtopo(self, req, fid):
1639        """
1640        Return the stored virtual topology for this experiment
1641        """
1642        rv = None
1643
1644        req = req.get('VtopoRequestBody', None)
1645        if not req:
1646            raise service_error(service_error.req,
1647                    "Bad request format (no VtopoRequestBody)")
1648        exp = req.get('experiment', None)
1649        if exp:
1650            if exp.has_key('fedid'):
1651                key = fedid(bits=exp['fedid'])
1652                keytype = "fedid"
1653            elif exp.has_key('localname'):
1654                key = exp['localname']
1655                keytype = "localname"
1656            else:
1657                raise service_error(service_error.req, "Unknown lookup type")
1658        else:
1659            raise service_error(service_error.req, "No request?")
1660
1661        self.state_lock.acquire()
1662        if self.state.has_key(key):
1663            rv = { 'experiment' : {keytype: key },\
1664                    'vtopo': self.state[key]['vtopo'],\
1665                }
1666        self.state_lock.release()
1667
1668        if rv: return rv
1669        else: raise service_error(service_error.req, "No such experiment")
1670
1671    def get_vis(self, req, fid):
1672        """
1673        Return the stored visualization for this experiment
1674        """
1675        rv = None
1676
1677        req = req.get('VisRequestBody', None)
1678        if not req:
1679            raise service_error(service_error.req,
1680                    "Bad request format (no VisRequestBody)")
1681        exp = req.get('experiment', None)
1682        if exp:
1683            if exp.has_key('fedid'):
1684                key = fedid(bits=exp['fedid'])
1685                keytype = "fedid"
1686            elif exp.has_key('localname'):
1687                key = exp['localname']
1688                keytype = "localname"
1689            else:
1690                raise service_error(service_error.req, "Unknown lookup type")
1691        else:
1692            raise service_error(service_error.req, "No request?")
1693
1694        self.state_lock.acquire()
1695        if self.state.has_key(key):
1696            rv =  { 'experiment' : {keytype: key },\
1697                    'vis': self.state[key]['vis'],\
1698                    }
1699        self.state_lock.release()
1700
1701        if rv: return rv
1702        else: raise service_error(service_error.req, "No such experiment")
1703
1704    def get_info(self, req, fid):
1705        """
1706        Return all the stored info about this experiment
1707        """
1708        rv = None
1709
1710        req = req.get('InfoRequestBody', None)
1711        if not req:
1712            raise service_error(service_error.req,
1713                    "Bad request format (no VisRequestBody)")
1714        exp = req.get('experiment', None)
1715        if exp:
1716            if exp.has_key('fedid'):
1717                key = fedid(bits=exp['fedid'])
1718                keytype = "fedid"
1719            elif exp.has_key('localname'):
1720                key = exp['localname']
1721                keytype = "localname"
1722            else:
1723                raise service_error(service_error.req, "Unknown lookup type")
1724        else:
1725            raise service_error(service_error.req, "No request?")
1726
1727        # The state may be massaged by the service function that called
1728        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1729        # state.
1730        self.state_lock.acquire()
1731        if self.state.has_key(key):
1732            rv = copy.deepcopy(self.state[key])
1733        self.state_lock.release()
1734
1735        if rv: return rv
1736        else: raise service_error(service_error.req, "No such experiment")
1737
1738
1739    def terminate_experiment(self, req, fid):
1740        """
1741        Swap this experiment out on the federants and delete the shared
1742        information
1743        """
1744        tbparams = { }
1745        req = req.get('TerminateRequestBody', None)
1746        if not req:
1747            raise service_error(service_error.req,
1748                    "Bad request format (no TerminateRequestBody)")
1749        exp = req.get('experiment', None)
1750        if exp:
1751            if exp.has_key('fedid'):
1752                key = fedid(bits=exp['fedid'])
1753                keytype = "fedid"
1754            elif exp.has_key('localname'):
1755                key = exp['localname']
1756                keytype = "localname"
1757            else:
1758                raise service_error(service_error.req, "Unknown lookup type")
1759        else:
1760            raise service_error(service_error.req, "No request?")
1761
1762        self.state_lock.acquire()
1763        fed_exp = self.state.get(key, None)
1764
1765        if fed_exp:
1766            # This branch of the conditional holds the lock to generate a
1767            # consistent temporary tbparams variable to deallocate experiments.
1768            # It releases the lock to do the deallocations and reacquires it to
1769            # remove the experiment state when the termination is complete.
1770            ids = []
1771            #  experimentID is a list of dicts that are self-describing
1772            #  identifiers.  This finds all the fedids and localnames - the
1773            #  keys of self.state - and puts them into ids.
1774            for id in fed_exp.get('experimentID', []):
1775                if id.has_key('fedid'): ids.append(id['fedid'])
1776                if id.has_key('localname'): ids.append(id['localname'])
1777
1778            # Construct enough of the tbparams to make the stop_segment calls
1779            # work
1780            for fed in fed_exp['federant']:
1781                try:
1782                    for e in fed['name']:
1783                        eid = e.get('localname', None)
1784                        if eid: break
1785                    else:
1786                        continue
1787
1788                    p = fed['emulab']['project']
1789
1790                    project = p['name']['localname']
1791                    tb = p['testbed']['localname']
1792                    user = p['user'][0]['userID']['localname']
1793
1794                    domain = fed['emulab']['domain']
1795                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1796                except KeyError, e:
1797                    continue
1798                tbparams[tb] = {\
1799                        'user': user,\
1800                        'domain': domain,\
1801                        'project': project,\
1802                        'host': host,\
1803                        'eid': eid,\
1804                    }
1805            self.state_lock.release()
1806
1807            # Stop everyone.
1808            for tb in tbparams.keys():
1809                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1810
1811            # Remove teh terminated experiment
1812            self.state_lock.acquire()
1813            for id in ids:
1814                if self.state.has_key(id): del self.state[id]
1815
1816            if self.state_filename: self.write_state()
1817            self.state_lock.release()
1818
1819            return { 'experiment': exp }
1820        else:
1821            # Don't forget to release the lock
1822            self.state_lock.release()
1823            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.