source: fedd/fedd_experiment_control.py @ 9c166cf

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 9c166cf was 2c6128f, checked in by Ted Faber <faber@…>, 16 years ago

Add support for a real fedkit tar file rather than the ad hoc script stuff.

  • Property mode set to 100644
File size: 52.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47   
48    class thread_pool:
49        """
50        A class to keep track of a set of threads all invoked for the same
51        task.  Manages the mutual exclusion of the states.
52        """
53        def __init__(self):
54            """
55            Start a pool.
56            """
57            self.changed = Condition()
58            self.started = 0
59            self.terminated = 0
60
61        def acquire(self):
62            """
63            Get the pool's lock.
64            """
65            self.changed.acquire()
66
67        def release(self):
68            """
69            Release the pool's lock.
70            """
71            self.changed.release()
72
73        def wait(self, timeout = None):
74            """
75            Wait for a pool thread to start or stop.
76            """
77            self.changed.wait(timeout)
78
79        def start(self):
80            """
81            Called by a pool thread to report starting.
82            """
83            self.changed.acquire()
84            self.started += 1
85            self.changed.notifyAll()
86            self.changed.release()
87
88        def terminate(self):
89            """
90            Called by a pool thread to report finishing.
91            """
92            self.changed.acquire()
93            self.terminated += 1
94            self.changed.notifyAll()
95            self.changed.release()
96
97        def clear(self):
98            """
99            Clear all pool data.
100            """
101            self.changed.acquire()
102            self.started = 0
103            self.terminated =0
104            self.changed.notifyAll()
105            self.changed.release()
106
107    class pooled_thread(Thread):
108        """
109        One of a set of threads dedicated to a specific task.  Uses the
110        thread_pool class above for coordination.
111        """
112        def __init__(self, group=None, target=None, name=None, args=(), 
113                kwargs={}, pdata=None, trace_file=None):
114            Thread.__init__(self, group, target, name, args, kwargs)
115            self.rv = None          # Return value of the ops in this thread
116            self.exception = None   # Exception that terminated this thread
117            self.target=target      # Target function to run on start()
118            self.args = args        # Args to pass to target
119            self.kwargs = kwargs    # Additional kw args
120            self.pdata = pdata      # thread_pool for this class
121            # Logger for this thread
122            self.log = logging.getLogger("fedd.experiment_control")
123       
124        def run(self):
125            """
126            Emulate Thread.run, except add pool data manipulation and error
127            logging.
128            """
129            if self.pdata:
130                self.pdata.start()
131
132            if self.target:
133                try:
134                    self.rv = self.target(*self.args, **self.kwargs)
135                except service_error, s:
136                    self.exception = s
137                    self.log.error("Thread exception: %s %s" % \
138                            (s.code_string(), s.desc))
139                except:
140                    self.exception = sys.exc_info()[1]
141                    self.log.error(("Unexpected thread exception: %s" +\
142                            "Trace %s") % (self.exception,\
143                                traceback.format_exc()))
144            if self.pdata:
145                self.pdata.terminate()
146
147    def __init__(self, config=None):
148        """
149        Intialize the various attributes, most from the config object
150        """
151        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
152        self.thread_pool = fedd_experiment_control_local.thread_pool
153
154        self.cert_file = None
155        self.cert_pwd = None
156        self.trusted_certs = None
157
158        # Walk through the various relevant certificat specifying config
159        # attributes until the local certificate attributes can be resolved.
160        # The walk is from most specific to most general specification.
161        for s in ("experiment_control", "globals"):
162            if config.has_section(s): 
163                if config.has_option(s, "cert_file"):
164                    if not self.cert_file:
165                        self.cert_file = config.get(s, "cert_file")
166                        self.cert_pwd = config.get(s, "cert_pwd")
167
168                if config.has_option(s, "trusted_certs"):
169                    if not self.trusted_certs:
170                        self.trusted_certs = config.get(s, "trusted_certs")
171
172        self.exp_stem = "fed-stem"
173        self.log = logging.getLogger("fedd.experiment_control")
174        self.muxmax = 2
175        self.nthreads = 2
176        self.randomize_experiments = False
177
178        self.scp_exec = "/usr/bin/scp"
179        self.splitter = None
180        self.ssh_exec="/usr/bin/ssh"
181        self.ssh_keygen = "/usr/bin/ssh-keygen"
182        self.ssh_identity_file = None
183
184        if config.has_section("experiment_control"):
185            self.debug = config.get("experiment_control", "create_debug")
186            self.state_filename = config.get("experiment_control", 
187                    "experiment_state_file")
188            self.splitter_url = config.get("experiment_control", "splitter_url")
189            self.fedkit = config.get("experiment_control", "fedkit")
190        else:
191            self.debug = False
192            self.state_filename = None
193            self.splitter_url = None
194            self.fedkit = None
195
196        # XXX
197        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
198        self.ssh_type = "rsa"
199        self.state = { }
200        self.state_lock = Lock()
201        self.tclsh = "/usr/local/bin/otclsh"
202        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
203        self.tbmap = { 
204                'deter':'https://users.isi.deterlab.net:23235',
205                'emulab':'https://users.isi.deterlab.net:23236',
206                'ucb':'https://users.isi.deterlab.net:23237',
207                }
208        self.trace_file = sys.stderr
209
210        self.def_expstart = \
211                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
212                "/tmp/federate";
213        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
214                "FEDDIR/hosts";
215        self.def_gwstart = \
216                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
217                "/tmp/bridge.log";
218        self.def_mgwstart = \
219                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
220                "/tmp/bridge.log";
221        self.def_gwimage = "FBSD61-TUNNEL2";
222        self.def_gwtype = "pc";
223
224
225        if self.ssh_pubkey_file:
226            try:
227                f = open(self.ssh_pubkey_file, 'r')
228                self.ssh_pubkey = f.read()
229                f.close()
230            except IOError:
231                raise service_error(service_error.internal,
232                        "Cannot read sshpubkey")
233
234        set_log_level(config, "experiment_control", self.log)
235
236        # Grab saved state.  OK to do this w/o locking because it's read only
237        # and only one thread should be in existence that can see self.state at
238        # this point.
239        if self.state_filename:
240            self.read_state()
241
242        # Dispatch tables
243        self.soap_services = {\
244                'Create': make_soap_handler(\
245                        CreateRequestMessage.typecode,
246                        getattr(self, "create_experiment"), 
247                        CreateResponseMessage,
248                        "CreateResponseBody"),
249                'Vtopo': make_soap_handler(\
250                        VtopoRequestMessage.typecode,
251                        getattr(self, "get_vtopo"),
252                        VtopoResponseMessage,
253                        "VtopoResponseBody"),
254                'Vis': make_soap_handler(\
255                        VisRequestMessage.typecode,
256                        getattr(self, "get_vis"),
257                        VisResponseMessage,
258                        "VisResponseBody"),
259                'Info': make_soap_handler(\
260                        InfoRequestMessage.typecode,
261                        getattr(self, "get_info"),
262                        InfoResponseMessage,
263                        "InfoResponseBody"),
264                'Terminate': make_soap_handler(\
265                        TerminateRequestMessage.typecode,
266                        getattr(self, "terminate_experiment"),
267                        TerminateResponseMessage,
268                        "TerminateResponseBody"),
269        }
270
271        self.xmlrpc_services = {\
272                'Create': make_xmlrpc_handler(\
273                        getattr(self, "create_experiment"), 
274                        "CreateResponseBody"),
275                'Vtopo': make_xmlrpc_handler(\
276                        getattr(self, "get_vtopo"),
277                        "VtopoResponseBody"),
278                'Vis': make_xmlrpc_handler(\
279                        getattr(self, "get_vis"),
280                        "VisResponseBody"),
281                'Info': make_xmlrpc_handler(\
282                        getattr(self, "get_info"),
283                        "InfoResponseBody"),
284                'Terminate': make_xmlrpc_handler(\
285                        getattr(self, "terminate_experiment"),
286                        "TerminateResponseBody"),
287        }
288
289    def copy_file(self, src, dest, size=1024):
290        """
291        Exceedingly simple file copy.
292        """
293        s = open(src,'r')
294        d = open(dest, 'w')
295
296        buf = "x"
297        while buf != "":
298            buf = s.read(size)
299            d.write(buf)
300        s.close()
301        d.close()
302
303    # Call while holding self.state_lock
304    def write_state(self):
305        """
306        Write a new copy of experiment state after copying the existing state
307        to a backup.
308
309        State format is a simple pickling of the state dictionary.
310        """
311        if os.access(self.state_filename, os.W_OK):
312            self.copy_file(self.state_filename, \
313                    "%s.bak" % self.state_filename)
314        try:
315            f = open(self.state_filename, 'w')
316            pickle.dump(self.state, f)
317        except IOError, e:
318            self.log.error("Can't write file %s: %s" % \
319                    (self.state_filename, e))
320        except pickle.PicklingError, e:
321            self.log.error("Pickling problem: %s" % e)
322
323    # Call while holding self.state_lock
324    def read_state(self):
325        """
326        Read a new copy of experiment state.  Old state is overwritten.
327
328        State format is a simple pickling of the state dictionary.
329        """
330        try:
331            f = open(self.state_filename, "r")
332            self.state = pickle.load(f)
333            self.log.debug("[read_state]: Read state from %s" % \
334                    self.state_filename)
335        except IOError, e:
336            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
337                    % (self.state_filename, e))
338        except pickle.UnpicklingError, e:
339            self.log.warning(("[read_state]: No saved state: " + \
340                    "Unpickling failed: %s") % e)
341
342    def scp_file(self, file, user, host, dest=""):
343        """
344        scp a file to the remote host.  If debug is set the action is only
345        logged.
346        """
347
348        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
349        rv = 0
350
351        try:
352            dnull = open("/dev/null", "r")
353        except IOError:
354            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
355            dnull = Null
356
357        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
358        if not self.debug:
359            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
360            else: rv = call(scp_cmd)
361
362        return rv == 0
363
364    def ssh_cmd(self, user, host, cmd, wname=None):
365        """
366        Run a remote command on host as user.  If debug is set, the action is
367        only logged.
368        """
369        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
370
371        try:
372            dnull = open("/dev/null", "r")
373        except IOError:
374            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
375            dnull = Null
376
377        self.log.debug("[ssh_cmd]: %s" % sh_str)
378        if not self.debug:
379            if dnull:
380                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
381            else:
382                sub = Popen(sh_str, shell=True)
383            return sub.wait() == 0
384        else:
385            return True
386
387    def ship_configs(self, host, user, src_dir, dest_dir):
388        """
389        Copy federant-specific configuration files to the federant.
390        """
391        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
392            return False
393        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
394            return False
395
396        for f in os.listdir(src_dir):
397            if os.path.isdir(f):
398                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
399                        "%s/%s" % (dest_dir, f)):
400                    return False
401            else:
402                if not self.scp_file("%s/%s" % (src_dir, f), 
403                        user, host, dest_dir):
404                    return False
405        return True
406
407    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
408        """
409        Start a sub-experiment on a federant.
410
411        Get the current state, modify or create as appropriate, ship data and
412        configs and start the experiment.  There are small ordering differences
413        based on the initial state of the sub-experiment.
414        """
415        # ops node in the federant
416        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
417        user = tbparams[tb]['user']     # federant user
418        pid = tbparams[tb]['project']   # federant project
419        # XXX
420        base_confs = ( "hosts",)
421        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
422        # command to test experiment state
423        expinfo_exec = "/usr/testbed/bin/expinfo" 
424        # Configuration directories on the remote machine
425        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
426        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
427        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
428        # Regular expressions to parse the expinfo response
429        state_re = re.compile("State:\s+(\w+)")
430        no_exp_re = re.compile("^No\s+such\s+experiment")
431        state = None    # Experiment state parsed from expinfo
432        # The expinfo ssh command
433        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
434
435        # Get status
436        self.log.debug("[start_segment]: %s"% " ".join(cmd))
437        dev_null = None
438        try:
439            dev_null = open("/dev/null", "a")
440        except IOError, e:
441            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
442
443        status = Popen(cmd, stdout=PIPE, stderr=dev_null)
444        for line in status.stdout:
445            m = state_re.match(line)
446            if m: state = m.group(1)
447            else:
448                m = no_exp_re.match(line)
449                if m: state = "none"
450        rv = status.wait()
451
452        # If the experiment is not present the subcommand returns a non-zero
453        # return value.  If we successfully parsed a "none" outcome, ignore the
454        # return code.
455        if rv != 0 and state != "none":
456            raise service_error(service_error.internal,
457                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
458
459        self.log.debug("[start_segment]: %s: %s" % (tb, state))
460        self.log.info("[start_segment]:transferring experiment to %s" % tb)
461
462        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
463            return False
464        # Clear the federation files
465        if not self.ssh_cmd(user, host, 
466                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
467            return False
468        if not self.ssh_cmd(user, host, 
469                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
470            return False
471        # Clear and create the tarfiles and rpm directories
472        for d in (tarfiles_dir, rpms_dir):
473            if not self.ssh_cmd(user, host, 
474                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
475                return False
476            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
477                    "create tarfiles"):
478                return False
479       
480        if state == 'active':
481            # Remote experiment is active.  Modify it.
482            for f in base_confs:
483                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
484                        "%s/%s" % (proj_dir, f)):
485                    return False
486            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
487                    proj_dir):
488                return False
489            if os.path.isdir("%s/tarfiles" % tmpdir):
490                if not self.ship_configs(host, user,
491                        "%s/tarfiles" % tmpdir, tarfiles_dir):
492                    return False
493            if os.path.isdir("%s/rpms" % tmpdir):
494                if not self.ship_configs(host, user,
495                        "%s/rpms" % tmpdir, tarfiles_dir):
496                    return False
497            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
498            if not self.ssh_cmd(user, host,
499                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
500                            (pid, eid, tclfile), "modexp"):
501                return False
502            return True
503        elif state == "swapped":
504            # Remote experiment swapped out.  Modify it and swap it in.
505            for f in base_confs:
506                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
507                        "%s/%s" % (proj_dir, f)):
508                    return False
509            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
510                    proj_dir):
511                return False
512            if os.path.isdir("%s/tarfiles" % tmpdir):
513                if not self.ship_configs(host, user,
514                        "%s/tarfiles" % tmpdir, tarfiles_dir):
515                    return False
516            if os.path.isdir("%s/rpms" % tmpdir):
517                if not self.ship_configs(host, user,
518                        "%s/rpms" % tmpdir, tarfiles_dir):
519                    return False
520            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
521            if not self.ssh_cmd(user, host,
522                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
523                    "modexp"):
524                return False
525            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
526            if not self.ssh_cmd(user, host,
527                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
528                    "swapexp"):
529                return False
530            return True
531        elif state == "none":
532            # No remote experiment.  Create one.  We do this in 2 steps so we
533            # can put the configuration files and scripts into the new
534            # experiment directories.
535
536            # Tarfiles must be present for creation to work
537            if os.path.isdir("%s/tarfiles" % tmpdir):
538                if not self.ship_configs(host, user,
539                        "%s/tarfiles" % tmpdir, tarfiles_dir):
540                    return False
541            if os.path.isdir("%s/rpms" % tmpdir):
542                if not self.ship_configs(host, user,
543                        "%s/rpms" % tmpdir, tarfiles_dir):
544                    return False
545            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
546            if not self.ssh_cmd(user, host,
547                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
548                            (pid, eid, tclfile), "startexp"):
549                return False
550            # After startexp the per-experiment directories exist
551            for f in base_confs:
552                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
553                        "%s/%s" % (proj_dir, f)):
554                    return False
555            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
556                    proj_dir):
557                return False
558            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
559            if not self.ssh_cmd(user, host,
560                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
561                    "swapexp"):
562                return False
563            return True
564        else:
565            self.log.debug("[start_segment]:unknown state %s" % state)
566            return False
567
568    def stop_segment(self, tb, eid, tbparams):
569        """
570        Stop a sub experiment by calling swapexp on the federant
571        """
572        user = tbparams[tb]['user']
573        host = tbparams[tb]['host']
574        pid = tbparams[tb]['project']
575
576        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
577        return self.ssh_cmd(user, host,
578                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
579
580       
581    def generate_ssh_keys(self, dest, type="rsa" ):
582        """
583        Generate a set of keys for the gateways to use to talk.
584
585        Keys are of type type and are stored in the required dest file.
586        """
587        valid_types = ("rsa", "dsa")
588        t = type.lower();
589        if t not in valid_types: raise ValueError
590        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
591
592        try:
593            trace = open("/dev/null", "w")
594        except IOError:
595            raise service_error(service_error.internal,
596                    "Cannot open /dev/null??");
597
598        # May raise CalledProcessError
599        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
600        rv = call(cmd, stdout=trace, stderr=trace)
601        if rv != 0:
602            raise service_error(service_error.internal, 
603                    "Cannot generate nonce ssh keys.  %s return code %d" \
604                            % (self.ssh_keygen, rv))
605
606    def gentopo(self, str):
607        """
608        Generate the topology dtat structure from the splitter's XML
609        representation of it.
610
611        The topology XML looks like:
612            <experiment>
613                <nodes>
614                    <node><vname></vname><ips>ip1:ip2</ips></node>
615                </nodes>
616                <lans>
617                    <lan>
618                        <vname></vname><vnode></vnode><ip></ip>
619                        <bandwidth></bandwidth><member>node:port</member>
620                    </lan>
621                </lans>
622        """
623        class topo_parse:
624            """
625            Parse the topology XML and create the dats structure.
626            """
627            def __init__(self):
628                # Typing of the subelements for data conversion
629                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
630                self.int_subelements = ( 'bandwidth',)
631                self.float_subelements = ( 'delay',)
632                # The final data structure
633                self.nodes = [ ]
634                self.lans =  [ ]
635                self.topo = { \
636                        'node': self.nodes,\
637                        'lan' : self.lans,\
638                    }
639                self.element = { }  # Current element being created
640                self.chars = ""     # Last text seen
641
642            def end_element(self, name):
643                # After each sub element the contents is added to the current
644                # element or to the appropriate list.
645                if name == 'node':
646                    self.nodes.append(self.element)
647                    self.element = { }
648                elif name == 'lan':
649                    self.lans.append(self.element)
650                    self.element = { }
651                elif name in self.str_subelements:
652                    self.element[name] = self.chars
653                    self.chars = ""
654                elif name in self.int_subelements:
655                    self.element[name] = int(self.chars)
656                    self.chars = ""
657                elif name in self.float_subelements:
658                    self.element[name] = float(self.chars)
659                    self.chars = ""
660
661            def found_chars(self, data):
662                self.chars += data.rstrip()
663
664
665        tp = topo_parse();
666        parser = xml.parsers.expat.ParserCreate()
667        parser.EndElementHandler = tp.end_element
668        parser.CharacterDataHandler = tp.found_chars
669
670        parser.Parse(str)
671
672        return tp.topo
673       
674
675    def genviz(self, topo):
676        """
677        Generate the visualization the virtual topology
678        """
679
680        neato = "/usr/local/bin/neato"
681        # These are used to parse neato output and to create the visualization
682        # file.
683        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
684        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
685                "%s</type></node>"
686
687        try:
688            # Node names
689            nodes = [ n['vname'] for n in topo['node'] ]
690            topo_lans = topo['lan']
691        except KeyError:
692            raise service_error(service_error.internal, "Bad topology")
693
694        lans = { }
695        links = { }
696
697        # Walk through the virtual topology, organizing the connections into
698        # 2-node connections (links) and more-than-2-node connections (lans).
699        # When a lan is created, it's added to the list of nodes (there's a
700        # node in the visualization for the lan).
701        for l in topo_lans:
702            if links.has_key(l['vname']):
703                if len(links[l['vname']]) < 2:
704                    links[l['vname']].append(l['vnode'])
705                else:
706                    nodes.append(l['vname'])
707                    lans[l['vname']] = links[l['vname']]
708                    del links[l['vname']]
709                    lans[l['vname']].append(l['vnode'])
710            elif lans.has_key(l['vname']):
711                lans[l['vname']].append(l['vnode'])
712            else:
713                links[l['vname']] = [ l['vnode'] ]
714
715
716        # Open up a temporary file for dot to turn into a visualization
717        try:
718            df, dotname = tempfile.mkstemp()
719            dotfile = os.fdopen(df, 'w')
720        except IOError:
721            raise service_error(service_error.internal,
722                    "Failed to open file in genviz")
723
724        # Generate a dot/neato input file from the links, nodes and lans
725        try:
726            print >>dotfile, "graph G {"
727            for n in nodes:
728                print >>dotfile, '\t"%s"' % n
729            for l in links.keys():
730                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
731            for l in lans.keys():
732                for n in lans[l]:
733                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
734            print >>dotfile, "}"
735            dotfile.close()
736        except TypeError:
737            raise service_error(service_error.internal,
738                    "Single endpoint link in vtopo")
739        except IOError:
740            raise service_error(service_error.internal, "Cannot write dot file")
741
742        # Use dot to create a visualization
743        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
744                '-Gpack=true', dotname], stdout=PIPE)
745
746        # Translate dot to vis format
747        vis_nodes = [ ]
748        vis = { 'node': vis_nodes }
749        for line in dot.stdout:
750            m = vis_re.match(line)
751            if m:
752                vn = m.group(1)
753                vis_node = {'name': vn, \
754                        'x': float(m.group(2)),\
755                        'y' : float(m.group(3)),\
756                    }
757                if vn in links.keys() or vn in lans.keys():
758                    vis_node['type'] = 'lan'
759                else:
760                    vis_node['type'] = 'node'
761                vis_nodes.append(vis_node)
762        rv = dot.wait()
763
764        os.remove(dotname)
765        if rv == 0 : return vis
766        else: return None
767
768
769    def get_access(self, tb, nodes, user, tbparam):
770        """
771        Get access to testbed through fedd and set the parameters for that tb
772        """
773
774        translate_attr = {
775            'slavenodestartcmd': 'expstart',
776            'slaveconnectorstartcmd': 'gwstart',
777            'masternodestartcmd': 'mexpstart',
778            'masterconnectorstartcmd': 'mgwstart',
779            'connectorimage': 'gwimage',
780            'connectortype': 'gwtype',
781            'tunnelcfg': 'tun',
782            'smbshare': 'smbshare',
783        }
784
785        uri = self.tbmap.get(tb, None)
786        if not uri:
787            raise service_error(serice_error.server_config, 
788                    "Unknown testbed: %s" % tb)
789
790        # The basic request
791        req = {\
792                'destinationTestbed' : { 'uri' : uri },
793                'user':  user,
794                'allocID' : { 'localname': 'test' },
795                # XXX: need to get service access stright
796                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
797                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
798            }
799       
800        # node resources if any
801        if nodes != None and len(nodes) > 0:
802            rnodes = [ ]
803            for n in nodes:
804                rn = { }
805                image, hw, count = n.split(":")
806                if image: rn['image'] = [ image ]
807                if hw: rn['hardware'] = [ hw ]
808                if count: rn['count'] = int(count)
809                rnodes.append(rn)
810            req['resources']= { }
811            req['resources']['node'] = rnodes
812
813        # No retry loop here.  Proxy servers must correctly authenticate
814        # themselves without help
815
816        try:
817            ctx = fedd_ssl_context(self.cert_file, 
818                    self.trusted_certs, password=self.cert_pwd)
819        except SSL.SSLError:
820            raise service_error(service_error.server_config, 
821                    "Server certificates misconfigured")
822
823        loc = feddServiceLocator();
824        port = loc.getfeddPortType(uri,
825                transport=M2Crypto.httpslib.HTTPSConnection, 
826                transdict={ 'ssl_context' : ctx })
827
828        # Reconstruct the full request message
829        msg = RequestAccessRequestMessage()
830        msg.set_element_RequestAccessRequestBody(
831                pack_soap(msg, "RequestAccessRequestBody", req))
832
833        try:
834            resp = port.RequestAccess(msg)
835        except ZSI.ParseException, e:
836            raise service_error(service_error.req,
837                    "Bad format message (XMLRPC??): %s" %
838                    str(e))
839        r = unpack_soap(resp)
840
841        if r.has_key('RequestAccessResponseBody'):
842            r = r['RequestAccessResponseBody']
843        else:
844            raise service_error(service_error.proxy,
845                    "Bad proxy response")
846
847
848        e = r['emulab']
849        p = e['project']
850        tbparam[tb] = { 
851                "boss": e['boss'],
852                "host": e['ops'],
853                "domain": e['domain'],
854                "fs": e['fileServer'],
855                "eventserver": e['eventServer'],
856                "project": unpack_id(p['name']),
857                "emulab" : e
858                }
859        # Make the testbed name be the label the user applied
860        p['testbed'] = {'localname': tb }
861
862        for u in p['user']:
863            tbparam[tb]['user'] = unpack_id(u['userID'])
864
865        for a in e['fedAttr']:
866            if a['attribute']:
867                key = translate_attr.get(a['attribute'].lower(), None)
868                if key:
869                    tbparam[tb][key]= a['value']
870
871    def remote_splitter(self, uri, desc, master):
872
873        req = {
874                'description' : { 'ns2description': desc },
875                'master': master,
876                'include_fedkit': bool(self.fedkit)
877            }
878
879        # No retry loop here.  Proxy servers must correctly authenticate
880        # themselves without help
881        try:
882            ctx = fedd_ssl_context(self.cert_file, 
883                    self.trusted_certs, password=self.cert_pwd)
884        except SSL.SSLError:
885            raise service_error(service_error.server_config, 
886                    "Server certificates misconfigured")
887
888        loc = feddInternalServiceLocator();
889        port = loc.getfeddInternalPortType(uri,
890                transport=M2Crypto.httpslib.HTTPSConnection, 
891                transdict={ 'ssl_context' : ctx })
892
893        # Reconstruct the full request message
894        msg = Ns2SplitRequestMessage()
895        msg.set_element_Ns2SplitRequestBody(
896                pack_soap(msg, "Ns2SplitRequestBody", req))
897
898        try:
899            resp = port.Ns2Split(msg)
900        except ZSI.ParseException, e:
901            raise service_error(service_error.req,
902                    "Bad format message (XMLRPC??): %s" %
903                    str(e))
904        r = unpack_soap(resp)
905        if r.has_key('Ns2SplitResponseBody'):
906            r = r['Ns2SplitResponseBody']
907            if r.has_key('output'):
908                return r['output'].splitlines()
909            else:
910                raise service_error(service_error.proxy, 
911                        "Bad splitter response (no output)")
912        else:
913            raise service_error(service_error.proxy, "Bad splitter response")
914       
915    class current_testbed:
916        """
917        Object for collecting the current testbed description.  The testbed
918        description is saved to a file with the local testbed variables
919        subsittuted line by line.
920        """
921        def __init__(self, eid, tmpdir, fedkit):
922            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
923            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
924            self.current_testbed = None
925            self.testbed_file = None
926
927            self.def_expstart = \
928                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
929            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
930            self.def_gwstart = \
931                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
932            self.def_mgwstart = \
933                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
934            self.def_gwimage = "FBSD61-TUNNEL2";
935            self.def_gwtype = "pc";
936
937            self.eid = eid
938            self.tmpdir = tmpdir
939            self.fedkit = fedkit
940
941        def __call__(self, line, master, allocated, tbparams):
942            # Capture testbed topology descriptions
943            if self.current_testbed == None:
944                m = self.begin_testbed.match(line)
945                if m != None:
946                    self.current_testbed = m.group(1)
947                    if self.current_testbed == None:
948                        raise service_error(service_error.req,
949                                "Bad request format (unnamed testbed)")
950                    allocated[self.current_testbed] = \
951                            allocated.get(self.current_testbed,0) + 1
952                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
953                    if not os.path.exists(tb_dir):
954                        try:
955                            os.mkdir(tb_dir)
956                        except IOError:
957                            raise service_error(service_error.internal,
958                                    "Cannot create %s" % tb_dir)
959                    try:
960                        self.testbed_file = open("%s/%s.%s.tcl" %
961                                (tb_dir, self.eid, self.current_testbed), 'w')
962                    except IOError:
963                        self.testbed_file = None
964                    return True
965                else: return False
966            else:
967                m = self.end_testbed.match(line)
968                if m != None:
969                    if m.group(1) != self.current_testbed:
970                        raise service_error(service_error.internal, 
971                                "Mismatched testbed markers!?")
972                    if self.testbed_file != None: 
973                        self.testbed_file.close()
974                        self.testbed_file = None
975                    self.current_testbed = None
976                elif self.testbed_file:
977                    # Substitute variables and put the line into the local
978                    # testbed file.
979                    gwtype = tbparams[self.current_testbed].get('gwtype', 
980                            self.def_gwtype)
981                    gwimage = tbparams[self.current_testbed].get('gwimage', 
982                            self.def_gwimage)
983                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
984                            self.def_mgwstart)
985                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
986                            self.def_mexpstart)
987                    gwstart = tbparams[self.current_testbed].get('gwstart', 
988                            self.def_gwstart)
989                    expstart = tbparams[self.current_testbed].get('expstart', 
990                            self.def_expstart)
991                    project = tbparams[self.current_testbed].get('project')
992                    line = re.sub("GWTYPE", gwtype, line)
993                    line = re.sub("GWIMAGE", gwimage, line)
994                    if self.current_testbed == master:
995                        line = re.sub("GWSTART", mgwstart, line)
996                        line = re.sub("EXPSTART", mexpstart, line)
997                    else:
998                        line = re.sub("GWSTART", gwstart, line)
999                        line = re.sub("EXPSTART", expstart, line)
1000                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1001                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1002                    line = re.sub("EID", self.eid, line)
1003                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1004                            (project, self.eid), line)
1005                    if self.fedkit:
1006                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1007                                line)
1008                    print >>self.testbed_file, line
1009                return True
1010
1011    class allbeds:
1012        """
1013        Process the Allbeds section.  Get access to each federant and save the
1014        parameters in tbparams
1015        """
1016        def __init__(self, get_access):
1017            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1018            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1019            self.in_allbeds = False
1020            self.get_access = get_access
1021
1022        def __call__(self, line, user, tbparams):
1023            # Testbed access parameters
1024            if not self.in_allbeds:
1025                if self.begin_allbeds.match(line):
1026                    self.in_allbeds = True
1027                    return True
1028                else:
1029                    return False
1030            else:
1031                if self.end_allbeds.match(line):
1032                    self.in_allbeds = False
1033                else:
1034                    nodes = line.split('|')
1035                    tb = nodes.pop(0)
1036                    self.get_access(tb, nodes, user, tbparams)
1037                return True
1038
1039    class gateways:
1040        def __init__(self, eid, master, tmpdir, gw_pubkey,
1041                gw_secretkey, copy_file, fedkit):
1042            self.begin_gateways = \
1043                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1044            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1045            self.current_gateways = None
1046            self.control_gateway = None
1047            self.active_end = { }
1048
1049            self.eid = eid
1050            self.master = master
1051            self.tmpdir = tmpdir
1052            self.gw_pubkey_base = gw_pubkey
1053            self.gw_secretkey_base = gw_secretkey
1054
1055            self.copy_file = copy_file
1056            self.fedkit = fedkit
1057
1058
1059        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1060                active_end, tbparams, dtb, myname, desthost, type):
1061            """
1062            Produce a gateway configuration file from a gateways line.
1063            """
1064
1065            sproject = tbparams[gw].get('project', 'project')
1066            dproject = tbparams[dtb].get('project', 'project')
1067            sdomain = ".%s.%s%s" % (eid, sproject,
1068                    tbparams[gw].get('domain', ".example.com"))
1069            ddomain = ".%s.%s%s" % (eid, dproject,
1070                    tbparams[dtb].get('domain', ".example.com"))
1071            boss = tbparams[master].get('boss', "boss")
1072            fs = tbparams[master].get('fs', "fs")
1073            event_server = "%s%s" % \
1074                    (tbparams[gw].get('eventserver', "event_server"),
1075                            tbparams[gw].get('domain', "example.com"))
1076            remote_event_server = "%s%s" % \
1077                    (tbparams[dtb].get('eventserver', "event_server"),
1078                            tbparams[dtb].get('domain', "example.com"))
1079            seer_control = "%s%s" % \
1080                    (tbparams[gw].get('control', "control"), sdomain)
1081
1082            if self.fedkit:
1083                remote_script_dir = "/usr/local/federation/bin"
1084                local_script_dir = "/usr/local/federation/bin"
1085            else:
1086                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1087                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1088
1089            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1090            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1091            tunnel_cfg = tbparams[gw].get("tun", "false")
1092
1093            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1094            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1095
1096            # translate to lower case so the `hostname` hack for specifying
1097            # configuration files works.
1098            conf_file = conf_file.lower();
1099            remote_conf_file = remote_conf_file.lower();
1100
1101            if dtb == master:
1102                active = "false"
1103            elif gw == master:
1104                active = "true"
1105            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1106                active = "false"
1107            else:
1108                active_end['%s-%s' % (gw, dtb)] = 1
1109                active = "true"
1110
1111            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1112            print >>gwconfig, "Active: %s" % active
1113            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1114            print >>gwconfig, "BossName: %s" % boss
1115            print >>gwconfig, "FsName: %s" % fs
1116            print >>gwconfig, "EventServerName: %s" % event_server
1117            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1118            print >>gwconfig, "SeerControl: %s" % seer_control
1119            print >>gwconfig, "Type: %s" % type
1120            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1121            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1122                    local_script_dir
1123            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1124            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1125            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1126                    (remote_conf_dir, remote_conf_file)
1127            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1128            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1129            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1130            gwconfig.close()
1131
1132            return active == "true"
1133
1134        def __call__(self, line, allocated, tbparams):
1135            # Process gateways
1136            if not self.current_gateways:
1137                m = self.begin_gateways.match(line)
1138                if m:
1139                    self.current_gateways = m.group(1)
1140                    if allocated.has_key(self.current_gateways):
1141                        # This test should always succeed
1142                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1143                        if not os.path.exists(tb_dir):
1144                            try:
1145                                os.mkdir(tb_dir)
1146                            except IOError:
1147                                raise service_error(service_error.internal,
1148                                        "Cannot create %s" % tb_dir)
1149                    else:
1150                        # XXX
1151                        self.log.error("[gateways]: Ignoring gateways for " + \
1152                                "unknown testbed %s" % self.current_gateways)
1153                        self.current_gateways = None
1154                    return True
1155                else:
1156                    return False
1157            else:
1158                m = self.end_gateways.match(line)
1159                if m :
1160                    if m.group(1) != self.current_gateways:
1161                        raise service_error(service_error.internal,
1162                                "Mismatched gateway markers!?")
1163                    if self.control_gateway:
1164                        try:
1165                            cc = open("%s/%s/client.conf" %
1166                                    (self.tmpdir, self.current_gateways), 'w')
1167                            print >>cc, "ControlGateway: %s" % \
1168                                    self.control_gateway
1169                            if tbparams[self.master].has_key('smbshare'):
1170                                print >>cc, "SMBSHare: %s" % \
1171                                        tbparams[self.master]['smbshare']
1172                            print >>cc, "ProjectUser: %s" % \
1173                                    tbparams[self.master]['user']
1174                            print >>cc, "ProjectName: %s" % \
1175                                    tbparams[self.master]['project']
1176                            cc.close()
1177                        except IOError:
1178                            raise service_error(service_error.internal,
1179                                    "Error creating client config")
1180                        try:
1181                            cc = open("%s/%s/seer.conf" %
1182                                    (self.tmpdir, self.current_gateways),
1183                                    'w')
1184                            if self.current_gateways != self.master:
1185                                print >>cc, "ControlNode: %s" % \
1186                                        self.control_gateway
1187                            print >>cc, "ExperimentID: %s/%s" % \
1188                                    ( tbparams[self.master]['project'], \
1189                                    self.eid )
1190                            cc.close()
1191                        except IOError:
1192                            raise service_error(service_error.internal,
1193                                    "Error creating seer config")
1194                    else:
1195                        debug.error("[gateways]: No control gateway for %s" %\
1196                                    self.current_gateways)
1197                    self.current_gateways = None
1198                else:
1199                    dtb, myname, desthost, type = line.split(" ")
1200
1201                    if type == "control" or type == "both":
1202                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1203                                self.eid, 
1204                                tbparams[self.current_gateways]['project'],
1205                                tbparams[self.current_gateways]['domain'])
1206                    try:
1207                        active = self.gateway_conf_file(self.current_gateways,
1208                                self.master, self.eid, self.gw_pubkey_base,
1209                                self.gw_secretkey_base,
1210                                self.active_end, tbparams, dtb, myname,
1211                                desthost, type)
1212                    except IOError, e:
1213                        raise service_error(service_error.internal,
1214                                "Failed to write config file for %s" % \
1215                                        self.current_gateway)
1216           
1217                    gw_pubkey = "%s/keys/%s" % \
1218                            (self.tmpdir, self.gw_pubkey_base)
1219                    gw_secretkey = "%s/keys/%s" % \
1220                            (self.tmpdir, self.gw_secretkey_base)
1221
1222                    pkfile = "%s/%s/%s" % \
1223                            ( self.tmpdir, self.current_gateways, 
1224                                    self.gw_pubkey_base)
1225                    skfile = "%s/%s/%s" % \
1226                            ( self.tmpdir, self.current_gateways, 
1227                                    self.gw_secretkey_base)
1228
1229                    if not os.path.exists(pkfile):
1230                        try:
1231                            self.copy_file(gw_pubkey, pkfile)
1232                        except IOError:
1233                            service_error(service_error.internal,
1234                                    "Failed to copy pubkey file")
1235
1236                    if active and not os.path.exists(skfile):
1237                        try:
1238                            self.copy_file(gw_secretkey, skfile)
1239                        except IOError:
1240                            service_error(service_error.internal,
1241                                    "Failed to copy secretkey file")
1242                return True
1243
1244    class shunt_to_file:
1245        """
1246        Simple class to write data between two regexps to a file.
1247        """
1248        def __init__(self, begin, end, filename):
1249            """
1250            Begin shunting on a match of begin, stop on end, send data to
1251            filename.
1252            """
1253            self.begin = re.compile(begin)
1254            self.end = re.compile(end)
1255            self.in_shunt = False
1256            self.file = None
1257            self.filename = filename
1258
1259        def __call__(self, line):
1260            """
1261            Call this on each line in the input that may be shunted.
1262            """
1263            if not self.in_shunt:
1264                if self.begin.match(line):
1265                    self.in_shunt = True
1266                    try:
1267                        self.file = open(self.filename, "w")
1268                    except:
1269                        self.file = None
1270                        raise
1271                    return True
1272                else:
1273                    return False
1274            else:
1275                if self.end.match(line):
1276                    if self.file: 
1277                        self.file.close()
1278                        self.file = None
1279                    self.in_shunt = False
1280                else:
1281                    if self.file:
1282                        print >>self.file, line
1283                return True
1284
1285    class shunt_to_list:
1286        """
1287        Same interface as shunt_to_file.  Data collected in self.list, one list
1288        element per line.
1289        """
1290        def __init__(self, begin, end):
1291            self.begin = re.compile(begin)
1292            self.end = re.compile(end)
1293            self.in_shunt = False
1294            self.list = [ ]
1295       
1296        def __call__(self, line):
1297            if not self.in_shunt:
1298                if self.begin.match(line):
1299                    self.in_shunt = True
1300                    return True
1301                else:
1302                    return False
1303            else:
1304                if self.end.match(line):
1305                    self.in_shunt = False
1306                else:
1307                    self.list.append(line)
1308                return True
1309
1310    class shunt_to_string:
1311        """
1312        Same interface as shunt_to_file.  Data collected in self.str, all in
1313        one string.
1314        """
1315        def __init__(self, begin, end):
1316            self.begin = re.compile(begin)
1317            self.end = re.compile(end)
1318            self.in_shunt = False
1319            self.str = ""
1320       
1321        def __call__(self, line):
1322            if not self.in_shunt:
1323                if self.begin.match(line):
1324                    self.in_shunt = True
1325                    return True
1326                else:
1327                    return False
1328            else:
1329                if self.end.match(line):
1330                    self.in_shunt = False
1331                else:
1332                    self.str += line
1333                return True
1334
1335    def create_experiment(self, req, fid):
1336        """
1337        The external interface to experiment creation called from the
1338        dispatcher.
1339
1340        Creates a working directory, splits the incoming description using the
1341        splitter script and parses out the avrious subsections using the
1342        lcasses above.  Once each sub-experiment is created, use pooled threads
1343        to instantiate them and start it all up.
1344        """
1345        try:
1346            tmpdir = tempfile.mkdtemp(prefix="split-")
1347        except IOError:
1348            raise service_error(service_error.internal, "Cannot create tmp dir")
1349
1350        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1351        gw_secretkey_base = "fed.%s" % self.ssh_type
1352        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1353        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1354        tclfile = tmpdir + "/experiment.tcl"
1355        tbparams = { }
1356
1357        pid = "dummy"
1358        gid = "dummy"
1359        # XXX
1360        fail_soft = False
1361
1362        try:
1363            os.mkdir(tmpdir+"/keys")
1364        except OSError:
1365            raise service_error(service_error.internal,
1366                    "Can't make temporary dir")
1367
1368        req = req.get('CreateRequestBody', None)
1369        if not req:
1370            raise service_error(service_error.req,
1371                    "Bad request format (no CreateRequestBody)")
1372        # The tcl parser needs to read a file so put the content into that file
1373        descr=req.get('experimentdescription', None)
1374        if descr:
1375            file_content=descr.get('ns2description', None)
1376            if file_content:
1377                try:
1378                    f = open(tclfile, 'w')
1379                    f.write(file_content)
1380                    f.close()
1381                except IOError:
1382                    raise service_error(service_error.internal,
1383                            "Cannot write temp experiment description")
1384            else:
1385                raise service_error(service_error.req, 
1386                        "Only ns2descriptions supported")
1387        else:
1388            raise service_error(service_error.req, "No experiment description")
1389
1390        if req.has_key('experimentID') and \
1391                req['experimentID'].has_key('localname'):
1392            eid = req['experimentID']['localname']
1393            self.state_lock.acquire()
1394            while (self.state.has_key(eid)):
1395                eid += random.choice(string.ascii_letters)
1396            # To avoid another thread picking this localname
1397            self.state[eid] = "placeholder"
1398            self.state_lock.release()
1399        else:
1400            eid = self.exp_stem
1401            for i in range(0,5):
1402                eid += random.choice(string.ascii_letters)
1403            self.state_lock.acquire()
1404            while (self.state.has_key(eid)):
1405                eid = self.exp_stem
1406                for i in range(0,5):
1407                    eid += random.choice(string.ascii_letters)
1408            # To avoid another thread picking this localname
1409            self.state[eid] = "placeholder"
1410            self.state_lock.release()
1411
1412        try:
1413            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1414        except ValueError:
1415            raise service_error(service_error.server_config, 
1416                    "Bad key type (%s)" % self.ssh_type)
1417
1418        user = req.get('user', None)
1419        if user == None:
1420            raise service_error(service_error.req, "No user")
1421
1422        master = req.get('master', None)
1423        if master == None:
1424            raise service_error(service_error.req, "No master testbed label")
1425       
1426        if self.splitter_url:
1427            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1428            split_data = self.remote_splitter(self.splitter_url, file_content,
1429                    master)
1430        else:
1431            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1432                str(self.muxmax), '-m', master]
1433
1434            if self.fedkit:
1435                tclcmd.append('-k')
1436
1437            tclcmd.extend([pid, gid, eid, tclfile])
1438
1439            self.log.debug("running local splitter %s", " ".join(tclcmd))
1440            tclparser = Popen(tclcmd, stdout=PIPE)
1441            split_data = tclparser.stdout
1442
1443        allocated = { }     # Testbeds we can access
1444        started = { }       # Testbeds where a sub-experiment started
1445                            # successfully
1446
1447        # Objects to parse the splitter output (defined above)
1448        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1449        parse_allbeds = self.allbeds(self.get_access)
1450        parse_gateways = self.gateways(eid, master, tmpdir,
1451                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1452        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1453                    "^#\s+End\s+Vtopo")
1454        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1455                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1456        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1457                "^#\s+End\s+tarfiles")
1458        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1459                "^#\s+End\s+rpms")
1460
1461        # Worling on the split data
1462        for line in split_data:
1463            line = line.rstrip()
1464            if parse_current_testbed(line, master, allocated, tbparams):
1465                continue
1466            elif parse_allbeds(line, user, tbparams):
1467                continue
1468            elif parse_gateways(line, allocated, tbparams):
1469                continue
1470            elif parse_vtopo(line):
1471                continue
1472            elif parse_hostnames(line):
1473                continue
1474            elif parse_tarfiles(line):
1475                continue
1476            elif parse_rpms(line):
1477                continue
1478            else:
1479                raise service_error(service_error.internal, 
1480                        "Bad tcl parse? %s" % line)
1481
1482        # Virtual topology and visualization
1483        vtopo = self.gentopo(parse_vtopo.str)
1484        if not vtopo:
1485            raise service_error(service_error.internal, 
1486                    "Failed to generate virtual topology")
1487
1488        vis = self.genviz(vtopo)
1489        if not vis:
1490            raise service_error(service_error.internal, 
1491                    "Failed to generate visualization")
1492
1493        # save federant information
1494        for k in allocated.keys():
1495            tbparams[k]['federant'] = {\
1496                    'name': [ { 'localname' : eid} ],\
1497                    'emulab': tbparams[k]['emulab'],\
1498                    'master' : k == master,\
1499                }
1500
1501
1502        # Copy tarfiles and rpms needed at remote sites into a staging area
1503        try:
1504            if self.fedkit:
1505                parse_tarfiles.list.append(self.fedkit)
1506            for t in parse_tarfiles.list:
1507                if not os.path.exists("%s/tarfiles" % tmpdir):
1508                    os.mkdir("%s/tarfiles" % tmpdir)
1509                self.copy_file(t, "%s/tarfiles/%s" % \
1510                        (tmpdir, os.path.basename(t)))
1511            for r in parse_rpms.list:
1512                if not os.path.exists("%s/rpms" % tmpdir):
1513                    os.mkdir("%s/rpms" % tmpdir)
1514                self.copy_file(r, "%s/rpms/%s" % \
1515                        (tmpdir, os.path.basename(r)))
1516        except IOError, e:
1517            raise service_error(service_error.internal, 
1518                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1519
1520        thread_pool_info = self.thread_pool()
1521        threads = [ ]
1522
1523        for tb in [ k for k in allocated.keys() if k != master]:
1524            # Wait until we have a free slot to start the next testbed load
1525            thread_pool_info.acquire()
1526            while thread_pool_info.started - \
1527                    thread_pool_info.terminated >= self.nthreads:
1528                thread_pool_info.wait()
1529            thread_pool_info.release()
1530
1531            # Create and start a thread to start the segment, and save it to
1532            # get the return value later
1533            t  = self.pooled_thread(target=self.start_segment, 
1534                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1535                    pdata=thread_pool_info, trace_file=self.trace_file)
1536            threads.append(t)
1537            t.start()
1538
1539        # Wait until all finish (the first clause of the while is to make sure
1540        # one starts)
1541        thread_pool_info.acquire()
1542        while thread_pool_info.started == 0 or \
1543                thread_pool_info.started > thread_pool_info.terminated:
1544            thread_pool_info.wait()
1545        thread_pool_info.release()
1546
1547        # If none failed, start the master
1548        failed = [ t.getName() for t in threads if not t.rv ]
1549
1550        if len(failed) == 0:
1551            if not self.start_segment(master, eid, tbparams, tmpdir):
1552                failed.append(master)
1553
1554        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1555        # If one failed clean up, unless fail_soft is set
1556        if failed:
1557            if not fail_soft:
1558                for tb in succeeded:
1559                    self.stop_segment(tb, eid, tbparams)
1560                # Remove the placeholder
1561                self.state_lock.acquire()
1562                del self.state[eid]
1563                self.state_lock.release()
1564
1565                raise service_error(service_error.federant,
1566                    "Swap in failed on %s" % ",".join(failed))
1567        else:
1568            self.log.info("[start_segment]: Experiment %s started" % eid)
1569
1570        # Generate an ID for the experiment (slice) and a certificate that the
1571        # allocator can use to prove they own it.  We'll ship it back through
1572        # the encrypted connection.
1573        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1574
1575        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1576
1577        # Walk up tmpdir, deleting as we go
1578        for path, dirs, files in os.walk(tmpdir, topdown=False):
1579            for f in files:
1580                os.remove(os.path.join(path, f))
1581            for d in dirs:
1582                os.rmdir(os.path.join(path, d))
1583        os.rmdir(tmpdir)
1584
1585        resp = { 'federant' : [ tbparams[tb]['federant'] \
1586                for tb in tbparams.keys() \
1587                    if tbparams[tb].has_key('federant') ],\
1588                    'vtopo': vtopo,\
1589                    'vis' : vis,
1590                    'experimentID' : [\
1591                            { 'fedid': copy.copy(expid) }, \
1592                            { 'localname': eid },\
1593                        ],\
1594                    'experimentAccess': { 'X509' : expcert },\
1595                }
1596
1597        # Insert the experiment into our state and update the disk copy
1598        self.state_lock.acquire()
1599        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1600                for tb in tbparams.keys() \
1601                    if tbparams[tb].has_key('federant') ],\
1602                    'vtopo': vtopo,\
1603                    'vis' : vis,
1604                    'experimentID' : [\
1605                            { 'fedid': expid }, { 'localname': eid },\
1606                        ],\
1607                }
1608        self.state[eid] = self.state[expid]
1609        if self.state_filename: self.write_state()
1610        self.state_lock.release()
1611
1612        if not failed:
1613            return resp
1614        else:
1615            raise service_error(service_error.partial, \
1616                    "Partial swap in on %s" % ",".join(succeeded))
1617
1618
1619    def get_vtopo(self, req, fid):
1620        """
1621        Return the stored virtual topology for this experiment
1622        """
1623        rv = None
1624
1625        req = req.get('VtopoRequestBody', None)
1626        if not req:
1627            raise service_error(service_error.req,
1628                    "Bad request format (no VtopoRequestBody)")
1629        exp = req.get('experiment', None)
1630        if exp:
1631            if exp.has_key('fedid'):
1632                key = fedid(bits=exp['fedid'])
1633                keytype = "fedid"
1634            elif exp.has_key('localname'):
1635                key = exp['localname']
1636                keytype = "localname"
1637            else:
1638                raise service_error(service_error.req, "Unknown lookup type")
1639        else:
1640            raise service_error(service_error.req, "No request?")
1641
1642        self.state_lock.acquire()
1643        if self.state.has_key(key):
1644            rv = { 'experiment' : {keytype: key },\
1645                    'vtopo': self.state[key]['vtopo'],\
1646                }
1647        self.state_lock.release()
1648
1649        if rv: return rv
1650        else: raise service_error(service_error.req, "No such experiment")
1651
1652    def get_vis(self, req, fid):
1653        """
1654        Return the stored visualization for this experiment
1655        """
1656        rv = None
1657
1658        req = req.get('VisRequestBody', None)
1659        if not req:
1660            raise service_error(service_error.req,
1661                    "Bad request format (no VisRequestBody)")
1662        exp = req.get('experiment', None)
1663        if exp:
1664            if exp.has_key('fedid'):
1665                key = fedid(bits=exp['fedid'])
1666                keytype = "fedid"
1667            elif exp.has_key('localname'):
1668                key = exp['localname']
1669                keytype = "localname"
1670            else:
1671                raise service_error(service_error.req, "Unknown lookup type")
1672        else:
1673            raise service_error(service_error.req, "No request?")
1674
1675        self.state_lock.acquire()
1676        if self.state.has_key(key):
1677            rv =  { 'experiment' : {keytype: key },\
1678                    'vis': self.state[key]['vis'],\
1679                    }
1680        self.state_lock.release()
1681
1682        if rv: return rv
1683        else: raise service_error(service_error.req, "No such experiment")
1684
1685    def get_info(self, req, fid):
1686        """
1687        Return all the stored info about this experiment
1688        """
1689        rv = None
1690
1691        req = req.get('InfoRequestBody', None)
1692        if not req:
1693            raise service_error(service_error.req,
1694                    "Bad request format (no VisRequestBody)")
1695        exp = req.get('experiment', None)
1696        if exp:
1697            if exp.has_key('fedid'):
1698                key = fedid(bits=exp['fedid'])
1699                keytype = "fedid"
1700            elif exp.has_key('localname'):
1701                key = exp['localname']
1702                keytype = "localname"
1703            else:
1704                raise service_error(service_error.req, "Unknown lookup type")
1705        else:
1706            raise service_error(service_error.req, "No request?")
1707
1708        # The state may be massaged by the service function that called
1709        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1710        # state.
1711        self.state_lock.acquire()
1712        if self.state.has_key(key):
1713            rv = copy.deepcopy(self.state[key])
1714        self.state_lock.release()
1715
1716        if rv: return rv
1717        else: raise service_error(service_error.req, "No such experiment")
1718
1719
1720    def terminate_experiment(self, req, fid):
1721        """
1722        Swap this experiment out on the federants and delete the shared
1723        information
1724        """
1725        tbparams = { }
1726        req = req.get('TerminateRequestBody', None)
1727        if not req:
1728            raise service_error(service_error.req,
1729                    "Bad request format (no TerminateRequestBody)")
1730        exp = req.get('experiment', None)
1731        if exp:
1732            if exp.has_key('fedid'):
1733                key = fedid(bits=exp['fedid'])
1734                keytype = "fedid"
1735            elif exp.has_key('localname'):
1736                key = exp['localname']
1737                keytype = "localname"
1738            else:
1739                raise service_error(service_error.req, "Unknown lookup type")
1740        else:
1741            raise service_error(service_error.req, "No request?")
1742
1743        self.state_lock.acquire()
1744        fed_exp = self.state.get(key, None)
1745
1746        if fed_exp:
1747            # This branch of the conditional holds the lock to generate a
1748            # consistent temporary tbparams variable to deallocate experiments.
1749            # It releases the lock to do the deallocations and reacquires it to
1750            # remove the experiment state when the termination is complete.
1751            ids = []
1752            #  experimentID is a list of dicts that are self-describing
1753            #  identifiers.  This finds all the fedids and localnames - the
1754            #  keys of self.state - and puts them into ids.
1755            for id in fed_exp.get('experimentID', []):
1756                if id.has_key('fedid'): ids.append(id['fedid'])
1757                if id.has_key('localname'): ids.append(id['localname'])
1758
1759            # Construct enough of the tbparams to make the stop_segment calls
1760            # work
1761            for fed in fed_exp['federant']:
1762                try:
1763                    for e in fed['name']:
1764                        eid = e.get('localname', None)
1765                        if eid: break
1766                    else:
1767                        continue
1768
1769                    p = fed['emulab']['project']
1770
1771                    project = p['name']['localname']
1772                    tb = p['testbed']['localname']
1773                    user = p['user'][0]['userID']['localname']
1774
1775                    domain = fed['emulab']['domain']
1776                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1777                except KeyError, e:
1778                    continue
1779                tbparams[tb] = {\
1780                        'user': user,\
1781                        'domain': domain,\
1782                        'project': project,\
1783                        'host': host,\
1784                        'eid': eid,\
1785                    }
1786            self.state_lock.release()
1787
1788            # Stop everyone.
1789            for tb in tbparams.keys():
1790                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1791
1792            # Remove teh terminated experiment
1793            self.state_lock.acquire()
1794            for id in ids:
1795                if self.state.has_key(id): del self.state[id]
1796
1797            if self.state_filename: self.write_state()
1798            self.state_lock.release()
1799
1800            return { 'experiment': exp }
1801        else:
1802            # Don't forget to release the lock
1803            self.state_lock.release()
1804            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.