source: fedd/fedd_experiment_control.py @ 3f6bc5f

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 3f6bc5f was 3f6bc5f, checked in by Ted Faber <faber@…>, 15 years ago

Initial move to general authorization framework. Currently integrated with Access stuff fully.

  • Property mode set to 100644
File size: 52.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5
6import re
7import random
8import string
9import subprocess
10import tempfile
11import copy
12import pickle
13import logging
14
15import traceback
16# For parsing visualization output and splitter output
17import xml.parsers.expat
18
19from threading import *
20from subprocess import *
21
22from fedd_services import *
23from fedd_internal_services import *
24from fedd_util import *
25from fedid import fedid, generate_fedid
26from remote_service import xmlrpc_handler, soap_handler, service_caller
27import parse_detail
28from service_error import service_error
29
30
31class nullHandler(logging.Handler):
32    def emit(self, record): pass
33
34fl = logging.getLogger("fedd.experiment_control")
35fl.addHandler(nullHandler())
36
37class fedd_experiment_control_local:
38    """
39    Control of experiments that this system can directly access.
40
41    Includes experiment creation, termination and information dissemination.
42    Thred safe.
43    """
44   
45    class thread_pool:
46        """
47        A class to keep track of a set of threads all invoked for the same
48        task.  Manages the mutual exclusion of the states.
49        """
50        def __init__(self):
51            """
52            Start a pool.
53            """
54            self.changed = Condition()
55            self.started = 0
56            self.terminated = 0
57
58        def acquire(self):
59            """
60            Get the pool's lock.
61            """
62            self.changed.acquire()
63
64        def release(self):
65            """
66            Release the pool's lock.
67            """
68            self.changed.release()
69
70        def wait(self, timeout = None):
71            """
72            Wait for a pool thread to start or stop.
73            """
74            self.changed.wait(timeout)
75
76        def start(self):
77            """
78            Called by a pool thread to report starting.
79            """
80            self.changed.acquire()
81            self.started += 1
82            self.changed.notifyAll()
83            self.changed.release()
84
85        def terminate(self):
86            """
87            Called by a pool thread to report finishing.
88            """
89            self.changed.acquire()
90            self.terminated += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def clear(self):
95            """
96            Clear all pool data.
97            """
98            self.changed.acquire()
99            self.started = 0
100            self.terminated =0
101            self.changed.notifyAll()
102            self.changed.release()
103
104    class pooled_thread(Thread):
105        """
106        One of a set of threads dedicated to a specific task.  Uses the
107        thread_pool class above for coordination.
108        """
109        def __init__(self, group=None, target=None, name=None, args=(), 
110                kwargs={}, pdata=None, trace_file=None):
111            Thread.__init__(self, group, target, name, args, kwargs)
112            self.rv = None          # Return value of the ops in this thread
113            self.exception = None   # Exception that terminated this thread
114            self.target=target      # Target function to run on start()
115            self.args = args        # Args to pass to target
116            self.kwargs = kwargs    # Additional kw args
117            self.pdata = pdata      # thread_pool for this class
118            # Logger for this thread
119            self.log = logging.getLogger("fedd.experiment_control")
120       
121        def run(self):
122            """
123            Emulate Thread.run, except add pool data manipulation and error
124            logging.
125            """
126            if self.pdata:
127                self.pdata.start()
128
129            if self.target:
130                try:
131                    self.rv = self.target(*self.args, **self.kwargs)
132                except service_error, s:
133                    self.exception = s
134                    self.log.error("Thread exception: %s %s" % \
135                            (s.code_string(), s.desc))
136                except:
137                    self.exception = sys.exc_info()[1]
138                    self.log.error(("Unexpected thread exception: %s" +\
139                            "Trace %s") % (self.exception,\
140                                traceback.format_exc()))
141            if self.pdata:
142                self.pdata.terminate()
143
144    call_RequestAccess = service_caller('RequestAccess',
145            'getfeddPortType', feddServiceLocator, 
146            RequestAccessRequestMessage, 'RequestAccessRequestBody')
147
148    call_ReleaseAccess = service_caller('ReleaseAccess',
149            'getfeddPortType', feddServiceLocator, 
150            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
151
152    call_Ns2Split = service_caller('Ns2Split',
153            'getfeddInternalPortType', feddInternalServiceLocator, 
154            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
155
156    def __init__(self, config=None, auth=None):
157        """
158        Intialize the various attributes, most from the config object
159        """
160        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
161        self.thread_pool = fedd_experiment_control_local.thread_pool
162
163        self.cert_file = None
164        self.cert_pwd = None
165        self.trusted_certs = None
166
167        # Walk through the various relevant certificat specifying config
168        # attributes until the local certificate attributes can be resolved.
169        # The walk is from most specific to most general specification.
170        for s in ("experiment_control", "globals"):
171            if config.has_section(s): 
172                if config.has_option(s, "cert_file"):
173                    if not self.cert_file:
174                        self.cert_file = config.get(s, "cert_file")
175                        self.cert_pwd = config.get(s, "cert_pwd")
176
177                if config.has_option(s, "trusted_certs"):
178                    if not self.trusted_certs:
179                        self.trusted_certs = config.get(s, "trusted_certs")
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193        if config.has_section("experiment_control"):
194            self.debug = config.get("experiment_control", "create_debug")
195            self.state_filename = config.get("experiment_control", 
196                    "experiment_state_file")
197            self.splitter_url = config.get("experiment_control", "splitter_url")
198            self.fedkit = config.get("experiment_control", "fedkit")
199        else:
200            self.debug = False
201            self.state_filename = None
202            self.splitter_url = None
203            self.fedkit = None
204
205        # XXX
206        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
207        self.ssh_type = "rsa"
208        self.state = { }
209        self.state_lock = Lock()
210        self.tclsh = "/usr/local/bin/otclsh"
211        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
212        self.tbmap = { 
213                'deter':'https://users.isi.deterlab.net:23235',
214                'emulab':'https://users.isi.deterlab.net:23236',
215                'ucb':'https://users.isi.deterlab.net:23237',
216                }
217        self.trace_file = sys.stderr
218
219        self.def_expstart = \
220                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
221                "/tmp/federate";
222        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
223                "FEDDIR/hosts";
224        self.def_gwstart = \
225                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
226                "/tmp/bridge.log";
227        self.def_mgwstart = \
228                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
229                "/tmp/bridge.log";
230        self.def_gwimage = "FBSD61-TUNNEL2";
231        self.def_gwtype = "pc";
232
233
234        if self.ssh_pubkey_file:
235            try:
236                f = open(self.ssh_pubkey_file, 'r')
237                self.ssh_pubkey = f.read()
238                f.close()
239            except IOError:
240                raise service_error(service_error.internal,
241                        "Cannot read sshpubkey")
242
243        set_log_level(config, "experiment_control", self.log)
244
245        # Grab saved state.  OK to do this w/o locking because it's read only
246        # and only one thread should be in existence that can see self.state at
247        # this point.
248        if self.state_filename:
249            self.read_state()
250
251        # Dispatch tables
252        self.soap_services = {\
253                'Create': soap_handler(\
254                        CreateRequestMessage.typecode,
255                        getattr(self, "create_experiment"), 
256                        CreateResponseMessage,
257                        "CreateResponseBody"),
258                'Vtopo': soap_handler(\
259                        VtopoRequestMessage.typecode,
260                        getattr(self, "get_vtopo"),
261                        VtopoResponseMessage,
262                        "VtopoResponseBody"),
263                'Vis': soap_handler(\
264                        VisRequestMessage.typecode,
265                        getattr(self, "get_vis"),
266                        VisResponseMessage,
267                        "VisResponseBody"),
268                'Info': soap_handler(\
269                        InfoRequestMessage.typecode,
270                        getattr(self, "get_info"),
271                        InfoResponseMessage,
272                        "InfoResponseBody"),
273                'Terminate': soap_handler(\
274                        TerminateRequestMessage.typecode,
275                        getattr(self, "terminate_experiment"),
276                        TerminateResponseMessage,
277                        "TerminateResponseBody"),
278        }
279
280        self.xmlrpc_services = {\
281                'Create': xmlrpc_handler(\
282                        getattr(self, "create_experiment"), 
283                        "CreateResponseBody"),
284                'Vtopo': xmlrpc_handler(\
285                        getattr(self, "get_vtopo"),
286                        "VtopoResponseBody"),
287                'Vis': xmlrpc_handler(\
288                        getattr(self, "get_vis"),
289                        "VisResponseBody"),
290                'Info': xmlrpc_handler(\
291                        getattr(self, "get_info"),
292                        "InfoResponseBody"),
293                'Terminate': xmlrpc_handler(\
294                        getattr(self, "terminate_experiment"),
295                        "TerminateResponseBody"),
296        }
297
298    def copy_file(self, src, dest, size=1024):
299        """
300        Exceedingly simple file copy.
301        """
302        s = open(src,'r')
303        d = open(dest, 'w')
304
305        buf = "x"
306        while buf != "":
307            buf = s.read(size)
308            d.write(buf)
309        s.close()
310        d.close()
311
312    # Call while holding self.state_lock
313    def write_state(self):
314        """
315        Write a new copy of experiment state after copying the existing state
316        to a backup.
317
318        State format is a simple pickling of the state dictionary.
319        """
320        if os.access(self.state_filename, os.W_OK):
321            self.copy_file(self.state_filename, \
322                    "%s.bak" % self.state_filename)
323        try:
324            f = open(self.state_filename, 'w')
325            pickle.dump(self.state, f)
326        except IOError, e:
327            self.log.error("Can't write file %s: %s" % \
328                    (self.state_filename, e))
329        except pickle.PicklingError, e:
330            self.log.error("Pickling problem: %s" % e)
331        except TypeError, e:
332            self.log.error("Pickling problem (TypeError): %s" % e)
333
334    # Call while holding self.state_lock
335    def read_state(self):
336        """
337        Read a new copy of experiment state.  Old state is overwritten.
338
339        State format is a simple pickling of the state dictionary.
340        """
341        try:
342            f = open(self.state_filename, "r")
343            self.state = pickle.load(f)
344            self.log.debug("[read_state]: Read state from %s" % \
345                    self.state_filename)
346        except IOError, e:
347            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
348                    % (self.state_filename, e))
349        except pickle.UnpicklingError, e:
350            self.log.warning(("[read_state]: No saved state: " + \
351                    "Unpickling failed: %s") % e)
352
353    def scp_file(self, file, user, host, dest=""):
354        """
355        scp a file to the remote host.  If debug is set the action is only
356        logged.
357        """
358
359        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
360        rv = 0
361
362        try:
363            dnull = open("/dev/null", "r")
364        except IOError:
365            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
366            dnull = Null
367
368        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
369        if not self.debug:
370            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
371            else: rv = call(scp_cmd)
372
373        return rv == 0
374
375    def ssh_cmd(self, user, host, cmd, wname=None):
376        """
377        Run a remote command on host as user.  If debug is set, the action is
378        only logged.
379        """
380        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
381
382        try:
383            dnull = open("/dev/null", "r")
384        except IOError:
385            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
386            dnull = Null
387
388        self.log.debug("[ssh_cmd]: %s" % sh_str)
389        if not self.debug:
390            if dnull:
391                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
392            else:
393                sub = Popen(sh_str, shell=True)
394            return sub.wait() == 0
395        else:
396            return True
397
398    def ship_configs(self, host, user, src_dir, dest_dir):
399        """
400        Copy federant-specific configuration files to the federant.
401        """
402        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
403            return False
404        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
405            return False
406
407        for f in os.listdir(src_dir):
408            if os.path.isdir(f):
409                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
410                        "%s/%s" % (dest_dir, f)):
411                    return False
412            else:
413                if not self.scp_file("%s/%s" % (src_dir, f), 
414                        user, host, dest_dir):
415                    return False
416        return True
417
418    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
419        """
420        Start a sub-experiment on a federant.
421
422        Get the current state, modify or create as appropriate, ship data and
423        configs and start the experiment.  There are small ordering differences
424        based on the initial state of the sub-experiment.
425        """
426        # ops node in the federant
427        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
428        user = tbparams[tb]['user']     # federant user
429        pid = tbparams[tb]['project']   # federant project
430        # XXX
431        base_confs = ( "hosts",)
432        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
433        # command to test experiment state
434        expinfo_exec = "/usr/testbed/bin/expinfo" 
435        # Configuration directories on the remote machine
436        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
437        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
438        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
439        # Regular expressions to parse the expinfo response
440        state_re = re.compile("State:\s+(\w+)")
441        no_exp_re = re.compile("^No\s+such\s+experiment")
442        state = None    # Experiment state parsed from expinfo
443        # The expinfo ssh command
444        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
445
446        # Get status
447        self.log.debug("[start_segment]: %s"% " ".join(cmd))
448        dev_null = None
449        try:
450            dev_null = open("/dev/null", "a")
451        except IOError, e:
452            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
453
454        if self.debug:
455            state = 'swapped'
456            rv = 0
457        else:
458            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
459            for line in status.stdout:
460                m = state_re.match(line)
461                if m: state = m.group(1)
462                else:
463                    m = no_exp_re.match(line)
464                    if m: state = "none"
465            rv = status.wait()
466
467        # If the experiment is not present the subcommand returns a non-zero
468        # return value.  If we successfully parsed a "none" outcome, ignore the
469        # return code.
470        if rv != 0 and state != "none":
471            raise service_error(service_error.internal,
472                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
473
474        self.log.debug("[start_segment]: %s: %s" % (tb, state))
475        self.log.info("[start_segment]:transferring experiment to %s" % tb)
476
477        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
478            return False
479        # Clear the federation files
480        if not self.ssh_cmd(user, host, 
481                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
482            return False
483        if not self.ssh_cmd(user, host, 
484                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
485            return False
486        # Clear and create the tarfiles and rpm directories
487        for d in (tarfiles_dir, rpms_dir):
488            if not self.ssh_cmd(user, host, 
489                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
490                return False
491            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
492                    "create tarfiles"):
493                return False
494       
495        if state == 'active':
496            # Remote experiment is active.  Modify it.
497            for f in base_confs:
498                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
499                        "%s/%s" % (proj_dir, f)):
500                    return False
501            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
502                    proj_dir):
503                return False
504            if os.path.isdir("%s/tarfiles" % tmpdir):
505                if not self.ship_configs(host, user,
506                        "%s/tarfiles" % tmpdir, tarfiles_dir):
507                    return False
508            if os.path.isdir("%s/rpms" % tmpdir):
509                if not self.ship_configs(host, user,
510                        "%s/rpms" % tmpdir, tarfiles_dir):
511                    return False
512            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
513            if not self.ssh_cmd(user, host,
514                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
515                            (pid, eid, tclfile), "modexp"):
516                return False
517            return True
518        elif state == "swapped":
519            # Remote experiment swapped out.  Modify it and swap it in.
520            for f in base_confs:
521                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
522                        "%s/%s" % (proj_dir, f)):
523                    return False
524            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
525                    proj_dir):
526                return False
527            if os.path.isdir("%s/tarfiles" % tmpdir):
528                if not self.ship_configs(host, user,
529                        "%s/tarfiles" % tmpdir, tarfiles_dir):
530                    return False
531            if os.path.isdir("%s/rpms" % tmpdir):
532                if not self.ship_configs(host, user,
533                        "%s/rpms" % tmpdir, tarfiles_dir):
534                    return False
535            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
536            if not self.ssh_cmd(user, host,
537                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
538                    "modexp"):
539                return False
540            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
541            if not self.ssh_cmd(user, host,
542                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
543                    "swapexp"):
544                return False
545            return True
546        elif state == "none":
547            # No remote experiment.  Create one.  We do this in 2 steps so we
548            # can put the configuration files and scripts into the new
549            # experiment directories.
550
551            # Tarfiles must be present for creation to work
552            if os.path.isdir("%s/tarfiles" % tmpdir):
553                if not self.ship_configs(host, user,
554                        "%s/tarfiles" % tmpdir, tarfiles_dir):
555                    return False
556            if os.path.isdir("%s/rpms" % tmpdir):
557                if not self.ship_configs(host, user,
558                        "%s/rpms" % tmpdir, tarfiles_dir):
559                    return False
560            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
561            if not self.ssh_cmd(user, host,
562                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
563                            (pid, eid, tclfile), "startexp"):
564                return False
565            # After startexp the per-experiment directories exist
566            for f in base_confs:
567                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
568                        "%s/%s" % (proj_dir, f)):
569                    return False
570            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
571                    proj_dir):
572                return False
573            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
574            if not self.ssh_cmd(user, host,
575                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
576                    "swapexp"):
577                return False
578            return True
579        else:
580            self.log.debug("[start_segment]:unknown state %s" % state)
581            return False
582
583    def stop_segment(self, tb, eid, tbparams):
584        """
585        Stop a sub experiment by calling swapexp on the federant
586        """
587        user = tbparams[tb]['user']
588        host = tbparams[tb]['host']
589        pid = tbparams[tb]['project']
590
591        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
592        return self.ssh_cmd(user, host,
593                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
594
595       
596    def generate_ssh_keys(self, dest, type="rsa" ):
597        """
598        Generate a set of keys for the gateways to use to talk.
599
600        Keys are of type type and are stored in the required dest file.
601        """
602        valid_types = ("rsa", "dsa")
603        t = type.lower();
604        if t not in valid_types: raise ValueError
605        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
606
607        try:
608            trace = open("/dev/null", "w")
609        except IOError:
610            raise service_error(service_error.internal,
611                    "Cannot open /dev/null??");
612
613        # May raise CalledProcessError
614        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
615        rv = call(cmd, stdout=trace, stderr=trace)
616        if rv != 0:
617            raise service_error(service_error.internal, 
618                    "Cannot generate nonce ssh keys.  %s return code %d" \
619                            % (self.ssh_keygen, rv))
620
621    def gentopo(self, str):
622        """
623        Generate the topology dtat structure from the splitter's XML
624        representation of it.
625
626        The topology XML looks like:
627            <experiment>
628                <nodes>
629                    <node><vname></vname><ips>ip1:ip2</ips></node>
630                </nodes>
631                <lans>
632                    <lan>
633                        <vname></vname><vnode></vnode><ip></ip>
634                        <bandwidth></bandwidth><member>node:port</member>
635                    </lan>
636                </lans>
637        """
638        class topo_parse:
639            """
640            Parse the topology XML and create the dats structure.
641            """
642            def __init__(self):
643                # Typing of the subelements for data conversion
644                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
645                self.int_subelements = ( 'bandwidth',)
646                self.float_subelements = ( 'delay',)
647                # The final data structure
648                self.nodes = [ ]
649                self.lans =  [ ]
650                self.topo = { \
651                        'node': self.nodes,\
652                        'lan' : self.lans,\
653                    }
654                self.element = { }  # Current element being created
655                self.chars = ""     # Last text seen
656
657            def end_element(self, name):
658                # After each sub element the contents is added to the current
659                # element or to the appropriate list.
660                if name == 'node':
661                    self.nodes.append(self.element)
662                    self.element = { }
663                elif name == 'lan':
664                    self.lans.append(self.element)
665                    self.element = { }
666                elif name in self.str_subelements:
667                    self.element[name] = self.chars
668                    self.chars = ""
669                elif name in self.int_subelements:
670                    self.element[name] = int(self.chars)
671                    self.chars = ""
672                elif name in self.float_subelements:
673                    self.element[name] = float(self.chars)
674                    self.chars = ""
675
676            def found_chars(self, data):
677                self.chars += data.rstrip()
678
679
680        tp = topo_parse();
681        parser = xml.parsers.expat.ParserCreate()
682        parser.EndElementHandler = tp.end_element
683        parser.CharacterDataHandler = tp.found_chars
684
685        parser.Parse(str)
686
687        return tp.topo
688       
689
690    def genviz(self, topo):
691        """
692        Generate the visualization the virtual topology
693        """
694
695        neato = "/usr/local/bin/neato"
696        # These are used to parse neato output and to create the visualization
697        # file.
698        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
699        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
700                "%s</type></node>"
701
702        try:
703            # Node names
704            nodes = [ n['vname'] for n in topo['node'] ]
705            topo_lans = topo['lan']
706        except KeyError:
707            raise service_error(service_error.internal, "Bad topology")
708
709        lans = { }
710        links = { }
711
712        # Walk through the virtual topology, organizing the connections into
713        # 2-node connections (links) and more-than-2-node connections (lans).
714        # When a lan is created, it's added to the list of nodes (there's a
715        # node in the visualization for the lan).
716        for l in topo_lans:
717            if links.has_key(l['vname']):
718                if len(links[l['vname']]) < 2:
719                    links[l['vname']].append(l['vnode'])
720                else:
721                    nodes.append(l['vname'])
722                    lans[l['vname']] = links[l['vname']]
723                    del links[l['vname']]
724                    lans[l['vname']].append(l['vnode'])
725            elif lans.has_key(l['vname']):
726                lans[l['vname']].append(l['vnode'])
727            else:
728                links[l['vname']] = [ l['vnode'] ]
729
730
731        # Open up a temporary file for dot to turn into a visualization
732        try:
733            df, dotname = tempfile.mkstemp()
734            dotfile = os.fdopen(df, 'w')
735        except IOError:
736            raise service_error(service_error.internal,
737                    "Failed to open file in genviz")
738
739        # Generate a dot/neato input file from the links, nodes and lans
740        try:
741            print >>dotfile, "graph G {"
742            for n in nodes:
743                print >>dotfile, '\t"%s"' % n
744            for l in links.keys():
745                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
746            for l in lans.keys():
747                for n in lans[l]:
748                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
749            print >>dotfile, "}"
750            dotfile.close()
751        except TypeError:
752            raise service_error(service_error.internal,
753                    "Single endpoint link in vtopo")
754        except IOError:
755            raise service_error(service_error.internal, "Cannot write dot file")
756
757        # Use dot to create a visualization
758        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
759                '-Gpack=true', dotname], stdout=PIPE)
760
761        # Translate dot to vis format
762        vis_nodes = [ ]
763        vis = { 'node': vis_nodes }
764        for line in dot.stdout:
765            m = vis_re.match(line)
766            if m:
767                vn = m.group(1)
768                vis_node = {'name': vn, \
769                        'x': float(m.group(2)),\
770                        'y' : float(m.group(3)),\
771                    }
772                if vn in links.keys() or vn in lans.keys():
773                    vis_node['type'] = 'lan'
774                else:
775                    vis_node['type'] = 'node'
776                vis_nodes.append(vis_node)
777        rv = dot.wait()
778
779        os.remove(dotname)
780        if rv == 0 : return vis
781        else: return None
782
783    def get_access(self, tb, nodes, user, tbparam):
784        """
785        Get access to testbed through fedd and set the parameters for that tb
786        """
787
788        translate_attr = {
789            'slavenodestartcmd': 'expstart',
790            'slaveconnectorstartcmd': 'gwstart',
791            'masternodestartcmd': 'mexpstart',
792            'masterconnectorstartcmd': 'mgwstart',
793            'connectorimage': 'gwimage',
794            'connectortype': 'gwtype',
795            'tunnelcfg': 'tun',
796            'smbshare': 'smbshare',
797        }
798
799        uri = self.tbmap.get(tb, None)
800        if not uri:
801            raise service_error(serice_error.server_config, 
802                    "Unknown testbed: %s" % tb)
803
804        # The basic request
805        req = {\
806                'destinationTestbed' : { 'uri' : uri },
807                'user':  user,
808                'allocID' : { 'localname': 'test' },
809                # XXX: need to get service access stright
810                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
811                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
812            }
813       
814        # node resources if any
815        if nodes != None and len(nodes) > 0:
816            rnodes = [ ]
817            for n in nodes:
818                rn = { }
819                image, hw, count = n.split(":")
820                if image: rn['image'] = [ image ]
821                if hw: rn['hardware'] = [ hw ]
822                if count: rn['count'] = int(count)
823                rnodes.append(rn)
824            req['resources']= { }
825            req['resources']['node'] = rnodes
826
827        r = self.call_RequestAccess(uri, req, 
828                self.cert_file, self.cert_pwd, self.trusted_certs)
829
830        if r.has_key('RequestAccessResponseBody'):
831            r = r['RequestAccessResponseBody']
832        else:
833            raise service_error(service_error.protocol,
834                    "Bad proxy response")
835
836        e = r['emulab']
837        p = e['project']
838        tbparam[tb] = { 
839                "boss": e['boss'],
840                "host": e['ops'],
841                "domain": e['domain'],
842                "fs": e['fileServer'],
843                "eventserver": e['eventServer'],
844                "project": unpack_id(p['name']),
845                "emulab" : e,
846                "allocID" : r['allocID'],
847                }
848        # Make the testbed name be the label the user applied
849        p['testbed'] = {'localname': tb }
850
851        for u in p['user']:
852            tbparam[tb]['user'] = unpack_id(u['userID'])
853
854        for a in e['fedAttr']:
855            if a['attribute']:
856                key = translate_attr.get(a['attribute'].lower(), None)
857                if key:
858                    tbparam[tb][key]= a['value']
859       
860    def release_access(self, tb, aid):
861        """
862        Release access to testbed through fedd
863        """
864
865        uri = self.tbmap.get(tb, None)
866        if not uri:
867            raise service_error(serice_error.server_config, 
868                    "Unknown testbed: %s" % tb)
869
870        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
871                self.cert_file, self.cert_pwd, self.trusted_certs)
872
873        # better error coding
874
875    def remote_splitter(self, uri, desc, master):
876
877        req = {
878                'description' : { 'ns2description': desc },
879                'master': master,
880                'include_fedkit': bool(self.fedkit)
881            }
882
883        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
884                self.trusted_certs)
885
886        if r.has_key('Ns2SplitResponseBody'):
887            r = r['Ns2SplitResponseBody']
888            if r.has_key('output'):
889                return r['output'].splitlines()
890            else:
891                raise service_error(service_error.protocol, 
892                        "Bad splitter response (no output)")
893        else:
894            raise service_error(service_error.protocol, "Bad splitter response")
895       
896    class current_testbed:
897        """
898        Object for collecting the current testbed description.  The testbed
899        description is saved to a file with the local testbed variables
900        subsittuted line by line.
901        """
902        def __init__(self, eid, tmpdir, fedkit):
903            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
904            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
905            self.current_testbed = None
906            self.testbed_file = None
907
908            self.def_expstart = \
909                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
910            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
911            self.def_gwstart = \
912                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
913            self.def_mgwstart = \
914                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
915            self.def_gwimage = "FBSD61-TUNNEL2";
916            self.def_gwtype = "pc";
917
918            self.eid = eid
919            self.tmpdir = tmpdir
920            self.fedkit = fedkit
921
922        def __call__(self, line, master, allocated, tbparams):
923            # Capture testbed topology descriptions
924            if self.current_testbed == None:
925                m = self.begin_testbed.match(line)
926                if m != None:
927                    self.current_testbed = m.group(1)
928                    if self.current_testbed == None:
929                        raise service_error(service_error.req,
930                                "Bad request format (unnamed testbed)")
931                    allocated[self.current_testbed] = \
932                            allocated.get(self.current_testbed,0) + 1
933                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
934                    if not os.path.exists(tb_dir):
935                        try:
936                            os.mkdir(tb_dir)
937                        except IOError:
938                            raise service_error(service_error.internal,
939                                    "Cannot create %s" % tb_dir)
940                    try:
941                        self.testbed_file = open("%s/%s.%s.tcl" %
942                                (tb_dir, self.eid, self.current_testbed), 'w')
943                    except IOError:
944                        self.testbed_file = None
945                    return True
946                else: return False
947            else:
948                m = self.end_testbed.match(line)
949                if m != None:
950                    if m.group(1) != self.current_testbed:
951                        raise service_error(service_error.internal, 
952                                "Mismatched testbed markers!?")
953                    if self.testbed_file != None: 
954                        self.testbed_file.close()
955                        self.testbed_file = None
956                    self.current_testbed = None
957                elif self.testbed_file:
958                    # Substitute variables and put the line into the local
959                    # testbed file.
960                    gwtype = tbparams[self.current_testbed].get('gwtype', 
961                            self.def_gwtype)
962                    gwimage = tbparams[self.current_testbed].get('gwimage', 
963                            self.def_gwimage)
964                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
965                            self.def_mgwstart)
966                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
967                            self.def_mexpstart)
968                    gwstart = tbparams[self.current_testbed].get('gwstart', 
969                            self.def_gwstart)
970                    expstart = tbparams[self.current_testbed].get('expstart', 
971                            self.def_expstart)
972                    project = tbparams[self.current_testbed].get('project')
973                    line = re.sub("GWTYPE", gwtype, line)
974                    line = re.sub("GWIMAGE", gwimage, line)
975                    if self.current_testbed == master:
976                        line = re.sub("GWSTART", mgwstart, line)
977                        line = re.sub("EXPSTART", mexpstart, line)
978                    else:
979                        line = re.sub("GWSTART", gwstart, line)
980                        line = re.sub("EXPSTART", expstart, line)
981                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
982                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
983                    line = re.sub("EID", self.eid, line)
984                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
985                            (project, self.eid), line)
986                    if self.fedkit:
987                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
988                                line)
989                    print >>self.testbed_file, line
990                return True
991
992    class allbeds:
993        """
994        Process the Allbeds section.  Get access to each federant and save the
995        parameters in tbparams
996        """
997        def __init__(self, get_access):
998            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
999            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1000            self.in_allbeds = False
1001            self.get_access = get_access
1002
1003        def __call__(self, line, user, tbparams):
1004            # Testbed access parameters
1005            if not self.in_allbeds:
1006                if self.begin_allbeds.match(line):
1007                    self.in_allbeds = True
1008                    return True
1009                else:
1010                    return False
1011            else:
1012                if self.end_allbeds.match(line):
1013                    self.in_allbeds = False
1014                else:
1015                    nodes = line.split('|')
1016                    tb = nodes.pop(0)
1017                    self.get_access(tb, nodes, user, tbparams)
1018                return True
1019
1020    class gateways:
1021        def __init__(self, eid, master, tmpdir, gw_pubkey,
1022                gw_secretkey, copy_file, fedkit):
1023            self.begin_gateways = \
1024                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1025            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1026            self.current_gateways = None
1027            self.control_gateway = None
1028            self.active_end = { }
1029
1030            self.eid = eid
1031            self.master = master
1032            self.tmpdir = tmpdir
1033            self.gw_pubkey_base = gw_pubkey
1034            self.gw_secretkey_base = gw_secretkey
1035
1036            self.copy_file = copy_file
1037            self.fedkit = fedkit
1038
1039
1040        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1041                active_end, tbparams, dtb, myname, desthost, type):
1042            """
1043            Produce a gateway configuration file from a gateways line.
1044            """
1045
1046            sproject = tbparams[gw].get('project', 'project')
1047            dproject = tbparams[dtb].get('project', 'project')
1048            sdomain = ".%s.%s%s" % (eid, sproject,
1049                    tbparams[gw].get('domain', ".example.com"))
1050            ddomain = ".%s.%s%s" % (eid, dproject,
1051                    tbparams[dtb].get('domain', ".example.com"))
1052            boss = tbparams[master].get('boss', "boss")
1053            fs = tbparams[master].get('fs', "fs")
1054            event_server = "%s%s" % \
1055                    (tbparams[gw].get('eventserver', "event_server"),
1056                            tbparams[gw].get('domain', "example.com"))
1057            remote_event_server = "%s%s" % \
1058                    (tbparams[dtb].get('eventserver', "event_server"),
1059                            tbparams[dtb].get('domain', "example.com"))
1060            seer_control = "%s%s" % \
1061                    (tbparams[gw].get('control', "control"), sdomain)
1062
1063            if self.fedkit:
1064                remote_script_dir = "/usr/local/federation/bin"
1065                local_script_dir = "/usr/local/federation/bin"
1066            else:
1067                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1068                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1069
1070            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1071            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1072            tunnel_cfg = tbparams[gw].get("tun", "false")
1073
1074            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1075            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1076
1077            # translate to lower case so the `hostname` hack for specifying
1078            # configuration files works.
1079            conf_file = conf_file.lower();
1080            remote_conf_file = remote_conf_file.lower();
1081
1082            if dtb == master:
1083                active = "false"
1084            elif gw == master:
1085                active = "true"
1086            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1087                active = "false"
1088            else:
1089                active_end['%s-%s' % (gw, dtb)] = 1
1090                active = "true"
1091
1092            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1093            print >>gwconfig, "Active: %s" % active
1094            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1095            print >>gwconfig, "BossName: %s" % boss
1096            print >>gwconfig, "FsName: %s" % fs
1097            print >>gwconfig, "EventServerName: %s" % event_server
1098            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1099            print >>gwconfig, "SeerControl: %s" % seer_control
1100            print >>gwconfig, "Type: %s" % type
1101            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1102            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1103                    local_script_dir
1104            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1105            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1106            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1107                    (remote_conf_dir, remote_conf_file)
1108            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1109            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1110            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1111            gwconfig.close()
1112
1113            return active == "true"
1114
1115        def __call__(self, line, allocated, tbparams):
1116            # Process gateways
1117            if not self.current_gateways:
1118                m = self.begin_gateways.match(line)
1119                if m:
1120                    self.current_gateways = m.group(1)
1121                    if allocated.has_key(self.current_gateways):
1122                        # This test should always succeed
1123                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1124                        if not os.path.exists(tb_dir):
1125                            try:
1126                                os.mkdir(tb_dir)
1127                            except IOError:
1128                                raise service_error(service_error.internal,
1129                                        "Cannot create %s" % tb_dir)
1130                    else:
1131                        # XXX
1132                        self.log.error("[gateways]: Ignoring gateways for " + \
1133                                "unknown testbed %s" % self.current_gateways)
1134                        self.current_gateways = None
1135                    return True
1136                else:
1137                    return False
1138            else:
1139                m = self.end_gateways.match(line)
1140                if m :
1141                    if m.group(1) != self.current_gateways:
1142                        raise service_error(service_error.internal,
1143                                "Mismatched gateway markers!?")
1144                    if self.control_gateway:
1145                        try:
1146                            cc = open("%s/%s/client.conf" %
1147                                    (self.tmpdir, self.current_gateways), 'w')
1148                            print >>cc, "ControlGateway: %s" % \
1149                                    self.control_gateway
1150                            if tbparams[self.master].has_key('smbshare'):
1151                                print >>cc, "SMBSHare: %s" % \
1152                                        tbparams[self.master]['smbshare']
1153                            print >>cc, "ProjectUser: %s" % \
1154                                    tbparams[self.master]['user']
1155                            print >>cc, "ProjectName: %s" % \
1156                                    tbparams[self.master]['project']
1157                            cc.close()
1158                        except IOError:
1159                            raise service_error(service_error.internal,
1160                                    "Error creating client config")
1161                        try:
1162                            cc = open("%s/%s/seer.conf" %
1163                                    (self.tmpdir, self.current_gateways),
1164                                    'w')
1165                            if self.current_gateways != self.master:
1166                                print >>cc, "ControlNode: %s" % \
1167                                        self.control_gateway
1168                            print >>cc, "ExperimentID: %s/%s" % \
1169                                    ( tbparams[self.master]['project'], \
1170                                    self.eid )
1171                            cc.close()
1172                        except IOError:
1173                            raise service_error(service_error.internal,
1174                                    "Error creating seer config")
1175                    else:
1176                        debug.error("[gateways]: No control gateway for %s" %\
1177                                    self.current_gateways)
1178                    self.current_gateways = None
1179                else:
1180                    dtb, myname, desthost, type = line.split(" ")
1181
1182                    if type == "control" or type == "both":
1183                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1184                                self.eid, 
1185                                tbparams[self.current_gateways]['project'],
1186                                tbparams[self.current_gateways]['domain'])
1187                    try:
1188                        active = self.gateway_conf_file(self.current_gateways,
1189                                self.master, self.eid, self.gw_pubkey_base,
1190                                self.gw_secretkey_base,
1191                                self.active_end, tbparams, dtb, myname,
1192                                desthost, type)
1193                    except IOError, e:
1194                        raise service_error(service_error.internal,
1195                                "Failed to write config file for %s" % \
1196                                        self.current_gateway)
1197           
1198                    gw_pubkey = "%s/keys/%s" % \
1199                            (self.tmpdir, self.gw_pubkey_base)
1200                    gw_secretkey = "%s/keys/%s" % \
1201                            (self.tmpdir, self.gw_secretkey_base)
1202
1203                    pkfile = "%s/%s/%s" % \
1204                            ( self.tmpdir, self.current_gateways, 
1205                                    self.gw_pubkey_base)
1206                    skfile = "%s/%s/%s" % \
1207                            ( self.tmpdir, self.current_gateways, 
1208                                    self.gw_secretkey_base)
1209
1210                    if not os.path.exists(pkfile):
1211                        try:
1212                            self.copy_file(gw_pubkey, pkfile)
1213                        except IOError:
1214                            service_error(service_error.internal,
1215                                    "Failed to copy pubkey file")
1216
1217                    if active and not os.path.exists(skfile):
1218                        try:
1219                            self.copy_file(gw_secretkey, skfile)
1220                        except IOError:
1221                            service_error(service_error.internal,
1222                                    "Failed to copy secretkey file")
1223                return True
1224
1225    class shunt_to_file:
1226        """
1227        Simple class to write data between two regexps to a file.
1228        """
1229        def __init__(self, begin, end, filename):
1230            """
1231            Begin shunting on a match of begin, stop on end, send data to
1232            filename.
1233            """
1234            self.begin = re.compile(begin)
1235            self.end = re.compile(end)
1236            self.in_shunt = False
1237            self.file = None
1238            self.filename = filename
1239
1240        def __call__(self, line):
1241            """
1242            Call this on each line in the input that may be shunted.
1243            """
1244            if not self.in_shunt:
1245                if self.begin.match(line):
1246                    self.in_shunt = True
1247                    try:
1248                        self.file = open(self.filename, "w")
1249                    except:
1250                        self.file = None
1251                        raise
1252                    return True
1253                else:
1254                    return False
1255            else:
1256                if self.end.match(line):
1257                    if self.file: 
1258                        self.file.close()
1259                        self.file = None
1260                    self.in_shunt = False
1261                else:
1262                    if self.file:
1263                        print >>self.file, line
1264                return True
1265
1266    class shunt_to_list:
1267        """
1268        Same interface as shunt_to_file.  Data collected in self.list, one list
1269        element per line.
1270        """
1271        def __init__(self, begin, end):
1272            self.begin = re.compile(begin)
1273            self.end = re.compile(end)
1274            self.in_shunt = False
1275            self.list = [ ]
1276       
1277        def __call__(self, line):
1278            if not self.in_shunt:
1279                if self.begin.match(line):
1280                    self.in_shunt = True
1281                    return True
1282                else:
1283                    return False
1284            else:
1285                if self.end.match(line):
1286                    self.in_shunt = False
1287                else:
1288                    self.list.append(line)
1289                return True
1290
1291    class shunt_to_string:
1292        """
1293        Same interface as shunt_to_file.  Data collected in self.str, all in
1294        one string.
1295        """
1296        def __init__(self, begin, end):
1297            self.begin = re.compile(begin)
1298            self.end = re.compile(end)
1299            self.in_shunt = False
1300            self.str = ""
1301       
1302        def __call__(self, line):
1303            if not self.in_shunt:
1304                if self.begin.match(line):
1305                    self.in_shunt = True
1306                    return True
1307                else:
1308                    return False
1309            else:
1310                if self.end.match(line):
1311                    self.in_shunt = False
1312                else:
1313                    self.str += line
1314                return True
1315
1316    def create_experiment(self, req, fid):
1317        """
1318        The external interface to experiment creation called from the
1319        dispatcher.
1320
1321        Creates a working directory, splits the incoming description using the
1322        splitter script and parses out the avrious subsections using the
1323        lcasses above.  Once each sub-experiment is created, use pooled threads
1324        to instantiate them and start it all up.
1325        """
1326        try:
1327            tmpdir = tempfile.mkdtemp(prefix="split-")
1328        except IOError:
1329            raise service_error(service_error.internal, "Cannot create tmp dir")
1330
1331        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1332        gw_secretkey_base = "fed.%s" % self.ssh_type
1333        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1334        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1335        tclfile = tmpdir + "/experiment.tcl"
1336        tbparams = { }
1337
1338        pid = "dummy"
1339        gid = "dummy"
1340        # XXX
1341        fail_soft = False
1342
1343        try:
1344            os.mkdir(tmpdir+"/keys")
1345        except OSError:
1346            raise service_error(service_error.internal,
1347                    "Can't make temporary dir")
1348
1349        req = req.get('CreateRequestBody', None)
1350        if not req:
1351            raise service_error(service_error.req,
1352                    "Bad request format (no CreateRequestBody)")
1353        # The tcl parser needs to read a file so put the content into that file
1354        descr=req.get('experimentdescription', None)
1355        if descr:
1356            file_content=descr.get('ns2description', None)
1357            if file_content:
1358                try:
1359                    f = open(tclfile, 'w')
1360                    f.write(file_content)
1361                    f.close()
1362                except IOError:
1363                    raise service_error(service_error.internal,
1364                            "Cannot write temp experiment description")
1365            else:
1366                raise service_error(service_error.req, 
1367                        "Only ns2descriptions supported")
1368        else:
1369            raise service_error(service_error.req, "No experiment description")
1370
1371        if req.has_key('experimentID') and \
1372                req['experimentID'].has_key('localname'):
1373            eid = req['experimentID']['localname']
1374            self.state_lock.acquire()
1375            while (self.state.has_key(eid)):
1376                eid += random.choice(string.ascii_letters)
1377            # To avoid another thread picking this localname
1378            self.state[eid] = "placeholder"
1379            self.state_lock.release()
1380        else:
1381            eid = self.exp_stem
1382            for i in range(0,5):
1383                eid += random.choice(string.ascii_letters)
1384            self.state_lock.acquire()
1385            while (self.state.has_key(eid)):
1386                eid = self.exp_stem
1387                for i in range(0,5):
1388                    eid += random.choice(string.ascii_letters)
1389            # To avoid another thread picking this localname
1390            self.state[eid] = "placeholder"
1391            self.state_lock.release()
1392
1393        try:
1394            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1395        except ValueError:
1396            raise service_error(service_error.server_config, 
1397                    "Bad key type (%s)" % self.ssh_type)
1398
1399        user = req.get('user', None)
1400        if user == None:
1401            raise service_error(service_error.req, "No user")
1402
1403        master = req.get('master', None)
1404        if master == None:
1405            raise service_error(service_error.req, "No master testbed label")
1406       
1407        if self.splitter_url:
1408            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1409            split_data = self.remote_splitter(self.splitter_url, file_content,
1410                    master)
1411        else:
1412            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1413                str(self.muxmax), '-m', master]
1414
1415            if self.fedkit:
1416                tclcmd.append('-k')
1417
1418            tclcmd.extend([pid, gid, eid, tclfile])
1419
1420            self.log.debug("running local splitter %s", " ".join(tclcmd))
1421            tclparser = Popen(tclcmd, stdout=PIPE)
1422            split_data = tclparser.stdout
1423
1424        allocated = { }     # Testbeds we can access
1425        started = { }       # Testbeds where a sub-experiment started
1426                            # successfully
1427
1428        # Objects to parse the splitter output (defined above)
1429        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1430        parse_allbeds = self.allbeds(self.get_access)
1431        parse_gateways = self.gateways(eid, master, tmpdir,
1432                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1433        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1434                    "^#\s+End\s+Vtopo")
1435        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1436                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1437        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1438                "^#\s+End\s+tarfiles")
1439        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1440                "^#\s+End\s+rpms")
1441
1442        # Worling on the split data
1443        for line in split_data:
1444            line = line.rstrip()
1445            if parse_current_testbed(line, master, allocated, tbparams):
1446                continue
1447            elif parse_allbeds(line, user, tbparams):
1448                continue
1449            elif parse_gateways(line, allocated, tbparams):
1450                continue
1451            elif parse_vtopo(line):
1452                continue
1453            elif parse_hostnames(line):
1454                continue
1455            elif parse_tarfiles(line):
1456                continue
1457            elif parse_rpms(line):
1458                continue
1459            else:
1460                raise service_error(service_error.internal, 
1461                        "Bad tcl parse? %s" % line)
1462
1463        # Virtual topology and visualization
1464        vtopo = self.gentopo(parse_vtopo.str)
1465        if not vtopo:
1466            raise service_error(service_error.internal, 
1467                    "Failed to generate virtual topology")
1468
1469        vis = self.genviz(vtopo)
1470        if not vis:
1471            raise service_error(service_error.internal, 
1472                    "Failed to generate visualization")
1473
1474        # save federant information
1475        for k in allocated.keys():
1476            tbparams[k]['federant'] = {\
1477                    'name': [ { 'localname' : eid} ],\
1478                    'emulab': tbparams[k]['emulab'],\
1479                    'allocID' : tbparams[k]['allocID'],\
1480                    'master' : k == master,\
1481                }
1482
1483
1484        # Copy tarfiles and rpms needed at remote sites into a staging area
1485        try:
1486            if self.fedkit:
1487                parse_tarfiles.list.append(self.fedkit)
1488            for t in parse_tarfiles.list:
1489                if not os.path.exists("%s/tarfiles" % tmpdir):
1490                    os.mkdir("%s/tarfiles" % tmpdir)
1491                self.copy_file(t, "%s/tarfiles/%s" % \
1492                        (tmpdir, os.path.basename(t)))
1493            for r in parse_rpms.list:
1494                if not os.path.exists("%s/rpms" % tmpdir):
1495                    os.mkdir("%s/rpms" % tmpdir)
1496                self.copy_file(r, "%s/rpms/%s" % \
1497                        (tmpdir, os.path.basename(r)))
1498        except IOError, e:
1499            raise service_error(service_error.internal, 
1500                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1501
1502        thread_pool_info = self.thread_pool()
1503        threads = [ ]
1504
1505        for tb in [ k for k in allocated.keys() if k != master]:
1506            # Wait until we have a free slot to start the next testbed load
1507            thread_pool_info.acquire()
1508            while thread_pool_info.started - \
1509                    thread_pool_info.terminated >= self.nthreads:
1510                thread_pool_info.wait()
1511            thread_pool_info.release()
1512
1513            # Create and start a thread to start the segment, and save it to
1514            # get the return value later
1515            t  = self.pooled_thread(target=self.start_segment, 
1516                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1517                    pdata=thread_pool_info, trace_file=self.trace_file)
1518            threads.append(t)
1519            t.start()
1520
1521        # Wait until all finish (the first clause of the while is to make sure
1522        # one starts)
1523        thread_pool_info.acquire()
1524        while thread_pool_info.started == 0 or \
1525                thread_pool_info.started > thread_pool_info.terminated:
1526            thread_pool_info.wait()
1527        thread_pool_info.release()
1528
1529        # If none failed, start the master
1530        failed = [ t.getName() for t in threads if not t.rv ]
1531
1532        if len(failed) == 0:
1533            if not self.start_segment(master, eid, tbparams, tmpdir):
1534                failed.append(master)
1535
1536        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1537        # If one failed clean up, unless fail_soft is set
1538        if failed:
1539            if not fail_soft:
1540                for tb in succeeded:
1541                    self.stop_segment(tb, eid, tbparams)
1542                # Remove the placeholder
1543                self.state_lock.acquire()
1544                del self.state[eid]
1545                self.state_lock.release()
1546
1547                raise service_error(service_error.federant,
1548                    "Swap in failed on %s" % ",".join(failed))
1549        else:
1550            self.log.info("[start_segment]: Experiment %s started" % eid)
1551
1552        # Generate an ID for the experiment (slice) and a certificate that the
1553        # allocator can use to prove they own it.  We'll ship it back through
1554        # the encrypted connection.
1555        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1556
1557        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1558
1559        # Walk up tmpdir, deleting as we go
1560        for path, dirs, files in os.walk(tmpdir, topdown=False):
1561            for f in files:
1562                os.remove(os.path.join(path, f))
1563            for d in dirs:
1564                os.rmdir(os.path.join(path, d))
1565        os.rmdir(tmpdir)
1566
1567        # The deepcopy prevents the allocation ID and other binaries from being
1568        # translated into other formats
1569        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1570                for tb in tbparams.keys() \
1571                    if tbparams[tb].has_key('federant') ],\
1572                    'vtopo': vtopo,\
1573                    'vis' : vis,
1574                    'experimentID' : [\
1575                            { 'fedid': copy.copy(expid) }, \
1576                            { 'localname': eid },\
1577                        ],\
1578                    'experimentAccess': { 'X509' : expcert },\
1579                }
1580
1581        # Insert the experiment into our state and update the disk copy
1582        self.state_lock.acquire()
1583        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1584                for tb in tbparams.keys() \
1585                    if tbparams[tb].has_key('federant') ],\
1586                    'vtopo': vtopo,\
1587                    'vis' : vis,
1588                    'experimentID' : [\
1589                            { 'fedid': expid }, { 'localname': eid },\
1590                        ],\
1591                }
1592        self.state[eid] = self.state[expid]
1593        if self.state_filename: self.write_state()
1594        self.state_lock.release()
1595
1596        if not failed:
1597            return resp
1598        else:
1599            raise service_error(service_error.partial, \
1600                    "Partial swap in on %s" % ",".join(succeeded))
1601
1602
1603    def get_vtopo(self, req, fid):
1604        """
1605        Return the stored virtual topology for this experiment
1606        """
1607        rv = None
1608
1609        req = req.get('VtopoRequestBody', None)
1610        if not req:
1611            raise service_error(service_error.req,
1612                    "Bad request format (no VtopoRequestBody)")
1613        exp = req.get('experiment', None)
1614        if exp:
1615            if exp.has_key('fedid'):
1616                key = exp['fedid']
1617                keytype = "fedid"
1618            elif exp.has_key('localname'):
1619                key = exp['localname']
1620                keytype = "localname"
1621            else:
1622                raise service_error(service_error.req, "Unknown lookup type")
1623        else:
1624            raise service_error(service_error.req, "No request?")
1625
1626        self.state_lock.acquire()
1627        if self.state.has_key(key):
1628            rv = { 'experiment' : {keytype: key },\
1629                    'vtopo': self.state[key]['vtopo'],\
1630                }
1631        self.state_lock.release()
1632
1633        if rv: return rv
1634        else: raise service_error(service_error.req, "No such experiment")
1635
1636    def get_vis(self, req, fid):
1637        """
1638        Return the stored visualization for this experiment
1639        """
1640        rv = None
1641
1642        req = req.get('VisRequestBody', None)
1643        if not req:
1644            raise service_error(service_error.req,
1645                    "Bad request format (no VisRequestBody)")
1646        exp = req.get('experiment', None)
1647        if exp:
1648            if exp.has_key('fedid'):
1649                key = exp['fedid']
1650                keytype = "fedid"
1651            elif exp.has_key('localname'):
1652                key = exp['localname']
1653                keytype = "localname"
1654            else:
1655                raise service_error(service_error.req, "Unknown lookup type")
1656        else:
1657            raise service_error(service_error.req, "No request?")
1658
1659        self.state_lock.acquire()
1660        if self.state.has_key(key):
1661            rv =  { 'experiment' : {keytype: key },\
1662                    'vis': self.state[key]['vis'],\
1663                    }
1664        self.state_lock.release()
1665
1666        if rv: return rv
1667        else: raise service_error(service_error.req, "No such experiment")
1668
1669    def get_info(self, req, fid):
1670        """
1671        Return all the stored info about this experiment
1672        """
1673        rv = None
1674
1675        req = req.get('InfoRequestBody', None)
1676        if not req:
1677            raise service_error(service_error.req,
1678                    "Bad request format (no VisRequestBody)")
1679        exp = req.get('experiment', None)
1680        if exp:
1681            if exp.has_key('fedid'):
1682                key = exp['fedid']
1683                keytype = "fedid"
1684            elif exp.has_key('localname'):
1685                key = exp['localname']
1686                keytype = "localname"
1687            else:
1688                raise service_error(service_error.req, "Unknown lookup type")
1689        else:
1690            raise service_error(service_error.req, "No request?")
1691
1692        # The state may be massaged by the service function that called
1693        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1694        # state.
1695        self.state_lock.acquire()
1696        if self.state.has_key(key):
1697            rv = copy.deepcopy(self.state[key])
1698        self.state_lock.release()
1699
1700        if rv: return rv
1701        else: raise service_error(service_error.req, "No such experiment")
1702
1703
1704    def terminate_experiment(self, req, fid):
1705        """
1706        Swap this experiment out on the federants and delete the shared
1707        information
1708        """
1709        tbparams = { }
1710        req = req.get('TerminateRequestBody', None)
1711        if not req:
1712            raise service_error(service_error.req,
1713                    "Bad request format (no TerminateRequestBody)")
1714        exp = req.get('experiment', None)
1715        if exp:
1716            if exp.has_key('fedid'):
1717                key = exp['fedid']
1718                keytype = "fedid"
1719            elif exp.has_key('localname'):
1720                key = exp['localname']
1721                keytype = "localname"
1722            else:
1723                raise service_error(service_error.req, "Unknown lookup type")
1724        else:
1725            raise service_error(service_error.req, "No request?")
1726
1727        self.state_lock.acquire()
1728        fed_exp = self.state.get(key, None)
1729
1730        if fed_exp:
1731            # This branch of the conditional holds the lock to generate a
1732            # consistent temporary tbparams variable to deallocate experiments.
1733            # It releases the lock to do the deallocations and reacquires it to
1734            # remove the experiment state when the termination is complete.
1735            ids = []
1736            #  experimentID is a list of dicts that are self-describing
1737            #  identifiers.  This finds all the fedids and localnames - the
1738            #  keys of self.state - and puts them into ids.
1739            for id in fed_exp.get('experimentID', []):
1740                if id.has_key('fedid'): ids.append(id['fedid'])
1741                if id.has_key('localname'): ids.append(id['localname'])
1742
1743            # Construct enough of the tbparams to make the stop_segment calls
1744            # work
1745            for fed in fed_exp['federant']:
1746                try:
1747                    for e in fed['name']:
1748                        eid = e.get('localname', None)
1749                        if eid: break
1750                    else:
1751                        continue
1752
1753                    p = fed['emulab']['project']
1754
1755                    project = p['name']['localname']
1756                    tb = p['testbed']['localname']
1757                    user = p['user'][0]['userID']['localname']
1758
1759                    domain = fed['emulab']['domain']
1760                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1761                    aid = fed['allocID']
1762                except KeyError, e:
1763                    continue
1764                tbparams[tb] = {\
1765                        'user': user,\
1766                        'domain': domain,\
1767                        'project': project,\
1768                        'host': host,\
1769                        'eid': eid,\
1770                        'aid': aid,\
1771                    }
1772            self.state_lock.release()
1773
1774            # Stop everyone.
1775            for tb in tbparams.keys():
1776                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1777
1778            # release the allocations
1779            for tb in tbparams.keys():
1780                self.release_access(tb, tbparams[tb]['aid'])
1781
1782            # Remove the terminated experiment
1783            self.state_lock.acquire()
1784            for id in ids:
1785                if self.state.has_key(id): del self.state[id]
1786
1787            if self.state_filename: self.write_state()
1788            self.state_lock.release()
1789
1790            return { 'experiment': exp }
1791        else:
1792            # Don't forget to release the lock
1793            self.state_lock.release()
1794            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.