source: fedd/fedd_experiment_control.py @ 5576a47

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 5576a47 was 5576a47, checked in by Ted Faber <faber@…>, 16 years ago

project exporting in place

  • Property mode set to 100644
File size: 53.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5
6import re
7import random
8import string
9import subprocess
10import tempfile
11import copy
12import pickle
13import logging
14
15import traceback
16# For parsing visualization output and splitter output
17import xml.parsers.expat
18
19from threading import *
20from subprocess import *
21
22from fedd_services import *
23from fedd_internal_services import *
24from fedd_util import *
25from fedid import fedid, generate_fedid
26from remote_service import xmlrpc_handler, soap_handler, service_caller
27import parse_detail
28from service_error import service_error
29
30
31class nullHandler(logging.Handler):
32    def emit(self, record): pass
33
34fl = logging.getLogger("fedd.experiment_control")
35fl.addHandler(nullHandler())
36
37class fedd_experiment_control_local:
38    """
39    Control of experiments that this system can directly access.
40
41    Includes experiment creation, termination and information dissemination.
42    Thred safe.
43    """
44   
45    class thread_pool:
46        """
47        A class to keep track of a set of threads all invoked for the same
48        task.  Manages the mutual exclusion of the states.
49        """
50        def __init__(self):
51            """
52            Start a pool.
53            """
54            self.changed = Condition()
55            self.started = 0
56            self.terminated = 0
57
58        def acquire(self):
59            """
60            Get the pool's lock.
61            """
62            self.changed.acquire()
63
64        def release(self):
65            """
66            Release the pool's lock.
67            """
68            self.changed.release()
69
70        def wait(self, timeout = None):
71            """
72            Wait for a pool thread to start or stop.
73            """
74            self.changed.wait(timeout)
75
76        def start(self):
77            """
78            Called by a pool thread to report starting.
79            """
80            self.changed.acquire()
81            self.started += 1
82            self.changed.notifyAll()
83            self.changed.release()
84
85        def terminate(self):
86            """
87            Called by a pool thread to report finishing.
88            """
89            self.changed.acquire()
90            self.terminated += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def clear(self):
95            """
96            Clear all pool data.
97            """
98            self.changed.acquire()
99            self.started = 0
100            self.terminated =0
101            self.changed.notifyAll()
102            self.changed.release()
103
104    class pooled_thread(Thread):
105        """
106        One of a set of threads dedicated to a specific task.  Uses the
107        thread_pool class above for coordination.
108        """
109        def __init__(self, group=None, target=None, name=None, args=(), 
110                kwargs={}, pdata=None, trace_file=None):
111            Thread.__init__(self, group, target, name, args, kwargs)
112            self.rv = None          # Return value of the ops in this thread
113            self.exception = None   # Exception that terminated this thread
114            self.target=target      # Target function to run on start()
115            self.args = args        # Args to pass to target
116            self.kwargs = kwargs    # Additional kw args
117            self.pdata = pdata      # thread_pool for this class
118            # Logger for this thread
119            self.log = logging.getLogger("fedd.experiment_control")
120       
121        def run(self):
122            """
123            Emulate Thread.run, except add pool data manipulation and error
124            logging.
125            """
126            if self.pdata:
127                self.pdata.start()
128
129            if self.target:
130                try:
131                    self.rv = self.target(*self.args, **self.kwargs)
132                except service_error, s:
133                    self.exception = s
134                    self.log.error("Thread exception: %s %s" % \
135                            (s.code_string(), s.desc))
136                except:
137                    self.exception = sys.exc_info()[1]
138                    self.log.error(("Unexpected thread exception: %s" +\
139                            "Trace %s") % (self.exception,\
140                                traceback.format_exc()))
141            if self.pdata:
142                self.pdata.terminate()
143
144    call_RequestAccess = service_caller('RequestAccess',
145            'getfeddPortType', feddServiceLocator, 
146            RequestAccessRequestMessage, 'RequestAccessRequestBody')
147
148    call_ReleaseAccess = service_caller('ReleaseAccess',
149            'getfeddPortType', feddServiceLocator, 
150            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
151
152    call_Ns2Split = service_caller('Ns2Split',
153            'getfeddInternalPortType', feddInternalServiceLocator, 
154            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
155
156    def __init__(self, config=None, auth=None):
157        """
158        Intialize the various attributes, most from the config object
159        """
160        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
161        self.thread_pool = fedd_experiment_control_local.thread_pool
162
163        self.cert_file = None
164        self.cert_pwd = None
165        self.trusted_certs = None
166
167        # Walk through the various relevant certificat specifying config
168        # attributes until the local certificate attributes can be resolved.
169        # The walk is from most specific to most general specification.
170        for s in ("experiment_control", "globals"):
171            if config.has_section(s): 
172                if config.has_option(s, "cert_file"):
173                    if not self.cert_file:
174                        self.cert_file = config.get(s, "cert_file")
175                        self.cert_pwd = config.get(s, "cert_pwd")
176
177                if config.has_option(s, "trusted_certs"):
178                    if not self.trusted_certs:
179                        self.trusted_certs = config.get(s, "trusted_certs")
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193        if config.has_section("experiment_control"):
194            self.debug = config.get("experiment_control", "create_debug")
195            self.state_filename = config.get("experiment_control", 
196                    "experiment_state_file")
197            self.splitter_url = config.get("experiment_control", "splitter_url")
198            self.fedkit = config.get("experiment_control", "fedkit")
199        else:
200            self.debug = False
201            self.state_filename = None
202            self.splitter_url = None
203            self.fedkit = None
204
205        # XXX
206        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
207        self.ssh_type = "rsa"
208        self.state = { }
209        self.state_lock = Lock()
210        self.tclsh = "/usr/local/bin/otclsh"
211        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
212        self.tbmap = { 
213                'deter':'https://users.isi.deterlab.net:23235',
214                'emulab':'https://users.isi.deterlab.net:23236',
215                'ucb':'https://users.isi.deterlab.net:23237',
216                }
217        self.trace_file = sys.stderr
218
219        self.def_expstart = \
220                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
221                "/tmp/federate";
222        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
223                "FEDDIR/hosts";
224        self.def_gwstart = \
225                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
226                "/tmp/bridge.log";
227        self.def_mgwstart = \
228                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
229                "/tmp/bridge.log";
230        self.def_gwimage = "FBSD61-TUNNEL2";
231        self.def_gwtype = "pc";
232
233
234        if self.ssh_pubkey_file:
235            try:
236                f = open(self.ssh_pubkey_file, 'r')
237                self.ssh_pubkey = f.read()
238                f.close()
239            except IOError:
240                raise service_error(service_error.internal,
241                        "Cannot read sshpubkey")
242
243        set_log_level(config, "experiment_control", self.log)
244
245        # Grab saved state.  OK to do this w/o locking because it's read only
246        # and only one thread should be in existence that can see self.state at
247        # this point.
248        if self.state_filename:
249            self.read_state()
250
251        # Dispatch tables
252        self.soap_services = {\
253                'Create': soap_handler(\
254                        CreateRequestMessage.typecode,
255                        getattr(self, "create_experiment"), 
256                        CreateResponseMessage,
257                        "CreateResponseBody"),
258                'Vtopo': soap_handler(\
259                        VtopoRequestMessage.typecode,
260                        getattr(self, "get_vtopo"),
261                        VtopoResponseMessage,
262                        "VtopoResponseBody"),
263                'Vis': soap_handler(\
264                        VisRequestMessage.typecode,
265                        getattr(self, "get_vis"),
266                        VisResponseMessage,
267                        "VisResponseBody"),
268                'Info': soap_handler(\
269                        InfoRequestMessage.typecode,
270                        getattr(self, "get_info"),
271                        InfoResponseMessage,
272                        "InfoResponseBody"),
273                'Terminate': soap_handler(\
274                        TerminateRequestMessage.typecode,
275                        getattr(self, "terminate_experiment"),
276                        TerminateResponseMessage,
277                        "TerminateResponseBody"),
278        }
279
280        self.xmlrpc_services = {\
281                'Create': xmlrpc_handler(\
282                        getattr(self, "create_experiment"), 
283                        "CreateResponseBody"),
284                'Vtopo': xmlrpc_handler(\
285                        getattr(self, "get_vtopo"),
286                        "VtopoResponseBody"),
287                'Vis': xmlrpc_handler(\
288                        getattr(self, "get_vis"),
289                        "VisResponseBody"),
290                'Info': xmlrpc_handler(\
291                        getattr(self, "get_info"),
292                        "InfoResponseBody"),
293                'Terminate': xmlrpc_handler(\
294                        getattr(self, "terminate_experiment"),
295                        "TerminateResponseBody"),
296        }
297
298    def copy_file(self, src, dest, size=1024):
299        """
300        Exceedingly simple file copy.
301        """
302        s = open(src,'r')
303        d = open(dest, 'w')
304
305        buf = "x"
306        while buf != "":
307            buf = s.read(size)
308            d.write(buf)
309        s.close()
310        d.close()
311
312    # Call while holding self.state_lock
313    def write_state(self):
314        """
315        Write a new copy of experiment state after copying the existing state
316        to a backup.
317
318        State format is a simple pickling of the state dictionary.
319        """
320        if os.access(self.state_filename, os.W_OK):
321            self.copy_file(self.state_filename, \
322                    "%s.bak" % self.state_filename)
323        try:
324            f = open(self.state_filename, 'w')
325            pickle.dump(self.state, f)
326        except IOError, e:
327            self.log.error("Can't write file %s: %s" % \
328                    (self.state_filename, e))
329        except pickle.PicklingError, e:
330            self.log.error("Pickling problem: %s" % e)
331        except TypeError, e:
332            self.log.error("Pickling problem (TypeError): %s" % e)
333
334    # Call while holding self.state_lock
335    def read_state(self):
336        """
337        Read a new copy of experiment state.  Old state is overwritten.
338
339        State format is a simple pickling of the state dictionary.
340        """
341        try:
342            f = open(self.state_filename, "r")
343            self.state = pickle.load(f)
344            self.log.debug("[read_state]: Read state from %s" % \
345                    self.state_filename)
346        except IOError, e:
347            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
348                    % (self.state_filename, e))
349        except pickle.UnpicklingError, e:
350            self.log.warning(("[read_state]: No saved state: " + \
351                    "Unpickling failed: %s") % e)
352
353    def scp_file(self, file, user, host, dest=""):
354        """
355        scp a file to the remote host.  If debug is set the action is only
356        logged.
357        """
358
359        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
360        rv = 0
361
362        try:
363            dnull = open("/dev/null", "r")
364        except IOError:
365            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
366            dnull = Null
367
368        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
369        if not self.debug:
370            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
371            else: rv = call(scp_cmd)
372
373        return rv == 0
374
375    def ssh_cmd(self, user, host, cmd, wname=None):
376        """
377        Run a remote command on host as user.  If debug is set, the action is
378        only logged.
379        """
380        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
381
382        try:
383            dnull = open("/dev/null", "r")
384        except IOError:
385            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
386            dnull = Null
387
388        self.log.debug("[ssh_cmd]: %s" % sh_str)
389        if not self.debug:
390            if dnull:
391                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
392            else:
393                sub = Popen(sh_str, shell=True)
394            return sub.wait() == 0
395        else:
396            return True
397
398    def ship_configs(self, host, user, src_dir, dest_dir):
399        """
400        Copy federant-specific configuration files to the federant.
401        """
402        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
403            return False
404        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
405            return False
406
407        for f in os.listdir(src_dir):
408            if os.path.isdir(f):
409                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
410                        "%s/%s" % (dest_dir, f)):
411                    return False
412            else:
413                if not self.scp_file("%s/%s" % (src_dir, f), 
414                        user, host, dest_dir):
415                    return False
416        return True
417
418    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
419        """
420        Start a sub-experiment on a federant.
421
422        Get the current state, modify or create as appropriate, ship data and
423        configs and start the experiment.  There are small ordering differences
424        based on the initial state of the sub-experiment.
425        """
426        # ops node in the federant
427        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
428        user = tbparams[tb]['user']     # federant user
429        pid = tbparams[tb]['project']   # federant project
430        # XXX
431        base_confs = ( "hosts",)
432        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
433        # command to test experiment state
434        expinfo_exec = "/usr/testbed/bin/expinfo" 
435        # Configuration directories on the remote machine
436        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
437        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
438        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
439        # Regular expressions to parse the expinfo response
440        state_re = re.compile("State:\s+(\w+)")
441        no_exp_re = re.compile("^No\s+such\s+experiment")
442        state = None    # Experiment state parsed from expinfo
443        # The expinfo ssh command
444        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
445
446        # Get status
447        self.log.debug("[start_segment]: %s"% " ".join(cmd))
448        dev_null = None
449        try:
450            dev_null = open("/dev/null", "a")
451        except IOError, e:
452            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
453
454        if self.debug:
455            state = 'swapped'
456            rv = 0
457        else:
458            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
459            for line in status.stdout:
460                m = state_re.match(line)
461                if m: state = m.group(1)
462                else:
463                    m = no_exp_re.match(line)
464                    if m: state = "none"
465            rv = status.wait()
466
467        # If the experiment is not present the subcommand returns a non-zero
468        # return value.  If we successfully parsed a "none" outcome, ignore the
469        # return code.
470        if rv != 0 and state != "none":
471            raise service_error(service_error.internal,
472                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
473
474        self.log.debug("[start_segment]: %s: %s" % (tb, state))
475        self.log.info("[start_segment]:transferring experiment to %s" % tb)
476
477        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
478            return False
479        # Clear the federation files
480        if not self.ssh_cmd(user, host, 
481                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
482            return False
483        if not self.ssh_cmd(user, host, 
484                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
485            return False
486        # Clear and create the tarfiles and rpm directories
487        for d in (tarfiles_dir, rpms_dir):
488            if not self.ssh_cmd(user, host, 
489                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
490                return False
491            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
492                    "create tarfiles"):
493                return False
494       
495        if state == 'active':
496            # Remote experiment is active.  Modify it.
497            for f in base_confs:
498                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
499                        "%s/%s" % (proj_dir, f)):
500                    return False
501            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
502                    proj_dir):
503                return False
504            if os.path.isdir("%s/tarfiles" % tmpdir):
505                if not self.ship_configs(host, user,
506                        "%s/tarfiles" % tmpdir, tarfiles_dir):
507                    return False
508            if os.path.isdir("%s/rpms" % tmpdir):
509                if not self.ship_configs(host, user,
510                        "%s/rpms" % tmpdir, tarfiles_dir):
511                    return False
512            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
513            if not self.ssh_cmd(user, host,
514                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
515                            (pid, eid, tclfile), "modexp"):
516                return False
517            return True
518        elif state == "swapped":
519            # Remote experiment swapped out.  Modify it and swap it in.
520            for f in base_confs:
521                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
522                        "%s/%s" % (proj_dir, f)):
523                    return False
524            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
525                    proj_dir):
526                return False
527            if os.path.isdir("%s/tarfiles" % tmpdir):
528                if not self.ship_configs(host, user,
529                        "%s/tarfiles" % tmpdir, tarfiles_dir):
530                    return False
531            if os.path.isdir("%s/rpms" % tmpdir):
532                if not self.ship_configs(host, user,
533                        "%s/rpms" % tmpdir, tarfiles_dir):
534                    return False
535            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
536            if not self.ssh_cmd(user, host,
537                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
538                    "modexp"):
539                return False
540            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
541            if not self.ssh_cmd(user, host,
542                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
543                    "swapexp"):
544                return False
545            return True
546        elif state == "none":
547            # No remote experiment.  Create one.  We do this in 2 steps so we
548            # can put the configuration files and scripts into the new
549            # experiment directories.
550
551            # Tarfiles must be present for creation to work
552            if os.path.isdir("%s/tarfiles" % tmpdir):
553                if not self.ship_configs(host, user,
554                        "%s/tarfiles" % tmpdir, tarfiles_dir):
555                    return False
556            if os.path.isdir("%s/rpms" % tmpdir):
557                if not self.ship_configs(host, user,
558                        "%s/rpms" % tmpdir, tarfiles_dir):
559                    return False
560            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
561            if not self.ssh_cmd(user, host,
562                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
563                            (pid, eid, tclfile), "startexp"):
564                return False
565            # After startexp the per-experiment directories exist
566            for f in base_confs:
567                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
568                        "%s/%s" % (proj_dir, f)):
569                    return False
570            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
571                    proj_dir):
572                return False
573            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
574            if not self.ssh_cmd(user, host,
575                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
576                    "swapexp"):
577                return False
578            return True
579        else:
580            self.log.debug("[start_segment]:unknown state %s" % state)
581            return False
582
583    def stop_segment(self, tb, eid, tbparams):
584        """
585        Stop a sub experiment by calling swapexp on the federant
586        """
587        user = tbparams[tb]['user']
588        host = tbparams[tb]['host']
589        pid = tbparams[tb]['project']
590
591        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
592        return self.ssh_cmd(user, host,
593                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
594
595       
596    def generate_ssh_keys(self, dest, type="rsa" ):
597        """
598        Generate a set of keys for the gateways to use to talk.
599
600        Keys are of type type and are stored in the required dest file.
601        """
602        valid_types = ("rsa", "dsa")
603        t = type.lower();
604        if t not in valid_types: raise ValueError
605        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
606
607        try:
608            trace = open("/dev/null", "w")
609        except IOError:
610            raise service_error(service_error.internal,
611                    "Cannot open /dev/null??");
612
613        # May raise CalledProcessError
614        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
615        rv = call(cmd, stdout=trace, stderr=trace)
616        if rv != 0:
617            raise service_error(service_error.internal, 
618                    "Cannot generate nonce ssh keys.  %s return code %d" \
619                            % (self.ssh_keygen, rv))
620
621    def gentopo(self, str):
622        """
623        Generate the topology dtat structure from the splitter's XML
624        representation of it.
625
626        The topology XML looks like:
627            <experiment>
628                <nodes>
629                    <node><vname></vname><ips>ip1:ip2</ips></node>
630                </nodes>
631                <lans>
632                    <lan>
633                        <vname></vname><vnode></vnode><ip></ip>
634                        <bandwidth></bandwidth><member>node:port</member>
635                    </lan>
636                </lans>
637        """
638        class topo_parse:
639            """
640            Parse the topology XML and create the dats structure.
641            """
642            def __init__(self):
643                # Typing of the subelements for data conversion
644                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
645                self.int_subelements = ( 'bandwidth',)
646                self.float_subelements = ( 'delay',)
647                # The final data structure
648                self.nodes = [ ]
649                self.lans =  [ ]
650                self.topo = { \
651                        'node': self.nodes,\
652                        'lan' : self.lans,\
653                    }
654                self.element = { }  # Current element being created
655                self.chars = ""     # Last text seen
656
657            def end_element(self, name):
658                # After each sub element the contents is added to the current
659                # element or to the appropriate list.
660                if name == 'node':
661                    self.nodes.append(self.element)
662                    self.element = { }
663                elif name == 'lan':
664                    self.lans.append(self.element)
665                    self.element = { }
666                elif name in self.str_subelements:
667                    self.element[name] = self.chars
668                    self.chars = ""
669                elif name in self.int_subelements:
670                    self.element[name] = int(self.chars)
671                    self.chars = ""
672                elif name in self.float_subelements:
673                    self.element[name] = float(self.chars)
674                    self.chars = ""
675
676            def found_chars(self, data):
677                self.chars += data.rstrip()
678
679
680        tp = topo_parse();
681        parser = xml.parsers.expat.ParserCreate()
682        parser.EndElementHandler = tp.end_element
683        parser.CharacterDataHandler = tp.found_chars
684
685        parser.Parse(str)
686
687        return tp.topo
688       
689
690    def genviz(self, topo):
691        """
692        Generate the visualization the virtual topology
693        """
694
695        neato = "/usr/local/bin/neato"
696        # These are used to parse neato output and to create the visualization
697        # file.
698        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
699        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
700                "%s</type></node>"
701
702        try:
703            # Node names
704            nodes = [ n['vname'] for n in topo['node'] ]
705            topo_lans = topo['lan']
706        except KeyError:
707            raise service_error(service_error.internal, "Bad topology")
708
709        lans = { }
710        links = { }
711
712        # Walk through the virtual topology, organizing the connections into
713        # 2-node connections (links) and more-than-2-node connections (lans).
714        # When a lan is created, it's added to the list of nodes (there's a
715        # node in the visualization for the lan).
716        for l in topo_lans:
717            if links.has_key(l['vname']):
718                if len(links[l['vname']]) < 2:
719                    links[l['vname']].append(l['vnode'])
720                else:
721                    nodes.append(l['vname'])
722                    lans[l['vname']] = links[l['vname']]
723                    del links[l['vname']]
724                    lans[l['vname']].append(l['vnode'])
725            elif lans.has_key(l['vname']):
726                lans[l['vname']].append(l['vnode'])
727            else:
728                links[l['vname']] = [ l['vnode'] ]
729
730
731        # Open up a temporary file for dot to turn into a visualization
732        try:
733            df, dotname = tempfile.mkstemp()
734            dotfile = os.fdopen(df, 'w')
735        except IOError:
736            raise service_error(service_error.internal,
737                    "Failed to open file in genviz")
738
739        # Generate a dot/neato input file from the links, nodes and lans
740        try:
741            print >>dotfile, "graph G {"
742            for n in nodes:
743                print >>dotfile, '\t"%s"' % n
744            for l in links.keys():
745                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
746            for l in lans.keys():
747                for n in lans[l]:
748                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
749            print >>dotfile, "}"
750            dotfile.close()
751        except TypeError:
752            raise service_error(service_error.internal,
753                    "Single endpoint link in vtopo")
754        except IOError:
755            raise service_error(service_error.internal, "Cannot write dot file")
756
757        # Use dot to create a visualization
758        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
759                '-Gpack=true', dotname], stdout=PIPE)
760
761        # Translate dot to vis format
762        vis_nodes = [ ]
763        vis = { 'node': vis_nodes }
764        for line in dot.stdout:
765            m = vis_re.match(line)
766            if m:
767                vn = m.group(1)
768                vis_node = {'name': vn, \
769                        'x': float(m.group(2)),\
770                        'y' : float(m.group(3)),\
771                    }
772                if vn in links.keys() or vn in lans.keys():
773                    vis_node['type'] = 'lan'
774                else:
775                    vis_node['type'] = 'node'
776                vis_nodes.append(vis_node)
777        rv = dot.wait()
778
779        os.remove(dotname)
780        if rv == 0 : return vis
781        else: return None
782
783    def get_access(self, tb, nodes, user, tbparam, master, export_project):
784        """
785        Get access to testbed through fedd and set the parameters for that tb
786        """
787
788        translate_attr = {
789            'slavenodestartcmd': 'expstart',
790            'slaveconnectorstartcmd': 'gwstart',
791            'masternodestartcmd': 'mexpstart',
792            'masterconnectorstartcmd': 'mgwstart',
793            'connectorimage': 'gwimage',
794            'connectortype': 'gwtype',
795            'tunnelcfg': 'tun',
796            'smbshare': 'smbshare',
797        }
798
799        uri = self.tbmap.get(tb, None)
800        if not uri:
801            raise service_error(serice_error.server_config, 
802                    "Unknown testbed: %s" % tb)
803
804        # The basic request
805        req = {\
806                'destinationTestbed' : { 'uri' : uri },
807                'user':  user,
808                'allocID' : { 'localname': 'test' },
809                # XXX: need to get service access stright
810                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
811                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
812            }
813
814        if tb == master:
815            # NB, the export_project parameter is a dict that includes
816            # the type
817            req['exportProject'] = export_project
818
819        # node resources if any
820        if nodes != None and len(nodes) > 0:
821            rnodes = [ ]
822            for n in nodes:
823                rn = { }
824                image, hw, count = n.split(":")
825                if image: rn['image'] = [ image ]
826                if hw: rn['hardware'] = [ hw ]
827                if count: rn['count'] = int(count)
828                rnodes.append(rn)
829            req['resources']= { }
830            req['resources']['node'] = rnodes
831
832        r = self.call_RequestAccess(uri, req, 
833                self.cert_file, self.cert_pwd, self.trusted_certs)
834
835        if r.has_key('RequestAccessResponseBody'):
836            r = r['RequestAccessResponseBody']
837        else:
838            raise service_error(service_error.protocol,
839                    "Bad proxy response")
840
841        e = r['emulab']
842        p = e['project']
843        tbparam[tb] = { 
844                "boss": e['boss'],
845                "host": e['ops'],
846                "domain": e['domain'],
847                "fs": e['fileServer'],
848                "eventserver": e['eventServer'],
849                "project": unpack_id(p['name']),
850                "emulab" : e,
851                "allocID" : r['allocID'],
852                }
853        # Make the testbed name be the label the user applied
854        p['testbed'] = {'localname': tb }
855
856        for u in p['user']:
857            tbparam[tb]['user'] = unpack_id(u['userID'])
858
859        for a in e['fedAttr']:
860            if a['attribute']:
861                key = translate_attr.get(a['attribute'].lower(), None)
862                if key:
863                    tbparam[tb][key]= a['value']
864       
865    def release_access(self, tb, aid):
866        """
867        Release access to testbed through fedd
868        """
869
870        uri = self.tbmap.get(tb, None)
871        if not uri:
872            raise service_error(serice_error.server_config, 
873                    "Unknown testbed: %s" % tb)
874
875        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
876                self.cert_file, self.cert_pwd, self.trusted_certs)
877
878        # better error coding
879
880    def remote_splitter(self, uri, desc, master):
881
882        req = {
883                'description' : { 'ns2description': desc },
884                'master': master,
885                'include_fedkit': bool(self.fedkit)
886            }
887
888        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
889                self.trusted_certs)
890
891        if r.has_key('Ns2SplitResponseBody'):
892            r = r['Ns2SplitResponseBody']
893            if r.has_key('output'):
894                return r['output'].splitlines()
895            else:
896                raise service_error(service_error.protocol, 
897                        "Bad splitter response (no output)")
898        else:
899            raise service_error(service_error.protocol, "Bad splitter response")
900       
901    class current_testbed:
902        """
903        Object for collecting the current testbed description.  The testbed
904        description is saved to a file with the local testbed variables
905        subsittuted line by line.
906        """
907        def __init__(self, eid, tmpdir, fedkit):
908            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
909            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
910            self.current_testbed = None
911            self.testbed_file = None
912
913            self.def_expstart = \
914                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
915            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
916            self.def_gwstart = \
917                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
918            self.def_mgwstart = \
919                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
920            self.def_gwimage = "FBSD61-TUNNEL2";
921            self.def_gwtype = "pc";
922
923            self.eid = eid
924            self.tmpdir = tmpdir
925            self.fedkit = fedkit
926
927        def __call__(self, line, master, allocated, tbparams):
928            # Capture testbed topology descriptions
929            if self.current_testbed == None:
930                m = self.begin_testbed.match(line)
931                if m != None:
932                    self.current_testbed = m.group(1)
933                    if self.current_testbed == None:
934                        raise service_error(service_error.req,
935                                "Bad request format (unnamed testbed)")
936                    allocated[self.current_testbed] = \
937                            allocated.get(self.current_testbed,0) + 1
938                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
939                    if not os.path.exists(tb_dir):
940                        try:
941                            os.mkdir(tb_dir)
942                        except IOError:
943                            raise service_error(service_error.internal,
944                                    "Cannot create %s" % tb_dir)
945                    try:
946                        self.testbed_file = open("%s/%s.%s.tcl" %
947                                (tb_dir, self.eid, self.current_testbed), 'w')
948                    except IOError:
949                        self.testbed_file = None
950                    return True
951                else: return False
952            else:
953                m = self.end_testbed.match(line)
954                if m != None:
955                    if m.group(1) != self.current_testbed:
956                        raise service_error(service_error.internal, 
957                                "Mismatched testbed markers!?")
958                    if self.testbed_file != None: 
959                        self.testbed_file.close()
960                        self.testbed_file = None
961                    self.current_testbed = None
962                elif self.testbed_file:
963                    # Substitute variables and put the line into the local
964                    # testbed file.
965                    gwtype = tbparams[self.current_testbed].get('gwtype', 
966                            self.def_gwtype)
967                    gwimage = tbparams[self.current_testbed].get('gwimage', 
968                            self.def_gwimage)
969                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
970                            self.def_mgwstart)
971                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
972                            self.def_mexpstart)
973                    gwstart = tbparams[self.current_testbed].get('gwstart', 
974                            self.def_gwstart)
975                    expstart = tbparams[self.current_testbed].get('expstart', 
976                            self.def_expstart)
977                    project = tbparams[self.current_testbed].get('project')
978                    line = re.sub("GWTYPE", gwtype, line)
979                    line = re.sub("GWIMAGE", gwimage, line)
980                    if self.current_testbed == master:
981                        line = re.sub("GWSTART", mgwstart, line)
982                        line = re.sub("EXPSTART", mexpstart, line)
983                    else:
984                        line = re.sub("GWSTART", gwstart, line)
985                        line = re.sub("EXPSTART", expstart, line)
986                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
987                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
988                    line = re.sub("EID", self.eid, line)
989                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
990                            (project, self.eid), line)
991                    if self.fedkit:
992                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
993                                line)
994                    print >>self.testbed_file, line
995                return True
996
997    class allbeds:
998        """
999        Process the Allbeds section.  Get access to each federant and save the
1000        parameters in tbparams
1001        """
1002        def __init__(self, get_access):
1003            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1004            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1005            self.in_allbeds = False
1006            self.get_access = get_access
1007
1008        def __call__(self, line, user, tbparams, master, export_project):
1009            # Testbed access parameters
1010            if not self.in_allbeds:
1011                if self.begin_allbeds.match(line):
1012                    self.in_allbeds = True
1013                    return True
1014                else:
1015                    return False
1016            else:
1017                if self.end_allbeds.match(line):
1018                    self.in_allbeds = False
1019                else:
1020                    nodes = line.split('|')
1021                    tb = nodes.pop(0)
1022                    self.get_access(tb, nodes, user, tbparams, master,
1023                            export_project)
1024                return True
1025
1026    class gateways:
1027        def __init__(self, eid, master, tmpdir, gw_pubkey,
1028                gw_secretkey, copy_file, fedkit):
1029            self.begin_gateways = \
1030                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1031            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1032            self.current_gateways = None
1033            self.control_gateway = None
1034            self.active_end = { }
1035
1036            self.eid = eid
1037            self.master = master
1038            self.tmpdir = tmpdir
1039            self.gw_pubkey_base = gw_pubkey
1040            self.gw_secretkey_base = gw_secretkey
1041
1042            self.copy_file = copy_file
1043            self.fedkit = fedkit
1044
1045
1046        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1047                active_end, tbparams, dtb, myname, desthost, type):
1048            """
1049            Produce a gateway configuration file from a gateways line.
1050            """
1051
1052            sproject = tbparams[gw].get('project', 'project')
1053            dproject = tbparams[dtb].get('project', 'project')
1054            sdomain = ".%s.%s%s" % (eid, sproject,
1055                    tbparams[gw].get('domain', ".example.com"))
1056            ddomain = ".%s.%s%s" % (eid, dproject,
1057                    tbparams[dtb].get('domain', ".example.com"))
1058            boss = tbparams[master].get('boss', "boss")
1059            fs = tbparams[master].get('fs', "fs")
1060            event_server = "%s%s" % \
1061                    (tbparams[gw].get('eventserver', "event_server"),
1062                            tbparams[gw].get('domain', "example.com"))
1063            remote_event_server = "%s%s" % \
1064                    (tbparams[dtb].get('eventserver', "event_server"),
1065                            tbparams[dtb].get('domain', "example.com"))
1066            seer_control = "%s%s" % \
1067                    (tbparams[gw].get('control', "control"), sdomain)
1068
1069            if self.fedkit:
1070                remote_script_dir = "/usr/local/federation/bin"
1071                local_script_dir = "/usr/local/federation/bin"
1072            else:
1073                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1074                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1075
1076            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1077            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1078            tunnel_cfg = tbparams[gw].get("tun", "false")
1079
1080            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1081            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1082
1083            # translate to lower case so the `hostname` hack for specifying
1084            # configuration files works.
1085            conf_file = conf_file.lower();
1086            remote_conf_file = remote_conf_file.lower();
1087
1088            if dtb == master:
1089                active = "false"
1090            elif gw == master:
1091                active = "true"
1092            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1093                active = "false"
1094            else:
1095                active_end['%s-%s' % (gw, dtb)] = 1
1096                active = "true"
1097
1098            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1099            print >>gwconfig, "Active: %s" % active
1100            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1101            print >>gwconfig, "BossName: %s" % boss
1102            print >>gwconfig, "FsName: %s" % fs
1103            print >>gwconfig, "EventServerName: %s" % event_server
1104            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1105            print >>gwconfig, "SeerControl: %s" % seer_control
1106            print >>gwconfig, "Type: %s" % type
1107            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1108            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1109                    local_script_dir
1110            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1111            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1112            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1113                    (remote_conf_dir, remote_conf_file)
1114            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1115            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1116            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1117            gwconfig.close()
1118
1119            return active == "true"
1120
1121        def __call__(self, line, allocated, tbparams):
1122            # Process gateways
1123            if not self.current_gateways:
1124                m = self.begin_gateways.match(line)
1125                if m:
1126                    self.current_gateways = m.group(1)
1127                    if allocated.has_key(self.current_gateways):
1128                        # This test should always succeed
1129                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1130                        if not os.path.exists(tb_dir):
1131                            try:
1132                                os.mkdir(tb_dir)
1133                            except IOError:
1134                                raise service_error(service_error.internal,
1135                                        "Cannot create %s" % tb_dir)
1136                    else:
1137                        # XXX
1138                        self.log.error("[gateways]: Ignoring gateways for " + \
1139                                "unknown testbed %s" % self.current_gateways)
1140                        self.current_gateways = None
1141                    return True
1142                else:
1143                    return False
1144            else:
1145                m = self.end_gateways.match(line)
1146                if m :
1147                    if m.group(1) != self.current_gateways:
1148                        raise service_error(service_error.internal,
1149                                "Mismatched gateway markers!?")
1150                    if self.control_gateway:
1151                        try:
1152                            cc = open("%s/%s/client.conf" %
1153                                    (self.tmpdir, self.current_gateways), 'w')
1154                            print >>cc, "ControlGateway: %s" % \
1155                                    self.control_gateway
1156                            if tbparams[self.master].has_key('smbshare'):
1157                                print >>cc, "SMBSHare: %s" % \
1158                                        tbparams[self.master]['smbshare']
1159                            print >>cc, "ProjectUser: %s" % \
1160                                    tbparams[self.master]['user']
1161                            print >>cc, "ProjectName: %s" % \
1162                                    tbparams[self.master]['project']
1163                            cc.close()
1164                        except IOError:
1165                            raise service_error(service_error.internal,
1166                                    "Error creating client config")
1167                        try:
1168                            cc = open("%s/%s/seer.conf" %
1169                                    (self.tmpdir, self.current_gateways),
1170                                    'w')
1171                            if self.current_gateways != self.master:
1172                                print >>cc, "ControlNode: %s" % \
1173                                        self.control_gateway
1174                            print >>cc, "ExperimentID: %s/%s" % \
1175                                    ( tbparams[self.master]['project'], \
1176                                    self.eid )
1177                            cc.close()
1178                        except IOError:
1179                            raise service_error(service_error.internal,
1180                                    "Error creating seer config")
1181                    else:
1182                        debug.error("[gateways]: No control gateway for %s" %\
1183                                    self.current_gateways)
1184                    self.current_gateways = None
1185                else:
1186                    dtb, myname, desthost, type = line.split(" ")
1187
1188                    if type == "control" or type == "both":
1189                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1190                                self.eid, 
1191                                tbparams[self.current_gateways]['project'],
1192                                tbparams[self.current_gateways]['domain'])
1193                    try:
1194                        active = self.gateway_conf_file(self.current_gateways,
1195                                self.master, self.eid, self.gw_pubkey_base,
1196                                self.gw_secretkey_base,
1197                                self.active_end, tbparams, dtb, myname,
1198                                desthost, type)
1199                    except IOError, e:
1200                        raise service_error(service_error.internal,
1201                                "Failed to write config file for %s" % \
1202                                        self.current_gateway)
1203           
1204                    gw_pubkey = "%s/keys/%s" % \
1205                            (self.tmpdir, self.gw_pubkey_base)
1206                    gw_secretkey = "%s/keys/%s" % \
1207                            (self.tmpdir, self.gw_secretkey_base)
1208
1209                    pkfile = "%s/%s/%s" % \
1210                            ( self.tmpdir, self.current_gateways, 
1211                                    self.gw_pubkey_base)
1212                    skfile = "%s/%s/%s" % \
1213                            ( self.tmpdir, self.current_gateways, 
1214                                    self.gw_secretkey_base)
1215
1216                    if not os.path.exists(pkfile):
1217                        try:
1218                            self.copy_file(gw_pubkey, pkfile)
1219                        except IOError:
1220                            service_error(service_error.internal,
1221                                    "Failed to copy pubkey file")
1222
1223                    if active and not os.path.exists(skfile):
1224                        try:
1225                            self.copy_file(gw_secretkey, skfile)
1226                        except IOError:
1227                            service_error(service_error.internal,
1228                                    "Failed to copy secretkey file")
1229                return True
1230
1231    class shunt_to_file:
1232        """
1233        Simple class to write data between two regexps to a file.
1234        """
1235        def __init__(self, begin, end, filename):
1236            """
1237            Begin shunting on a match of begin, stop on end, send data to
1238            filename.
1239            """
1240            self.begin = re.compile(begin)
1241            self.end = re.compile(end)
1242            self.in_shunt = False
1243            self.file = None
1244            self.filename = filename
1245
1246        def __call__(self, line):
1247            """
1248            Call this on each line in the input that may be shunted.
1249            """
1250            if not self.in_shunt:
1251                if self.begin.match(line):
1252                    self.in_shunt = True
1253                    try:
1254                        self.file = open(self.filename, "w")
1255                    except:
1256                        self.file = None
1257                        raise
1258                    return True
1259                else:
1260                    return False
1261            else:
1262                if self.end.match(line):
1263                    if self.file: 
1264                        self.file.close()
1265                        self.file = None
1266                    self.in_shunt = False
1267                else:
1268                    if self.file:
1269                        print >>self.file, line
1270                return True
1271
1272    class shunt_to_list:
1273        """
1274        Same interface as shunt_to_file.  Data collected in self.list, one list
1275        element per line.
1276        """
1277        def __init__(self, begin, end):
1278            self.begin = re.compile(begin)
1279            self.end = re.compile(end)
1280            self.in_shunt = False
1281            self.list = [ ]
1282       
1283        def __call__(self, line):
1284            if not self.in_shunt:
1285                if self.begin.match(line):
1286                    self.in_shunt = True
1287                    return True
1288                else:
1289                    return False
1290            else:
1291                if self.end.match(line):
1292                    self.in_shunt = False
1293                else:
1294                    self.list.append(line)
1295                return True
1296
1297    class shunt_to_string:
1298        """
1299        Same interface as shunt_to_file.  Data collected in self.str, all in
1300        one string.
1301        """
1302        def __init__(self, begin, end):
1303            self.begin = re.compile(begin)
1304            self.end = re.compile(end)
1305            self.in_shunt = False
1306            self.str = ""
1307       
1308        def __call__(self, line):
1309            if not self.in_shunt:
1310                if self.begin.match(line):
1311                    self.in_shunt = True
1312                    return True
1313                else:
1314                    return False
1315            else:
1316                if self.end.match(line):
1317                    self.in_shunt = False
1318                else:
1319                    self.str += line
1320                return True
1321
1322    def create_experiment(self, req, fid):
1323        """
1324        The external interface to experiment creation called from the
1325        dispatcher.
1326
1327        Creates a working directory, splits the incoming description using the
1328        splitter script and parses out the avrious subsections using the
1329        lcasses above.  Once each sub-experiment is created, use pooled threads
1330        to instantiate them and start it all up.
1331        """
1332        try:
1333            tmpdir = tempfile.mkdtemp(prefix="split-")
1334        except IOError:
1335            raise service_error(service_error.internal, "Cannot create tmp dir")
1336
1337        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1338        gw_secretkey_base = "fed.%s" % self.ssh_type
1339        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1340        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1341        tclfile = tmpdir + "/experiment.tcl"
1342        tbparams = { }
1343
1344        pid = "dummy"
1345        gid = "dummy"
1346        # XXX
1347        fail_soft = False
1348
1349        try:
1350            os.mkdir(tmpdir+"/keys")
1351        except OSError:
1352            raise service_error(service_error.internal,
1353                    "Can't make temporary dir")
1354
1355        req = req.get('CreateRequestBody', None)
1356        if not req:
1357            raise service_error(service_error.req,
1358                    "Bad request format (no CreateRequestBody)")
1359        # The tcl parser needs to read a file so put the content into that file
1360        descr=req.get('experimentdescription', None)
1361        if descr:
1362            file_content=descr.get('ns2description', None)
1363            if file_content:
1364                try:
1365                    f = open(tclfile, 'w')
1366                    f.write(file_content)
1367                    f.close()
1368                except IOError:
1369                    raise service_error(service_error.internal,
1370                            "Cannot write temp experiment description")
1371            else:
1372                raise service_error(service_error.req, 
1373                        "Only ns2descriptions supported")
1374        else:
1375            raise service_error(service_error.req, "No experiment description")
1376
1377        if req.has_key('experimentID') and \
1378                req['experimentID'].has_key('localname'):
1379            eid = req['experimentID']['localname']
1380            self.state_lock.acquire()
1381            while (self.state.has_key(eid)):
1382                eid += random.choice(string.ascii_letters)
1383            # To avoid another thread picking this localname
1384            self.state[eid] = "placeholder"
1385            self.state_lock.release()
1386        else:
1387            eid = self.exp_stem
1388            for i in range(0,5):
1389                eid += random.choice(string.ascii_letters)
1390            self.state_lock.acquire()
1391            while (self.state.has_key(eid)):
1392                eid = self.exp_stem
1393                for i in range(0,5):
1394                    eid += random.choice(string.ascii_letters)
1395            # To avoid another thread picking this localname
1396            self.state[eid] = "placeholder"
1397            self.state_lock.release()
1398
1399        try:
1400            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1401        except ValueError:
1402            raise service_error(service_error.server_config, 
1403                    "Bad key type (%s)" % self.ssh_type)
1404
1405        user = req.get('user', None)
1406        if user == None:
1407            raise service_error(service_error.req, "No user")
1408
1409        master = req.get('master', None)
1410        if not master:
1411            raise service_error(service_error.req, "No master testbed label")
1412        export_project = req.get('exportProject', None)
1413        if not export_project:
1414            raise service_error(service_error.req, "No export project")
1415       
1416        if self.splitter_url:
1417            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1418            split_data = self.remote_splitter(self.splitter_url, file_content,
1419                    master)
1420        else:
1421            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1422                str(self.muxmax), '-m', master]
1423
1424            if self.fedkit:
1425                tclcmd.append('-k')
1426
1427            tclcmd.extend([pid, gid, eid, tclfile])
1428
1429            self.log.debug("running local splitter %s", " ".join(tclcmd))
1430            tclparser = Popen(tclcmd, stdout=PIPE)
1431            split_data = tclparser.stdout
1432
1433        allocated = { }     # Testbeds we can access
1434        started = { }       # Testbeds where a sub-experiment started
1435                            # successfully
1436
1437        # Objects to parse the splitter output (defined above)
1438        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1439        parse_allbeds = self.allbeds(self.get_access)
1440        parse_gateways = self.gateways(eid, master, tmpdir,
1441                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1442        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1443                    "^#\s+End\s+Vtopo")
1444        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1445                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1446        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1447                "^#\s+End\s+tarfiles")
1448        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1449                "^#\s+End\s+rpms")
1450
1451        # Worling on the split data
1452        for line in split_data:
1453            line = line.rstrip()
1454            if parse_current_testbed(line, master, allocated, tbparams):
1455                continue
1456            elif parse_allbeds(line, user, tbparams, master, export_project):
1457                continue
1458            elif parse_gateways(line, allocated, tbparams):
1459                continue
1460            elif parse_vtopo(line):
1461                continue
1462            elif parse_hostnames(line):
1463                continue
1464            elif parse_tarfiles(line):
1465                continue
1466            elif parse_rpms(line):
1467                continue
1468            else:
1469                raise service_error(service_error.internal, 
1470                        "Bad tcl parse? %s" % line)
1471
1472        # Virtual topology and visualization
1473        vtopo = self.gentopo(parse_vtopo.str)
1474        if not vtopo:
1475            raise service_error(service_error.internal, 
1476                    "Failed to generate virtual topology")
1477
1478        vis = self.genviz(vtopo)
1479        if not vis:
1480            raise service_error(service_error.internal, 
1481                    "Failed to generate visualization")
1482
1483        # save federant information
1484        for k in allocated.keys():
1485            tbparams[k]['federant'] = {\
1486                    'name': [ { 'localname' : eid} ],\
1487                    'emulab': tbparams[k]['emulab'],\
1488                    'allocID' : tbparams[k]['allocID'],\
1489                    'master' : k == master,\
1490                }
1491
1492
1493        # Copy tarfiles and rpms needed at remote sites into a staging area
1494        try:
1495            if self.fedkit:
1496                parse_tarfiles.list.append(self.fedkit)
1497            for t in parse_tarfiles.list:
1498                if not os.path.exists("%s/tarfiles" % tmpdir):
1499                    os.mkdir("%s/tarfiles" % tmpdir)
1500                self.copy_file(t, "%s/tarfiles/%s" % \
1501                        (tmpdir, os.path.basename(t)))
1502            for r in parse_rpms.list:
1503                if not os.path.exists("%s/rpms" % tmpdir):
1504                    os.mkdir("%s/rpms" % tmpdir)
1505                self.copy_file(r, "%s/rpms/%s" % \
1506                        (tmpdir, os.path.basename(r)))
1507        except IOError, e:
1508            raise service_error(service_error.internal, 
1509                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1510
1511        thread_pool_info = self.thread_pool()
1512        threads = [ ]
1513
1514        for tb in [ k for k in allocated.keys() if k != master]:
1515            # Wait until we have a free slot to start the next testbed load
1516            thread_pool_info.acquire()
1517            while thread_pool_info.started - \
1518                    thread_pool_info.terminated >= self.nthreads:
1519                thread_pool_info.wait()
1520            thread_pool_info.release()
1521
1522            # Create and start a thread to start the segment, and save it to
1523            # get the return value later
1524            t  = self.pooled_thread(target=self.start_segment, 
1525                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1526                    pdata=thread_pool_info, trace_file=self.trace_file)
1527            threads.append(t)
1528            t.start()
1529
1530        # Wait until all finish (the first clause of the while is to make sure
1531        # one starts)
1532        thread_pool_info.acquire()
1533        while thread_pool_info.started == 0 or \
1534                thread_pool_info.started > thread_pool_info.terminated:
1535            thread_pool_info.wait()
1536        thread_pool_info.release()
1537
1538        # If none failed, start the master
1539        failed = [ t.getName() for t in threads if not t.rv ]
1540
1541        if len(failed) == 0:
1542            if not self.start_segment(master, eid, tbparams, tmpdir):
1543                failed.append(master)
1544
1545        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1546        # If one failed clean up, unless fail_soft is set
1547        if failed:
1548            if not fail_soft:
1549                for tb in succeeded:
1550                    self.stop_segment(tb, eid, tbparams)
1551                # Remove the placeholder
1552                self.state_lock.acquire()
1553                del self.state[eid]
1554                self.state_lock.release()
1555
1556                raise service_error(service_error.federant,
1557                    "Swap in failed on %s" % ",".join(failed))
1558        else:
1559            self.log.info("[start_segment]: Experiment %s started" % eid)
1560
1561        # Generate an ID for the experiment (slice) and a certificate that the
1562        # allocator can use to prove they own it.  We'll ship it back through
1563        # the encrypted connection.
1564        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1565
1566        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1567
1568        # Walk up tmpdir, deleting as we go
1569        for path, dirs, files in os.walk(tmpdir, topdown=False):
1570            for f in files:
1571                os.remove(os.path.join(path, f))
1572            for d in dirs:
1573                os.rmdir(os.path.join(path, d))
1574        os.rmdir(tmpdir)
1575
1576        # The deepcopy prevents the allocation ID and other binaries from being
1577        # translated into other formats
1578        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1579                for tb in tbparams.keys() \
1580                    if tbparams[tb].has_key('federant') ],\
1581                    'vtopo': vtopo,\
1582                    'vis' : vis,
1583                    'experimentID' : [\
1584                            { 'fedid': copy.copy(expid) }, \
1585                            { 'localname': eid },\
1586                        ],\
1587                    'experimentAccess': { 'X509' : expcert },\
1588                }
1589
1590        # Insert the experiment into our state and update the disk copy
1591        self.state_lock.acquire()
1592        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1593                for tb in tbparams.keys() \
1594                    if tbparams[tb].has_key('federant') ],\
1595                    'vtopo': vtopo,\
1596                    'vis' : vis,
1597                    'experimentID' : [\
1598                            { 'fedid': expid }, { 'localname': eid },\
1599                        ],\
1600                }
1601        self.state[eid] = self.state[expid]
1602        if self.state_filename: self.write_state()
1603        self.state_lock.release()
1604
1605        if not failed:
1606            return resp
1607        else:
1608            raise service_error(service_error.partial, \
1609                    "Partial swap in on %s" % ",".join(succeeded))
1610
1611
1612    def get_vtopo(self, req, fid):
1613        """
1614        Return the stored virtual topology for this experiment
1615        """
1616        rv = None
1617
1618        req = req.get('VtopoRequestBody', None)
1619        if not req:
1620            raise service_error(service_error.req,
1621                    "Bad request format (no VtopoRequestBody)")
1622        exp = req.get('experiment', None)
1623        if exp:
1624            if exp.has_key('fedid'):
1625                key = exp['fedid']
1626                keytype = "fedid"
1627            elif exp.has_key('localname'):
1628                key = exp['localname']
1629                keytype = "localname"
1630            else:
1631                raise service_error(service_error.req, "Unknown lookup type")
1632        else:
1633            raise service_error(service_error.req, "No request?")
1634
1635        self.state_lock.acquire()
1636        if self.state.has_key(key):
1637            rv = { 'experiment' : {keytype: key },\
1638                    'vtopo': self.state[key]['vtopo'],\
1639                }
1640        self.state_lock.release()
1641
1642        if rv: return rv
1643        else: raise service_error(service_error.req, "No such experiment")
1644
1645    def get_vis(self, req, fid):
1646        """
1647        Return the stored visualization for this experiment
1648        """
1649        rv = None
1650
1651        req = req.get('VisRequestBody', None)
1652        if not req:
1653            raise service_error(service_error.req,
1654                    "Bad request format (no VisRequestBody)")
1655        exp = req.get('experiment', None)
1656        if exp:
1657            if exp.has_key('fedid'):
1658                key = exp['fedid']
1659                keytype = "fedid"
1660            elif exp.has_key('localname'):
1661                key = exp['localname']
1662                keytype = "localname"
1663            else:
1664                raise service_error(service_error.req, "Unknown lookup type")
1665        else:
1666            raise service_error(service_error.req, "No request?")
1667
1668        self.state_lock.acquire()
1669        if self.state.has_key(key):
1670            rv =  { 'experiment' : {keytype: key },\
1671                    'vis': self.state[key]['vis'],\
1672                    }
1673        self.state_lock.release()
1674
1675        if rv: return rv
1676        else: raise service_error(service_error.req, "No such experiment")
1677
1678    def get_info(self, req, fid):
1679        """
1680        Return all the stored info about this experiment
1681        """
1682        rv = None
1683
1684        req = req.get('InfoRequestBody', None)
1685        if not req:
1686            raise service_error(service_error.req,
1687                    "Bad request format (no VisRequestBody)")
1688        exp = req.get('experiment', None)
1689        if exp:
1690            if exp.has_key('fedid'):
1691                key = exp['fedid']
1692                keytype = "fedid"
1693            elif exp.has_key('localname'):
1694                key = exp['localname']
1695                keytype = "localname"
1696            else:
1697                raise service_error(service_error.req, "Unknown lookup type")
1698        else:
1699            raise service_error(service_error.req, "No request?")
1700
1701        # The state may be massaged by the service function that called
1702        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1703        # state.
1704        self.state_lock.acquire()
1705        if self.state.has_key(key):
1706            rv = copy.deepcopy(self.state[key])
1707        self.state_lock.release()
1708
1709        if rv: return rv
1710        else: raise service_error(service_error.req, "No such experiment")
1711
1712
1713    def terminate_experiment(self, req, fid):
1714        """
1715        Swap this experiment out on the federants and delete the shared
1716        information
1717        """
1718        tbparams = { }
1719        req = req.get('TerminateRequestBody', None)
1720        if not req:
1721            raise service_error(service_error.req,
1722                    "Bad request format (no TerminateRequestBody)")
1723        exp = req.get('experiment', None)
1724        if exp:
1725            if exp.has_key('fedid'):
1726                key = exp['fedid']
1727                keytype = "fedid"
1728            elif exp.has_key('localname'):
1729                key = exp['localname']
1730                keytype = "localname"
1731            else:
1732                raise service_error(service_error.req, "Unknown lookup type")
1733        else:
1734            raise service_error(service_error.req, "No request?")
1735
1736        self.state_lock.acquire()
1737        fed_exp = self.state.get(key, None)
1738
1739        if fed_exp:
1740            # This branch of the conditional holds the lock to generate a
1741            # consistent temporary tbparams variable to deallocate experiments.
1742            # It releases the lock to do the deallocations and reacquires it to
1743            # remove the experiment state when the termination is complete.
1744            ids = []
1745            #  experimentID is a list of dicts that are self-describing
1746            #  identifiers.  This finds all the fedids and localnames - the
1747            #  keys of self.state - and puts them into ids.
1748            for id in fed_exp.get('experimentID', []):
1749                if id.has_key('fedid'): ids.append(id['fedid'])
1750                if id.has_key('localname'): ids.append(id['localname'])
1751
1752            # Construct enough of the tbparams to make the stop_segment calls
1753            # work
1754            for fed in fed_exp['federant']:
1755                try:
1756                    for e in fed['name']:
1757                        eid = e.get('localname', None)
1758                        if eid: break
1759                    else:
1760                        continue
1761
1762                    p = fed['emulab']['project']
1763
1764                    project = p['name']['localname']
1765                    tb = p['testbed']['localname']
1766                    user = p['user'][0]['userID']['localname']
1767
1768                    domain = fed['emulab']['domain']
1769                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1770                    aid = fed['allocID']
1771                except KeyError, e:
1772                    continue
1773                tbparams[tb] = {\
1774                        'user': user,\
1775                        'domain': domain,\
1776                        'project': project,\
1777                        'host': host,\
1778                        'eid': eid,\
1779                        'aid': aid,\
1780                    }
1781            self.state_lock.release()
1782
1783            # Stop everyone.
1784            for tb in tbparams.keys():
1785                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1786
1787            # release the allocations
1788            for tb in tbparams.keys():
1789                self.release_access(tb, tbparams[tb]['aid'])
1790
1791            # Remove the terminated experiment
1792            self.state_lock.acquire()
1793            for id in ids:
1794                if self.state.has_key(id): del self.state[id]
1795
1796            if self.state_filename: self.write_state()
1797            self.state_lock.release()
1798
1799            return { 'experiment': exp }
1800        else:
1801            # Don't forget to release the lock
1802            self.state_lock.release()
1803            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.