source: fedd/fedd_experiment_control.py @ f8b118e

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since f8b118e was 058f58e, checked in by Ted Faber <faber@…>, 16 years ago

Unify the code for calling SOAP and XMLRPC services into a couple classes.
Before there were slightly different semantics everywhere.

Also make the handlers classes rather than the output of stub compiling
functions.

  • Property mode set to 100644
File size: 52.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47   
48    class thread_pool:
49        """
50        A class to keep track of a set of threads all invoked for the same
51        task.  Manages the mutual exclusion of the states.
52        """
53        def __init__(self):
54            """
55            Start a pool.
56            """
57            self.changed = Condition()
58            self.started = 0
59            self.terminated = 0
60
61        def acquire(self):
62            """
63            Get the pool's lock.
64            """
65            self.changed.acquire()
66
67        def release(self):
68            """
69            Release the pool's lock.
70            """
71            self.changed.release()
72
73        def wait(self, timeout = None):
74            """
75            Wait for a pool thread to start or stop.
76            """
77            self.changed.wait(timeout)
78
79        def start(self):
80            """
81            Called by a pool thread to report starting.
82            """
83            self.changed.acquire()
84            self.started += 1
85            self.changed.notifyAll()
86            self.changed.release()
87
88        def terminate(self):
89            """
90            Called by a pool thread to report finishing.
91            """
92            self.changed.acquire()
93            self.terminated += 1
94            self.changed.notifyAll()
95            self.changed.release()
96
97        def clear(self):
98            """
99            Clear all pool data.
100            """
101            self.changed.acquire()
102            self.started = 0
103            self.terminated =0
104            self.changed.notifyAll()
105            self.changed.release()
106
107    class pooled_thread(Thread):
108        """
109        One of a set of threads dedicated to a specific task.  Uses the
110        thread_pool class above for coordination.
111        """
112        def __init__(self, group=None, target=None, name=None, args=(), 
113                kwargs={}, pdata=None, trace_file=None):
114            Thread.__init__(self, group, target, name, args, kwargs)
115            self.rv = None          # Return value of the ops in this thread
116            self.exception = None   # Exception that terminated this thread
117            self.target=target      # Target function to run on start()
118            self.args = args        # Args to pass to target
119            self.kwargs = kwargs    # Additional kw args
120            self.pdata = pdata      # thread_pool for this class
121            # Logger for this thread
122            self.log = logging.getLogger("fedd.experiment_control")
123       
124        def run(self):
125            """
126            Emulate Thread.run, except add pool data manipulation and error
127            logging.
128            """
129            if self.pdata:
130                self.pdata.start()
131
132            if self.target:
133                try:
134                    self.rv = self.target(*self.args, **self.kwargs)
135                except service_error, s:
136                    self.exception = s
137                    self.log.error("Thread exception: %s %s" % \
138                            (s.code_string(), s.desc))
139                except:
140                    self.exception = sys.exc_info()[1]
141                    self.log.error(("Unexpected thread exception: %s" +\
142                            "Trace %s") % (self.exception,\
143                                traceback.format_exc()))
144            if self.pdata:
145                self.pdata.terminate()
146
147    call_RequestAccess = service_caller('RequestAccess',
148            'getfeddPortType', feddServiceLocator, 
149            RequestAccessRequestMessage, 'RequestAccessRequestBody')
150
151    call_ReleaseAccess = service_caller('ReleaseAccess',
152            'getfeddPortType', feddServiceLocator, 
153            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
154
155    call_Ns2Split = service_caller('Ns2Split',
156            'getfeddInternalPortType', feddInternalServiceLocator, 
157            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
158
159    def __init__(self, config=None):
160        """
161        Intialize the various attributes, most from the config object
162        """
163        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
164        self.thread_pool = fedd_experiment_control_local.thread_pool
165
166        self.cert_file = None
167        self.cert_pwd = None
168        self.trusted_certs = None
169
170        # Walk through the various relevant certificat specifying config
171        # attributes until the local certificate attributes can be resolved.
172        # The walk is from most specific to most general specification.
173        for s in ("experiment_control", "globals"):
174            if config.has_section(s): 
175                if config.has_option(s, "cert_file"):
176                    if not self.cert_file:
177                        self.cert_file = config.get(s, "cert_file")
178                        self.cert_pwd = config.get(s, "cert_pwd")
179
180                if config.has_option(s, "trusted_certs"):
181                    if not self.trusted_certs:
182                        self.trusted_certs = config.get(s, "trusted_certs")
183
184        self.exp_stem = "fed-stem"
185        self.log = logging.getLogger("fedd.experiment_control")
186        self.muxmax = 2
187        self.nthreads = 2
188        self.randomize_experiments = False
189
190        self.scp_exec = "/usr/bin/scp"
191        self.splitter = None
192        self.ssh_exec="/usr/bin/ssh"
193        self.ssh_keygen = "/usr/bin/ssh-keygen"
194        self.ssh_identity_file = None
195
196        if config.has_section("experiment_control"):
197            self.debug = config.get("experiment_control", "create_debug")
198            self.state_filename = config.get("experiment_control", 
199                    "experiment_state_file")
200            self.splitter_url = config.get("experiment_control", "splitter_url")
201            self.fedkit = config.get("experiment_control", "fedkit")
202        else:
203            self.debug = False
204            self.state_filename = None
205            self.splitter_url = None
206            self.fedkit = None
207
208        # XXX
209        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
210        self.ssh_type = "rsa"
211        self.state = { }
212        self.state_lock = Lock()
213        self.tclsh = "/usr/local/bin/otclsh"
214        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
215        self.tbmap = { 
216                'deter':'https://users.isi.deterlab.net:23235',
217                'emulab':'https://users.isi.deterlab.net:23236',
218                'ucb':'https://users.isi.deterlab.net:23237',
219                }
220        self.trace_file = sys.stderr
221
222        self.def_expstart = \
223                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
224                "/tmp/federate";
225        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
226                "FEDDIR/hosts";
227        self.def_gwstart = \
228                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
229                "/tmp/bridge.log";
230        self.def_mgwstart = \
231                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
232                "/tmp/bridge.log";
233        self.def_gwimage = "FBSD61-TUNNEL2";
234        self.def_gwtype = "pc";
235
236
237        if self.ssh_pubkey_file:
238            try:
239                f = open(self.ssh_pubkey_file, 'r')
240                self.ssh_pubkey = f.read()
241                f.close()
242            except IOError:
243                raise service_error(service_error.internal,
244                        "Cannot read sshpubkey")
245
246        set_log_level(config, "experiment_control", self.log)
247
248        # Grab saved state.  OK to do this w/o locking because it's read only
249        # and only one thread should be in existence that can see self.state at
250        # this point.
251        if self.state_filename:
252            self.read_state()
253
254        # Dispatch tables
255        self.soap_services = {\
256                'Create': soap_handler(\
257                        CreateRequestMessage.typecode,
258                        getattr(self, "create_experiment"), 
259                        CreateResponseMessage,
260                        "CreateResponseBody"),
261                'Vtopo': soap_handler(\
262                        VtopoRequestMessage.typecode,
263                        getattr(self, "get_vtopo"),
264                        VtopoResponseMessage,
265                        "VtopoResponseBody"),
266                'Vis': soap_handler(\
267                        VisRequestMessage.typecode,
268                        getattr(self, "get_vis"),
269                        VisResponseMessage,
270                        "VisResponseBody"),
271                'Info': soap_handler(\
272                        InfoRequestMessage.typecode,
273                        getattr(self, "get_info"),
274                        InfoResponseMessage,
275                        "InfoResponseBody"),
276                'Terminate': soap_handler(\
277                        TerminateRequestMessage.typecode,
278                        getattr(self, "terminate_experiment"),
279                        TerminateResponseMessage,
280                        "TerminateResponseBody"),
281        }
282
283        self.xmlrpc_services = {\
284                'Create': xmlrpc_handler(\
285                        getattr(self, "create_experiment"), 
286                        "CreateResponseBody"),
287                'Vtopo': xmlrpc_handler(\
288                        getattr(self, "get_vtopo"),
289                        "VtopoResponseBody"),
290                'Vis': xmlrpc_handler(\
291                        getattr(self, "get_vis"),
292                        "VisResponseBody"),
293                'Info': xmlrpc_handler(\
294                        getattr(self, "get_info"),
295                        "InfoResponseBody"),
296                'Terminate': xmlrpc_handler(\
297                        getattr(self, "terminate_experiment"),
298                        "TerminateResponseBody"),
299        }
300
301    def copy_file(self, src, dest, size=1024):
302        """
303        Exceedingly simple file copy.
304        """
305        s = open(src,'r')
306        d = open(dest, 'w')
307
308        buf = "x"
309        while buf != "":
310            buf = s.read(size)
311            d.write(buf)
312        s.close()
313        d.close()
314
315    # Call while holding self.state_lock
316    def write_state(self):
317        """
318        Write a new copy of experiment state after copying the existing state
319        to a backup.
320
321        State format is a simple pickling of the state dictionary.
322        """
323        if os.access(self.state_filename, os.W_OK):
324            self.copy_file(self.state_filename, \
325                    "%s.bak" % self.state_filename)
326        try:
327            f = open(self.state_filename, 'w')
328            pickle.dump(self.state, f)
329        except IOError, e:
330            self.log.error("Can't write file %s: %s" % \
331                    (self.state_filename, e))
332        except pickle.PicklingError, e:
333            self.log.error("Pickling problem: %s" % e)
334        except TypeError, e:
335            self.log.error("Pickling problem (TypeError): %s" % e)
336
337    # Call while holding self.state_lock
338    def read_state(self):
339        """
340        Read a new copy of experiment state.  Old state is overwritten.
341
342        State format is a simple pickling of the state dictionary.
343        """
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except IOError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355
356    def scp_file(self, file, user, host, dest=""):
357        """
358        scp a file to the remote host.  If debug is set the action is only
359        logged.
360        """
361
362        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
363        rv = 0
364
365        try:
366            dnull = open("/dev/null", "r")
367        except IOError:
368            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
369            dnull = Null
370
371        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
372        if not self.debug:
373            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
374            else: rv = call(scp_cmd)
375
376        return rv == 0
377
378    def ssh_cmd(self, user, host, cmd, wname=None):
379        """
380        Run a remote command on host as user.  If debug is set, the action is
381        only logged.
382        """
383        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
384
385        try:
386            dnull = open("/dev/null", "r")
387        except IOError:
388            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
389            dnull = Null
390
391        self.log.debug("[ssh_cmd]: %s" % sh_str)
392        if not self.debug:
393            if dnull:
394                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
395            else:
396                sub = Popen(sh_str, shell=True)
397            return sub.wait() == 0
398        else:
399            return True
400
401    def ship_configs(self, host, user, src_dir, dest_dir):
402        """
403        Copy federant-specific configuration files to the federant.
404        """
405        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
406            return False
407        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
408            return False
409
410        for f in os.listdir(src_dir):
411            if os.path.isdir(f):
412                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
413                        "%s/%s" % (dest_dir, f)):
414                    return False
415            else:
416                if not self.scp_file("%s/%s" % (src_dir, f), 
417                        user, host, dest_dir):
418                    return False
419        return True
420
421    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
422        """
423        Start a sub-experiment on a federant.
424
425        Get the current state, modify or create as appropriate, ship data and
426        configs and start the experiment.  There are small ordering differences
427        based on the initial state of the sub-experiment.
428        """
429        # ops node in the federant
430        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
431        user = tbparams[tb]['user']     # federant user
432        pid = tbparams[tb]['project']   # federant project
433        # XXX
434        base_confs = ( "hosts",)
435        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
436        # command to test experiment state
437        expinfo_exec = "/usr/testbed/bin/expinfo" 
438        # Configuration directories on the remote machine
439        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
440        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
441        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
442        # Regular expressions to parse the expinfo response
443        state_re = re.compile("State:\s+(\w+)")
444        no_exp_re = re.compile("^No\s+such\s+experiment")
445        state = None    # Experiment state parsed from expinfo
446        # The expinfo ssh command
447        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
448
449        # Get status
450        self.log.debug("[start_segment]: %s"% " ".join(cmd))
451        dev_null = None
452        try:
453            dev_null = open("/dev/null", "a")
454        except IOError, e:
455            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
456
457        if self.debug:
458            state = 'swapped'
459            rv = 0
460        else:
461            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
462            for line in status.stdout:
463                m = state_re.match(line)
464                if m: state = m.group(1)
465                else:
466                    m = no_exp_re.match(line)
467                    if m: state = "none"
468            rv = status.wait()
469
470        # If the experiment is not present the subcommand returns a non-zero
471        # return value.  If we successfully parsed a "none" outcome, ignore the
472        # return code.
473        if rv != 0 and state != "none":
474            raise service_error(service_error.internal,
475                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
476
477        self.log.debug("[start_segment]: %s: %s" % (tb, state))
478        self.log.info("[start_segment]:transferring experiment to %s" % tb)
479
480        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
481            return False
482        # Clear the federation files
483        if not self.ssh_cmd(user, host, 
484                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
485            return False
486        if not self.ssh_cmd(user, host, 
487                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
488            return False
489        # Clear and create the tarfiles and rpm directories
490        for d in (tarfiles_dir, rpms_dir):
491            if not self.ssh_cmd(user, host, 
492                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
493                return False
494            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
495                    "create tarfiles"):
496                return False
497       
498        if state == 'active':
499            # Remote experiment is active.  Modify it.
500            for f in base_confs:
501                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
502                        "%s/%s" % (proj_dir, f)):
503                    return False
504            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
505                    proj_dir):
506                return False
507            if os.path.isdir("%s/tarfiles" % tmpdir):
508                if not self.ship_configs(host, user,
509                        "%s/tarfiles" % tmpdir, tarfiles_dir):
510                    return False
511            if os.path.isdir("%s/rpms" % tmpdir):
512                if not self.ship_configs(host, user,
513                        "%s/rpms" % tmpdir, tarfiles_dir):
514                    return False
515            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
516            if not self.ssh_cmd(user, host,
517                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
518                            (pid, eid, tclfile), "modexp"):
519                return False
520            return True
521        elif state == "swapped":
522            # Remote experiment swapped out.  Modify it and swap it in.
523            for f in base_confs:
524                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
525                        "%s/%s" % (proj_dir, f)):
526                    return False
527            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
528                    proj_dir):
529                return False
530            if os.path.isdir("%s/tarfiles" % tmpdir):
531                if not self.ship_configs(host, user,
532                        "%s/tarfiles" % tmpdir, tarfiles_dir):
533                    return False
534            if os.path.isdir("%s/rpms" % tmpdir):
535                if not self.ship_configs(host, user,
536                        "%s/rpms" % tmpdir, tarfiles_dir):
537                    return False
538            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
539            if not self.ssh_cmd(user, host,
540                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
541                    "modexp"):
542                return False
543            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
544            if not self.ssh_cmd(user, host,
545                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
546                    "swapexp"):
547                return False
548            return True
549        elif state == "none":
550            # No remote experiment.  Create one.  We do this in 2 steps so we
551            # can put the configuration files and scripts into the new
552            # experiment directories.
553
554            # Tarfiles must be present for creation to work
555            if os.path.isdir("%s/tarfiles" % tmpdir):
556                if not self.ship_configs(host, user,
557                        "%s/tarfiles" % tmpdir, tarfiles_dir):
558                    return False
559            if os.path.isdir("%s/rpms" % tmpdir):
560                if not self.ship_configs(host, user,
561                        "%s/rpms" % tmpdir, tarfiles_dir):
562                    return False
563            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
564            if not self.ssh_cmd(user, host,
565                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
566                            (pid, eid, tclfile), "startexp"):
567                return False
568            # After startexp the per-experiment directories exist
569            for f in base_confs:
570                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
571                        "%s/%s" % (proj_dir, f)):
572                    return False
573            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
574                    proj_dir):
575                return False
576            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
577            if not self.ssh_cmd(user, host,
578                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
579                    "swapexp"):
580                return False
581            return True
582        else:
583            self.log.debug("[start_segment]:unknown state %s" % state)
584            return False
585
586    def stop_segment(self, tb, eid, tbparams):
587        """
588        Stop a sub experiment by calling swapexp on the federant
589        """
590        user = tbparams[tb]['user']
591        host = tbparams[tb]['host']
592        pid = tbparams[tb]['project']
593
594        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
595        return self.ssh_cmd(user, host,
596                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
597
598       
599    def generate_ssh_keys(self, dest, type="rsa" ):
600        """
601        Generate a set of keys for the gateways to use to talk.
602
603        Keys are of type type and are stored in the required dest file.
604        """
605        valid_types = ("rsa", "dsa")
606        t = type.lower();
607        if t not in valid_types: raise ValueError
608        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
609
610        try:
611            trace = open("/dev/null", "w")
612        except IOError:
613            raise service_error(service_error.internal,
614                    "Cannot open /dev/null??");
615
616        # May raise CalledProcessError
617        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
618        rv = call(cmd, stdout=trace, stderr=trace)
619        if rv != 0:
620            raise service_error(service_error.internal, 
621                    "Cannot generate nonce ssh keys.  %s return code %d" \
622                            % (self.ssh_keygen, rv))
623
624    def gentopo(self, str):
625        """
626        Generate the topology dtat structure from the splitter's XML
627        representation of it.
628
629        The topology XML looks like:
630            <experiment>
631                <nodes>
632                    <node><vname></vname><ips>ip1:ip2</ips></node>
633                </nodes>
634                <lans>
635                    <lan>
636                        <vname></vname><vnode></vnode><ip></ip>
637                        <bandwidth></bandwidth><member>node:port</member>
638                    </lan>
639                </lans>
640        """
641        class topo_parse:
642            """
643            Parse the topology XML and create the dats structure.
644            """
645            def __init__(self):
646                # Typing of the subelements for data conversion
647                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
648                self.int_subelements = ( 'bandwidth',)
649                self.float_subelements = ( 'delay',)
650                # The final data structure
651                self.nodes = [ ]
652                self.lans =  [ ]
653                self.topo = { \
654                        'node': self.nodes,\
655                        'lan' : self.lans,\
656                    }
657                self.element = { }  # Current element being created
658                self.chars = ""     # Last text seen
659
660            def end_element(self, name):
661                # After each sub element the contents is added to the current
662                # element or to the appropriate list.
663                if name == 'node':
664                    self.nodes.append(self.element)
665                    self.element = { }
666                elif name == 'lan':
667                    self.lans.append(self.element)
668                    self.element = { }
669                elif name in self.str_subelements:
670                    self.element[name] = self.chars
671                    self.chars = ""
672                elif name in self.int_subelements:
673                    self.element[name] = int(self.chars)
674                    self.chars = ""
675                elif name in self.float_subelements:
676                    self.element[name] = float(self.chars)
677                    self.chars = ""
678
679            def found_chars(self, data):
680                self.chars += data.rstrip()
681
682
683        tp = topo_parse();
684        parser = xml.parsers.expat.ParserCreate()
685        parser.EndElementHandler = tp.end_element
686        parser.CharacterDataHandler = tp.found_chars
687
688        parser.Parse(str)
689
690        return tp.topo
691       
692
693    def genviz(self, topo):
694        """
695        Generate the visualization the virtual topology
696        """
697
698        neato = "/usr/local/bin/neato"
699        # These are used to parse neato output and to create the visualization
700        # file.
701        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
702        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
703                "%s</type></node>"
704
705        try:
706            # Node names
707            nodes = [ n['vname'] for n in topo['node'] ]
708            topo_lans = topo['lan']
709        except KeyError:
710            raise service_error(service_error.internal, "Bad topology")
711
712        lans = { }
713        links = { }
714
715        # Walk through the virtual topology, organizing the connections into
716        # 2-node connections (links) and more-than-2-node connections (lans).
717        # When a lan is created, it's added to the list of nodes (there's a
718        # node in the visualization for the lan).
719        for l in topo_lans:
720            if links.has_key(l['vname']):
721                if len(links[l['vname']]) < 2:
722                    links[l['vname']].append(l['vnode'])
723                else:
724                    nodes.append(l['vname'])
725                    lans[l['vname']] = links[l['vname']]
726                    del links[l['vname']]
727                    lans[l['vname']].append(l['vnode'])
728            elif lans.has_key(l['vname']):
729                lans[l['vname']].append(l['vnode'])
730            else:
731                links[l['vname']] = [ l['vnode'] ]
732
733
734        # Open up a temporary file for dot to turn into a visualization
735        try:
736            df, dotname = tempfile.mkstemp()
737            dotfile = os.fdopen(df, 'w')
738        except IOError:
739            raise service_error(service_error.internal,
740                    "Failed to open file in genviz")
741
742        # Generate a dot/neato input file from the links, nodes and lans
743        try:
744            print >>dotfile, "graph G {"
745            for n in nodes:
746                print >>dotfile, '\t"%s"' % n
747            for l in links.keys():
748                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
749            for l in lans.keys():
750                for n in lans[l]:
751                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
752            print >>dotfile, "}"
753            dotfile.close()
754        except TypeError:
755            raise service_error(service_error.internal,
756                    "Single endpoint link in vtopo")
757        except IOError:
758            raise service_error(service_error.internal, "Cannot write dot file")
759
760        # Use dot to create a visualization
761        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
762                '-Gpack=true', dotname], stdout=PIPE)
763
764        # Translate dot to vis format
765        vis_nodes = [ ]
766        vis = { 'node': vis_nodes }
767        for line in dot.stdout:
768            m = vis_re.match(line)
769            if m:
770                vn = m.group(1)
771                vis_node = {'name': vn, \
772                        'x': float(m.group(2)),\
773                        'y' : float(m.group(3)),\
774                    }
775                if vn in links.keys() or vn in lans.keys():
776                    vis_node['type'] = 'lan'
777                else:
778                    vis_node['type'] = 'node'
779                vis_nodes.append(vis_node)
780        rv = dot.wait()
781
782        os.remove(dotname)
783        if rv == 0 : return vis
784        else: return None
785
786    def get_access(self, tb, nodes, user, tbparam):
787        """
788        Get access to testbed through fedd and set the parameters for that tb
789        """
790
791        translate_attr = {
792            'slavenodestartcmd': 'expstart',
793            'slaveconnectorstartcmd': 'gwstart',
794            'masternodestartcmd': 'mexpstart',
795            'masterconnectorstartcmd': 'mgwstart',
796            'connectorimage': 'gwimage',
797            'connectortype': 'gwtype',
798            'tunnelcfg': 'tun',
799            'smbshare': 'smbshare',
800        }
801
802        uri = self.tbmap.get(tb, None)
803        if not uri:
804            raise service_error(serice_error.server_config, 
805                    "Unknown testbed: %s" % tb)
806
807        # The basic request
808        req = {\
809                'destinationTestbed' : { 'uri' : uri },
810                'user':  user,
811                'allocID' : { 'localname': 'test' },
812                # XXX: need to get service access stright
813                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
814                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
815            }
816       
817        # node resources if any
818        if nodes != None and len(nodes) > 0:
819            rnodes = [ ]
820            for n in nodes:
821                rn = { }
822                image, hw, count = n.split(":")
823                if image: rn['image'] = [ image ]
824                if hw: rn['hardware'] = [ hw ]
825                if count: rn['count'] = int(count)
826                rnodes.append(rn)
827            req['resources']= { }
828            req['resources']['node'] = rnodes
829
830        r = self.call_RequestAccess(uri, req, 
831                self.cert_file, self.cert_pwd, self.trusted_certs)
832
833        if r.has_key('RequestAccessResponseBody'):
834            r = r['RequestAccessResponseBody']
835        else:
836            raise service_error(service_error.protocol,
837                    "Bad proxy response")
838
839        e = r['emulab']
840        p = e['project']
841        tbparam[tb] = { 
842                "boss": e['boss'],
843                "host": e['ops'],
844                "domain": e['domain'],
845                "fs": e['fileServer'],
846                "eventserver": e['eventServer'],
847                "project": unpack_id(p['name']),
848                "emulab" : e,
849                "allocID" : r['allocID'],
850                }
851        # Make the testbed name be the label the user applied
852        p['testbed'] = {'localname': tb }
853
854        for u in p['user']:
855            tbparam[tb]['user'] = unpack_id(u['userID'])
856
857        for a in e['fedAttr']:
858            if a['attribute']:
859                key = translate_attr.get(a['attribute'].lower(), None)
860                if key:
861                    tbparam[tb][key]= a['value']
862       
863    def release_access(self, tb, aid):
864        """
865        Release access to testbed through fedd
866        """
867
868        uri = self.tbmap.get(tb, None)
869        if not uri:
870            raise service_error(serice_error.server_config, 
871                    "Unknown testbed: %s" % tb)
872
873        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
874                self.cert_file, self.cert_pwd, self.trusted_certs)
875
876        # better error coding
877
878    def remote_splitter(self, uri, desc, master):
879
880        req = {
881                'description' : { 'ns2description': desc },
882                'master': master,
883                'include_fedkit': bool(self.fedkit)
884            }
885
886        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
887                self.trusted_certs)
888
889        if r.has_key('Ns2SplitResponseBody'):
890            r = r['Ns2SplitResponseBody']
891            if r.has_key('output'):
892                return r['output'].splitlines()
893            else:
894                raise service_error(service_error.protocol, 
895                        "Bad splitter response (no output)")
896        else:
897            raise service_error(service_error.protocol, "Bad splitter response")
898       
899    class current_testbed:
900        """
901        Object for collecting the current testbed description.  The testbed
902        description is saved to a file with the local testbed variables
903        subsittuted line by line.
904        """
905        def __init__(self, eid, tmpdir, fedkit):
906            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
907            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
908            self.current_testbed = None
909            self.testbed_file = None
910
911            self.def_expstart = \
912                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
913            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
914            self.def_gwstart = \
915                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
916            self.def_mgwstart = \
917                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
918            self.def_gwimage = "FBSD61-TUNNEL2";
919            self.def_gwtype = "pc";
920
921            self.eid = eid
922            self.tmpdir = tmpdir
923            self.fedkit = fedkit
924
925        def __call__(self, line, master, allocated, tbparams):
926            # Capture testbed topology descriptions
927            if self.current_testbed == None:
928                m = self.begin_testbed.match(line)
929                if m != None:
930                    self.current_testbed = m.group(1)
931                    if self.current_testbed == None:
932                        raise service_error(service_error.req,
933                                "Bad request format (unnamed testbed)")
934                    allocated[self.current_testbed] = \
935                            allocated.get(self.current_testbed,0) + 1
936                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
937                    if not os.path.exists(tb_dir):
938                        try:
939                            os.mkdir(tb_dir)
940                        except IOError:
941                            raise service_error(service_error.internal,
942                                    "Cannot create %s" % tb_dir)
943                    try:
944                        self.testbed_file = open("%s/%s.%s.tcl" %
945                                (tb_dir, self.eid, self.current_testbed), 'w')
946                    except IOError:
947                        self.testbed_file = None
948                    return True
949                else: return False
950            else:
951                m = self.end_testbed.match(line)
952                if m != None:
953                    if m.group(1) != self.current_testbed:
954                        raise service_error(service_error.internal, 
955                                "Mismatched testbed markers!?")
956                    if self.testbed_file != None: 
957                        self.testbed_file.close()
958                        self.testbed_file = None
959                    self.current_testbed = None
960                elif self.testbed_file:
961                    # Substitute variables and put the line into the local
962                    # testbed file.
963                    gwtype = tbparams[self.current_testbed].get('gwtype', 
964                            self.def_gwtype)
965                    gwimage = tbparams[self.current_testbed].get('gwimage', 
966                            self.def_gwimage)
967                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
968                            self.def_mgwstart)
969                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
970                            self.def_mexpstart)
971                    gwstart = tbparams[self.current_testbed].get('gwstart', 
972                            self.def_gwstart)
973                    expstart = tbparams[self.current_testbed].get('expstart', 
974                            self.def_expstart)
975                    project = tbparams[self.current_testbed].get('project')
976                    line = re.sub("GWTYPE", gwtype, line)
977                    line = re.sub("GWIMAGE", gwimage, line)
978                    if self.current_testbed == master:
979                        line = re.sub("GWSTART", mgwstart, line)
980                        line = re.sub("EXPSTART", mexpstart, line)
981                    else:
982                        line = re.sub("GWSTART", gwstart, line)
983                        line = re.sub("EXPSTART", expstart, line)
984                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
985                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
986                    line = re.sub("EID", self.eid, line)
987                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
988                            (project, self.eid), line)
989                    if self.fedkit:
990                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
991                                line)
992                    print >>self.testbed_file, line
993                return True
994
995    class allbeds:
996        """
997        Process the Allbeds section.  Get access to each federant and save the
998        parameters in tbparams
999        """
1000        def __init__(self, get_access):
1001            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1002            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1003            self.in_allbeds = False
1004            self.get_access = get_access
1005
1006        def __call__(self, line, user, tbparams):
1007            # Testbed access parameters
1008            if not self.in_allbeds:
1009                if self.begin_allbeds.match(line):
1010                    self.in_allbeds = True
1011                    return True
1012                else:
1013                    return False
1014            else:
1015                if self.end_allbeds.match(line):
1016                    self.in_allbeds = False
1017                else:
1018                    nodes = line.split('|')
1019                    tb = nodes.pop(0)
1020                    self.get_access(tb, nodes, user, tbparams)
1021                return True
1022
1023    class gateways:
1024        def __init__(self, eid, master, tmpdir, gw_pubkey,
1025                gw_secretkey, copy_file, fedkit):
1026            self.begin_gateways = \
1027                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1028            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1029            self.current_gateways = None
1030            self.control_gateway = None
1031            self.active_end = { }
1032
1033            self.eid = eid
1034            self.master = master
1035            self.tmpdir = tmpdir
1036            self.gw_pubkey_base = gw_pubkey
1037            self.gw_secretkey_base = gw_secretkey
1038
1039            self.copy_file = copy_file
1040            self.fedkit = fedkit
1041
1042
1043        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1044                active_end, tbparams, dtb, myname, desthost, type):
1045            """
1046            Produce a gateway configuration file from a gateways line.
1047            """
1048
1049            sproject = tbparams[gw].get('project', 'project')
1050            dproject = tbparams[dtb].get('project', 'project')
1051            sdomain = ".%s.%s%s" % (eid, sproject,
1052                    tbparams[gw].get('domain', ".example.com"))
1053            ddomain = ".%s.%s%s" % (eid, dproject,
1054                    tbparams[dtb].get('domain', ".example.com"))
1055            boss = tbparams[master].get('boss', "boss")
1056            fs = tbparams[master].get('fs', "fs")
1057            event_server = "%s%s" % \
1058                    (tbparams[gw].get('eventserver', "event_server"),
1059                            tbparams[gw].get('domain', "example.com"))
1060            remote_event_server = "%s%s" % \
1061                    (tbparams[dtb].get('eventserver', "event_server"),
1062                            tbparams[dtb].get('domain', "example.com"))
1063            seer_control = "%s%s" % \
1064                    (tbparams[gw].get('control', "control"), sdomain)
1065
1066            if self.fedkit:
1067                remote_script_dir = "/usr/local/federation/bin"
1068                local_script_dir = "/usr/local/federation/bin"
1069            else:
1070                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1071                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1072
1073            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1074            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1075            tunnel_cfg = tbparams[gw].get("tun", "false")
1076
1077            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1078            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1079
1080            # translate to lower case so the `hostname` hack for specifying
1081            # configuration files works.
1082            conf_file = conf_file.lower();
1083            remote_conf_file = remote_conf_file.lower();
1084
1085            if dtb == master:
1086                active = "false"
1087            elif gw == master:
1088                active = "true"
1089            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1090                active = "false"
1091            else:
1092                active_end['%s-%s' % (gw, dtb)] = 1
1093                active = "true"
1094
1095            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1096            print >>gwconfig, "Active: %s" % active
1097            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1098            print >>gwconfig, "BossName: %s" % boss
1099            print >>gwconfig, "FsName: %s" % fs
1100            print >>gwconfig, "EventServerName: %s" % event_server
1101            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1102            print >>gwconfig, "SeerControl: %s" % seer_control
1103            print >>gwconfig, "Type: %s" % type
1104            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1105            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1106                    local_script_dir
1107            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1108            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1109            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1110                    (remote_conf_dir, remote_conf_file)
1111            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1112            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1113            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1114            gwconfig.close()
1115
1116            return active == "true"
1117
1118        def __call__(self, line, allocated, tbparams):
1119            # Process gateways
1120            if not self.current_gateways:
1121                m = self.begin_gateways.match(line)
1122                if m:
1123                    self.current_gateways = m.group(1)
1124                    if allocated.has_key(self.current_gateways):
1125                        # This test should always succeed
1126                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1127                        if not os.path.exists(tb_dir):
1128                            try:
1129                                os.mkdir(tb_dir)
1130                            except IOError:
1131                                raise service_error(service_error.internal,
1132                                        "Cannot create %s" % tb_dir)
1133                    else:
1134                        # XXX
1135                        self.log.error("[gateways]: Ignoring gateways for " + \
1136                                "unknown testbed %s" % self.current_gateways)
1137                        self.current_gateways = None
1138                    return True
1139                else:
1140                    return False
1141            else:
1142                m = self.end_gateways.match(line)
1143                if m :
1144                    if m.group(1) != self.current_gateways:
1145                        raise service_error(service_error.internal,
1146                                "Mismatched gateway markers!?")
1147                    if self.control_gateway:
1148                        try:
1149                            cc = open("%s/%s/client.conf" %
1150                                    (self.tmpdir, self.current_gateways), 'w')
1151                            print >>cc, "ControlGateway: %s" % \
1152                                    self.control_gateway
1153                            if tbparams[self.master].has_key('smbshare'):
1154                                print >>cc, "SMBSHare: %s" % \
1155                                        tbparams[self.master]['smbshare']
1156                            print >>cc, "ProjectUser: %s" % \
1157                                    tbparams[self.master]['user']
1158                            print >>cc, "ProjectName: %s" % \
1159                                    tbparams[self.master]['project']
1160                            cc.close()
1161                        except IOError:
1162                            raise service_error(service_error.internal,
1163                                    "Error creating client config")
1164                        try:
1165                            cc = open("%s/%s/seer.conf" %
1166                                    (self.tmpdir, self.current_gateways),
1167                                    'w')
1168                            if self.current_gateways != self.master:
1169                                print >>cc, "ControlNode: %s" % \
1170                                        self.control_gateway
1171                            print >>cc, "ExperimentID: %s/%s" % \
1172                                    ( tbparams[self.master]['project'], \
1173                                    self.eid )
1174                            cc.close()
1175                        except IOError:
1176                            raise service_error(service_error.internal,
1177                                    "Error creating seer config")
1178                    else:
1179                        debug.error("[gateways]: No control gateway for %s" %\
1180                                    self.current_gateways)
1181                    self.current_gateways = None
1182                else:
1183                    dtb, myname, desthost, type = line.split(" ")
1184
1185                    if type == "control" or type == "both":
1186                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1187                                self.eid, 
1188                                tbparams[self.current_gateways]['project'],
1189                                tbparams[self.current_gateways]['domain'])
1190                    try:
1191                        active = self.gateway_conf_file(self.current_gateways,
1192                                self.master, self.eid, self.gw_pubkey_base,
1193                                self.gw_secretkey_base,
1194                                self.active_end, tbparams, dtb, myname,
1195                                desthost, type)
1196                    except IOError, e:
1197                        raise service_error(service_error.internal,
1198                                "Failed to write config file for %s" % \
1199                                        self.current_gateway)
1200           
1201                    gw_pubkey = "%s/keys/%s" % \
1202                            (self.tmpdir, self.gw_pubkey_base)
1203                    gw_secretkey = "%s/keys/%s" % \
1204                            (self.tmpdir, self.gw_secretkey_base)
1205
1206                    pkfile = "%s/%s/%s" % \
1207                            ( self.tmpdir, self.current_gateways, 
1208                                    self.gw_pubkey_base)
1209                    skfile = "%s/%s/%s" % \
1210                            ( self.tmpdir, self.current_gateways, 
1211                                    self.gw_secretkey_base)
1212
1213                    if not os.path.exists(pkfile):
1214                        try:
1215                            self.copy_file(gw_pubkey, pkfile)
1216                        except IOError:
1217                            service_error(service_error.internal,
1218                                    "Failed to copy pubkey file")
1219
1220                    if active and not os.path.exists(skfile):
1221                        try:
1222                            self.copy_file(gw_secretkey, skfile)
1223                        except IOError:
1224                            service_error(service_error.internal,
1225                                    "Failed to copy secretkey file")
1226                return True
1227
1228    class shunt_to_file:
1229        """
1230        Simple class to write data between two regexps to a file.
1231        """
1232        def __init__(self, begin, end, filename):
1233            """
1234            Begin shunting on a match of begin, stop on end, send data to
1235            filename.
1236            """
1237            self.begin = re.compile(begin)
1238            self.end = re.compile(end)
1239            self.in_shunt = False
1240            self.file = None
1241            self.filename = filename
1242
1243        def __call__(self, line):
1244            """
1245            Call this on each line in the input that may be shunted.
1246            """
1247            if not self.in_shunt:
1248                if self.begin.match(line):
1249                    self.in_shunt = True
1250                    try:
1251                        self.file = open(self.filename, "w")
1252                    except:
1253                        self.file = None
1254                        raise
1255                    return True
1256                else:
1257                    return False
1258            else:
1259                if self.end.match(line):
1260                    if self.file: 
1261                        self.file.close()
1262                        self.file = None
1263                    self.in_shunt = False
1264                else:
1265                    if self.file:
1266                        print >>self.file, line
1267                return True
1268
1269    class shunt_to_list:
1270        """
1271        Same interface as shunt_to_file.  Data collected in self.list, one list
1272        element per line.
1273        """
1274        def __init__(self, begin, end):
1275            self.begin = re.compile(begin)
1276            self.end = re.compile(end)
1277            self.in_shunt = False
1278            self.list = [ ]
1279       
1280        def __call__(self, line):
1281            if not self.in_shunt:
1282                if self.begin.match(line):
1283                    self.in_shunt = True
1284                    return True
1285                else:
1286                    return False
1287            else:
1288                if self.end.match(line):
1289                    self.in_shunt = False
1290                else:
1291                    self.list.append(line)
1292                return True
1293
1294    class shunt_to_string:
1295        """
1296        Same interface as shunt_to_file.  Data collected in self.str, all in
1297        one string.
1298        """
1299        def __init__(self, begin, end):
1300            self.begin = re.compile(begin)
1301            self.end = re.compile(end)
1302            self.in_shunt = False
1303            self.str = ""
1304       
1305        def __call__(self, line):
1306            if not self.in_shunt:
1307                if self.begin.match(line):
1308                    self.in_shunt = True
1309                    return True
1310                else:
1311                    return False
1312            else:
1313                if self.end.match(line):
1314                    self.in_shunt = False
1315                else:
1316                    self.str += line
1317                return True
1318
1319    def create_experiment(self, req, fid):
1320        """
1321        The external interface to experiment creation called from the
1322        dispatcher.
1323
1324        Creates a working directory, splits the incoming description using the
1325        splitter script and parses out the avrious subsections using the
1326        lcasses above.  Once each sub-experiment is created, use pooled threads
1327        to instantiate them and start it all up.
1328        """
1329        try:
1330            tmpdir = tempfile.mkdtemp(prefix="split-")
1331        except IOError:
1332            raise service_error(service_error.internal, "Cannot create tmp dir")
1333
1334        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1335        gw_secretkey_base = "fed.%s" % self.ssh_type
1336        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1337        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1338        tclfile = tmpdir + "/experiment.tcl"
1339        tbparams = { }
1340
1341        pid = "dummy"
1342        gid = "dummy"
1343        # XXX
1344        fail_soft = False
1345
1346        try:
1347            os.mkdir(tmpdir+"/keys")
1348        except OSError:
1349            raise service_error(service_error.internal,
1350                    "Can't make temporary dir")
1351
1352        req = req.get('CreateRequestBody', None)
1353        if not req:
1354            raise service_error(service_error.req,
1355                    "Bad request format (no CreateRequestBody)")
1356        # The tcl parser needs to read a file so put the content into that file
1357        descr=req.get('experimentdescription', None)
1358        if descr:
1359            file_content=descr.get('ns2description', None)
1360            if file_content:
1361                try:
1362                    f = open(tclfile, 'w')
1363                    f.write(file_content)
1364                    f.close()
1365                except IOError:
1366                    raise service_error(service_error.internal,
1367                            "Cannot write temp experiment description")
1368            else:
1369                raise service_error(service_error.req, 
1370                        "Only ns2descriptions supported")
1371        else:
1372            raise service_error(service_error.req, "No experiment description")
1373
1374        if req.has_key('experimentID') and \
1375                req['experimentID'].has_key('localname'):
1376            eid = req['experimentID']['localname']
1377            self.state_lock.acquire()
1378            while (self.state.has_key(eid)):
1379                eid += random.choice(string.ascii_letters)
1380            # To avoid another thread picking this localname
1381            self.state[eid] = "placeholder"
1382            self.state_lock.release()
1383        else:
1384            eid = self.exp_stem
1385            for i in range(0,5):
1386                eid += random.choice(string.ascii_letters)
1387            self.state_lock.acquire()
1388            while (self.state.has_key(eid)):
1389                eid = self.exp_stem
1390                for i in range(0,5):
1391                    eid += random.choice(string.ascii_letters)
1392            # To avoid another thread picking this localname
1393            self.state[eid] = "placeholder"
1394            self.state_lock.release()
1395
1396        try:
1397            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1398        except ValueError:
1399            raise service_error(service_error.server_config, 
1400                    "Bad key type (%s)" % self.ssh_type)
1401
1402        user = req.get('user', None)
1403        if user == None:
1404            raise service_error(service_error.req, "No user")
1405
1406        master = req.get('master', None)
1407        if master == None:
1408            raise service_error(service_error.req, "No master testbed label")
1409       
1410        if self.splitter_url:
1411            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1412            split_data = self.remote_splitter(self.splitter_url, file_content,
1413                    master)
1414        else:
1415            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1416                str(self.muxmax), '-m', master]
1417
1418            if self.fedkit:
1419                tclcmd.append('-k')
1420
1421            tclcmd.extend([pid, gid, eid, tclfile])
1422
1423            self.log.debug("running local splitter %s", " ".join(tclcmd))
1424            tclparser = Popen(tclcmd, stdout=PIPE)
1425            split_data = tclparser.stdout
1426
1427        allocated = { }     # Testbeds we can access
1428        started = { }       # Testbeds where a sub-experiment started
1429                            # successfully
1430
1431        # Objects to parse the splitter output (defined above)
1432        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1433        parse_allbeds = self.allbeds(self.get_access)
1434        parse_gateways = self.gateways(eid, master, tmpdir,
1435                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1436        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1437                    "^#\s+End\s+Vtopo")
1438        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1439                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1440        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1441                "^#\s+End\s+tarfiles")
1442        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1443                "^#\s+End\s+rpms")
1444
1445        # Worling on the split data
1446        for line in split_data:
1447            line = line.rstrip()
1448            if parse_current_testbed(line, master, allocated, tbparams):
1449                continue
1450            elif parse_allbeds(line, user, tbparams):
1451                continue
1452            elif parse_gateways(line, allocated, tbparams):
1453                continue
1454            elif parse_vtopo(line):
1455                continue
1456            elif parse_hostnames(line):
1457                continue
1458            elif parse_tarfiles(line):
1459                continue
1460            elif parse_rpms(line):
1461                continue
1462            else:
1463                raise service_error(service_error.internal, 
1464                        "Bad tcl parse? %s" % line)
1465
1466        # Virtual topology and visualization
1467        vtopo = self.gentopo(parse_vtopo.str)
1468        if not vtopo:
1469            raise service_error(service_error.internal, 
1470                    "Failed to generate virtual topology")
1471
1472        vis = self.genviz(vtopo)
1473        if not vis:
1474            raise service_error(service_error.internal, 
1475                    "Failed to generate visualization")
1476
1477        # save federant information
1478        for k in allocated.keys():
1479            tbparams[k]['federant'] = {\
1480                    'name': [ { 'localname' : eid} ],\
1481                    'emulab': tbparams[k]['emulab'],\
1482                    'allocID' : tbparams[k]['allocID'],\
1483                    'master' : k == master,\
1484                }
1485
1486
1487        # Copy tarfiles and rpms needed at remote sites into a staging area
1488        try:
1489            if self.fedkit:
1490                parse_tarfiles.list.append(self.fedkit)
1491            for t in parse_tarfiles.list:
1492                if not os.path.exists("%s/tarfiles" % tmpdir):
1493                    os.mkdir("%s/tarfiles" % tmpdir)
1494                self.copy_file(t, "%s/tarfiles/%s" % \
1495                        (tmpdir, os.path.basename(t)))
1496            for r in parse_rpms.list:
1497                if not os.path.exists("%s/rpms" % tmpdir):
1498                    os.mkdir("%s/rpms" % tmpdir)
1499                self.copy_file(r, "%s/rpms/%s" % \
1500                        (tmpdir, os.path.basename(r)))
1501        except IOError, e:
1502            raise service_error(service_error.internal, 
1503                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1504
1505        thread_pool_info = self.thread_pool()
1506        threads = [ ]
1507
1508        for tb in [ k for k in allocated.keys() if k != master]:
1509            # Wait until we have a free slot to start the next testbed load
1510            thread_pool_info.acquire()
1511            while thread_pool_info.started - \
1512                    thread_pool_info.terminated >= self.nthreads:
1513                thread_pool_info.wait()
1514            thread_pool_info.release()
1515
1516            # Create and start a thread to start the segment, and save it to
1517            # get the return value later
1518            t  = self.pooled_thread(target=self.start_segment, 
1519                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1520                    pdata=thread_pool_info, trace_file=self.trace_file)
1521            threads.append(t)
1522            t.start()
1523
1524        # Wait until all finish (the first clause of the while is to make sure
1525        # one starts)
1526        thread_pool_info.acquire()
1527        while thread_pool_info.started == 0 or \
1528                thread_pool_info.started > thread_pool_info.terminated:
1529            thread_pool_info.wait()
1530        thread_pool_info.release()
1531
1532        # If none failed, start the master
1533        failed = [ t.getName() for t in threads if not t.rv ]
1534
1535        if len(failed) == 0:
1536            if not self.start_segment(master, eid, tbparams, tmpdir):
1537                failed.append(master)
1538
1539        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1540        # If one failed clean up, unless fail_soft is set
1541        if failed:
1542            if not fail_soft:
1543                for tb in succeeded:
1544                    self.stop_segment(tb, eid, tbparams)
1545                # Remove the placeholder
1546                self.state_lock.acquire()
1547                del self.state[eid]
1548                self.state_lock.release()
1549
1550                raise service_error(service_error.federant,
1551                    "Swap in failed on %s" % ",".join(failed))
1552        else:
1553            self.log.info("[start_segment]: Experiment %s started" % eid)
1554
1555        # Generate an ID for the experiment (slice) and a certificate that the
1556        # allocator can use to prove they own it.  We'll ship it back through
1557        # the encrypted connection.
1558        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1559
1560        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1561
1562        # Walk up tmpdir, deleting as we go
1563        for path, dirs, files in os.walk(tmpdir, topdown=False):
1564            for f in files:
1565                os.remove(os.path.join(path, f))
1566            for d in dirs:
1567                os.rmdir(os.path.join(path, d))
1568        os.rmdir(tmpdir)
1569
1570        # The deepcopy prevents the allocation ID and other binaries from being
1571        # translated into other formats
1572        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1573                for tb in tbparams.keys() \
1574                    if tbparams[tb].has_key('federant') ],\
1575                    'vtopo': vtopo,\
1576                    'vis' : vis,
1577                    'experimentID' : [\
1578                            { 'fedid': copy.copy(expid) }, \
1579                            { 'localname': eid },\
1580                        ],\
1581                    'experimentAccess': { 'X509' : expcert },\
1582                }
1583
1584        # Insert the experiment into our state and update the disk copy
1585        self.state_lock.acquire()
1586        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1587                for tb in tbparams.keys() \
1588                    if tbparams[tb].has_key('federant') ],\
1589                    'vtopo': vtopo,\
1590                    'vis' : vis,
1591                    'experimentID' : [\
1592                            { 'fedid': expid }, { 'localname': eid },\
1593                        ],\
1594                }
1595        self.state[eid] = self.state[expid]
1596        if self.state_filename: self.write_state()
1597        self.state_lock.release()
1598
1599        if not failed:
1600            return resp
1601        else:
1602            raise service_error(service_error.partial, \
1603                    "Partial swap in on %s" % ",".join(succeeded))
1604
1605
1606    def get_vtopo(self, req, fid):
1607        """
1608        Return the stored virtual topology for this experiment
1609        """
1610        rv = None
1611
1612        req = req.get('VtopoRequestBody', None)
1613        if not req:
1614            raise service_error(service_error.req,
1615                    "Bad request format (no VtopoRequestBody)")
1616        exp = req.get('experiment', None)
1617        if exp:
1618            if exp.has_key('fedid'):
1619                key = exp['fedid']
1620                keytype = "fedid"
1621            elif exp.has_key('localname'):
1622                key = exp['localname']
1623                keytype = "localname"
1624            else:
1625                raise service_error(service_error.req, "Unknown lookup type")
1626        else:
1627            raise service_error(service_error.req, "No request?")
1628
1629        self.state_lock.acquire()
1630        if self.state.has_key(key):
1631            rv = { 'experiment' : {keytype: key },\
1632                    'vtopo': self.state[key]['vtopo'],\
1633                }
1634        self.state_lock.release()
1635
1636        if rv: return rv
1637        else: raise service_error(service_error.req, "No such experiment")
1638
1639    def get_vis(self, req, fid):
1640        """
1641        Return the stored visualization for this experiment
1642        """
1643        rv = None
1644
1645        req = req.get('VisRequestBody', None)
1646        if not req:
1647            raise service_error(service_error.req,
1648                    "Bad request format (no VisRequestBody)")
1649        exp = req.get('experiment', None)
1650        if exp:
1651            if exp.has_key('fedid'):
1652                key = exp['fedid']
1653                keytype = "fedid"
1654            elif exp.has_key('localname'):
1655                key = exp['localname']
1656                keytype = "localname"
1657            else:
1658                raise service_error(service_error.req, "Unknown lookup type")
1659        else:
1660            raise service_error(service_error.req, "No request?")
1661
1662        self.state_lock.acquire()
1663        if self.state.has_key(key):
1664            rv =  { 'experiment' : {keytype: key },\
1665                    'vis': self.state[key]['vis'],\
1666                    }
1667        self.state_lock.release()
1668
1669        if rv: return rv
1670        else: raise service_error(service_error.req, "No such experiment")
1671
1672    def get_info(self, req, fid):
1673        """
1674        Return all the stored info about this experiment
1675        """
1676        rv = None
1677
1678        req = req.get('InfoRequestBody', None)
1679        if not req:
1680            raise service_error(service_error.req,
1681                    "Bad request format (no VisRequestBody)")
1682        exp = req.get('experiment', None)
1683        if exp:
1684            if exp.has_key('fedid'):
1685                key = exp['fedid']
1686                keytype = "fedid"
1687            elif exp.has_key('localname'):
1688                key = exp['localname']
1689                keytype = "localname"
1690            else:
1691                raise service_error(service_error.req, "Unknown lookup type")
1692        else:
1693            raise service_error(service_error.req, "No request?")
1694
1695        # The state may be massaged by the service function that called
1696        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1697        # state.
1698        self.state_lock.acquire()
1699        if self.state.has_key(key):
1700            rv = copy.deepcopy(self.state[key])
1701        self.state_lock.release()
1702
1703        if rv: return rv
1704        else: raise service_error(service_error.req, "No such experiment")
1705
1706
1707    def terminate_experiment(self, req, fid):
1708        """
1709        Swap this experiment out on the federants and delete the shared
1710        information
1711        """
1712        tbparams = { }
1713        req = req.get('TerminateRequestBody', None)
1714        if not req:
1715            raise service_error(service_error.req,
1716                    "Bad request format (no TerminateRequestBody)")
1717        exp = req.get('experiment', None)
1718        if exp:
1719            if exp.has_key('fedid'):
1720                key = exp['fedid']
1721                keytype = "fedid"
1722            elif exp.has_key('localname'):
1723                key = exp['localname']
1724                keytype = "localname"
1725            else:
1726                raise service_error(service_error.req, "Unknown lookup type")
1727        else:
1728            raise service_error(service_error.req, "No request?")
1729
1730        self.state_lock.acquire()
1731        fed_exp = self.state.get(key, None)
1732
1733        if fed_exp:
1734            # This branch of the conditional holds the lock to generate a
1735            # consistent temporary tbparams variable to deallocate experiments.
1736            # It releases the lock to do the deallocations and reacquires it to
1737            # remove the experiment state when the termination is complete.
1738            ids = []
1739            #  experimentID is a list of dicts that are self-describing
1740            #  identifiers.  This finds all the fedids and localnames - the
1741            #  keys of self.state - and puts them into ids.
1742            for id in fed_exp.get('experimentID', []):
1743                if id.has_key('fedid'): ids.append(id['fedid'])
1744                if id.has_key('localname'): ids.append(id['localname'])
1745
1746            # Construct enough of the tbparams to make the stop_segment calls
1747            # work
1748            for fed in fed_exp['federant']:
1749                try:
1750                    for e in fed['name']:
1751                        eid = e.get('localname', None)
1752                        if eid: break
1753                    else:
1754                        continue
1755
1756                    p = fed['emulab']['project']
1757
1758                    project = p['name']['localname']
1759                    tb = p['testbed']['localname']
1760                    user = p['user'][0]['userID']['localname']
1761
1762                    domain = fed['emulab']['domain']
1763                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1764                    aid = fed['allocID']
1765                except KeyError, e:
1766                    continue
1767                tbparams[tb] = {\
1768                        'user': user,\
1769                        'domain': domain,\
1770                        'project': project,\
1771                        'host': host,\
1772                        'eid': eid,\
1773                        'aid': aid,\
1774                    }
1775            self.state_lock.release()
1776
1777            # Stop everyone.
1778            for tb in tbparams.keys():
1779                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1780
1781            # release the allocations
1782            for tb in tbparams.keys():
1783                self.release_access(tb, tbparams[tb]['aid'])
1784
1785            # Remove the terminated experiment
1786            self.state_lock.acquire()
1787            for id in ids:
1788                if self.state.has_key(id): del self.state[id]
1789
1790            if self.state_filename: self.write_state()
1791            self.state_lock.release()
1792
1793            return { 'experiment': exp }
1794        else:
1795            # Don't forget to release the lock
1796            self.state_lock.release()
1797            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.