source: fedd/fedd_experiment_control.py @ d81971a

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since d81971a was d81971a, checked in by Ted Faber <faber@…>, 16 years ago

checkpoint of the resource management stuff

  • Property mode set to 100644
File size: 55.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5from ZSI import *
6from M2Crypto import SSL
7from M2Crypto.SSL.SSLServer import SSLServer
8import M2Crypto.httpslib
9
10import xml.parsers.expat
11
12import re
13import random
14import string
15import subprocess
16import tempfile
17import copy
18import pickle
19
20import traceback
21
22from threading import *
23
24from subprocess import *
25
26from fedd_services import *
27from fedd_internal_services import *
28from fedd_util import *
29import parse_detail
30from service_error import *
31
32import logging
33
34class nullHandler(logging.Handler):
35    def emit(self, record): pass
36
37fl = logging.getLogger("fedd.experiment_control")
38fl.addHandler(nullHandler())
39
40class fedd_experiment_control_local:
41    """
42    Control of experiments that this system can directly access.
43
44    Includes experiment creation, termination and information dissemination.
45    Thred safe.
46    """
47   
48    class thread_pool:
49        """
50        A class to keep track of a set of threads all invoked for the same
51        task.  Manages the mutual exclusion of the states.
52        """
53        def __init__(self):
54            """
55            Start a pool.
56            """
57            self.changed = Condition()
58            self.started = 0
59            self.terminated = 0
60
61        def acquire(self):
62            """
63            Get the pool's lock.
64            """
65            self.changed.acquire()
66
67        def release(self):
68            """
69            Release the pool's lock.
70            """
71            self.changed.release()
72
73        def wait(self, timeout = None):
74            """
75            Wait for a pool thread to start or stop.
76            """
77            self.changed.wait(timeout)
78
79        def start(self):
80            """
81            Called by a pool thread to report starting.
82            """
83            self.changed.acquire()
84            self.started += 1
85            self.changed.notifyAll()
86            self.changed.release()
87
88        def terminate(self):
89            """
90            Called by a pool thread to report finishing.
91            """
92            self.changed.acquire()
93            self.terminated += 1
94            self.changed.notifyAll()
95            self.changed.release()
96
97        def clear(self):
98            """
99            Clear all pool data.
100            """
101            self.changed.acquire()
102            self.started = 0
103            self.terminated =0
104            self.changed.notifyAll()
105            self.changed.release()
106
107    class pooled_thread(Thread):
108        """
109        One of a set of threads dedicated to a specific task.  Uses the
110        thread_pool class above for coordination.
111        """
112        def __init__(self, group=None, target=None, name=None, args=(), 
113                kwargs={}, pdata=None, trace_file=None):
114            Thread.__init__(self, group, target, name, args, kwargs)
115            self.rv = None          # Return value of the ops in this thread
116            self.exception = None   # Exception that terminated this thread
117            self.target=target      # Target function to run on start()
118            self.args = args        # Args to pass to target
119            self.kwargs = kwargs    # Additional kw args
120            self.pdata = pdata      # thread_pool for this class
121            # Logger for this thread
122            self.log = logging.getLogger("fedd.experiment_control")
123       
124        def run(self):
125            """
126            Emulate Thread.run, except add pool data manipulation and error
127            logging.
128            """
129            if self.pdata:
130                self.pdata.start()
131
132            if self.target:
133                try:
134                    self.rv = self.target(*self.args, **self.kwargs)
135                except service_error, s:
136                    self.exception = s
137                    self.log.error("Thread exception: %s %s" % \
138                            (s.code_string(), s.desc))
139                except:
140                    self.exception = sys.exc_info()[1]
141                    self.log.error(("Unexpected thread exception: %s" +\
142                            "Trace %s") % (self.exception,\
143                                traceback.format_exc()))
144            if self.pdata:
145                self.pdata.terminate()
146
147    def __init__(self, config=None):
148        """
149        Intialize the various attributes, most from the config object
150        """
151        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
152        self.thread_pool = fedd_experiment_control_local.thread_pool
153
154        self.cert_file = None
155        self.cert_pwd = None
156        self.trusted_certs = None
157
158        # Walk through the various relevant certificat specifying config
159        # attributes until the local certificate attributes can be resolved.
160        # The walk is from most specific to most general specification.
161        for s in ("experiment_control", "globals"):
162            if config.has_section(s): 
163                if config.has_option(s, "cert_file"):
164                    if not self.cert_file:
165                        self.cert_file = config.get(s, "cert_file")
166                        self.cert_pwd = config.get(s, "cert_pwd")
167
168                if config.has_option(s, "trusted_certs"):
169                    if not self.trusted_certs:
170                        self.trusted_certs = config.get(s, "trusted_certs")
171
172        self.exp_stem = "fed-stem"
173        self.log = logging.getLogger("fedd.experiment_control")
174        self.muxmax = 2
175        self.nthreads = 2
176        self.randomize_experiments = False
177
178        self.scp_exec = "/usr/bin/scp"
179        self.splitter = None
180        self.ssh_exec="/usr/bin/ssh"
181        self.ssh_keygen = "/usr/bin/ssh-keygen"
182        self.ssh_identity_file = None
183
184        if config.has_section("experiment_control"):
185            self.debug = config.get("experiment_control", "create_debug")
186            self.state_filename = config.get("experiment_control", 
187                    "experiment_state_file")
188            self.splitter_url = config.get("experiment_control", "splitter_url")
189            self.fedkit = config.get("experiment_control", "fedkit")
190        else:
191            self.debug = False
192            self.state_filename = None
193            self.splitter_url = None
194            self.fedkit = None
195
196        # XXX
197        self.ssh_pubkey_file = "/users/faber/.ssh/id_rsa.pub"
198        self.ssh_type = "rsa"
199        self.state = { }
200        self.state_lock = Lock()
201        self.tclsh = "/usr/local/bin/otclsh"
202        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
203        self.tbmap = { 
204                'deter':'https://users.isi.deterlab.net:23235',
205                'emulab':'https://users.isi.deterlab.net:23236',
206                'ucb':'https://users.isi.deterlab.net:23237',
207                }
208        self.trace_file = sys.stderr
209
210        self.def_expstart = \
211                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
212                "/tmp/federate";
213        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
214                "FEDDIR/hosts";
215        self.def_gwstart = \
216                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
217                "/tmp/bridge.log";
218        self.def_mgwstart = \
219                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
220                "/tmp/bridge.log";
221        self.def_gwimage = "FBSD61-TUNNEL2";
222        self.def_gwtype = "pc";
223
224
225        if self.ssh_pubkey_file:
226            try:
227                f = open(self.ssh_pubkey_file, 'r')
228                self.ssh_pubkey = f.read()
229                f.close()
230            except IOError:
231                raise service_error(service_error.internal,
232                        "Cannot read sshpubkey")
233
234        set_log_level(config, "experiment_control", self.log)
235
236        # Grab saved state.  OK to do this w/o locking because it's read only
237        # and only one thread should be in existence that can see self.state at
238        # this point.
239        if self.state_filename:
240            self.read_state()
241
242        # Dispatch tables
243        self.soap_services = {\
244                'Create': make_soap_handler(\
245                        CreateRequestMessage.typecode,
246                        getattr(self, "create_experiment"), 
247                        CreateResponseMessage,
248                        "CreateResponseBody"),
249                'Vtopo': make_soap_handler(\
250                        VtopoRequestMessage.typecode,
251                        getattr(self, "get_vtopo"),
252                        VtopoResponseMessage,
253                        "VtopoResponseBody"),
254                'Vis': make_soap_handler(\
255                        VisRequestMessage.typecode,
256                        getattr(self, "get_vis"),
257                        VisResponseMessage,
258                        "VisResponseBody"),
259                'Info': make_soap_handler(\
260                        InfoRequestMessage.typecode,
261                        getattr(self, "get_info"),
262                        InfoResponseMessage,
263                        "InfoResponseBody"),
264                'Terminate': make_soap_handler(\
265                        TerminateRequestMessage.typecode,
266                        getattr(self, "terminate_experiment"),
267                        TerminateResponseMessage,
268                        "TerminateResponseBody"),
269        }
270
271        self.xmlrpc_services = {\
272                'Create': make_xmlrpc_handler(\
273                        getattr(self, "create_experiment"), 
274                        "CreateResponseBody"),
275                'Vtopo': make_xmlrpc_handler(\
276                        getattr(self, "get_vtopo"),
277                        "VtopoResponseBody"),
278                'Vis': make_xmlrpc_handler(\
279                        getattr(self, "get_vis"),
280                        "VisResponseBody"),
281                'Info': make_xmlrpc_handler(\
282                        getattr(self, "get_info"),
283                        "InfoResponseBody"),
284                'Terminate': make_xmlrpc_handler(\
285                        getattr(self, "terminate_experiment"),
286                        "TerminateResponseBody"),
287        }
288
289    def copy_file(self, src, dest, size=1024):
290        """
291        Exceedingly simple file copy.
292        """
293        s = open(src,'r')
294        d = open(dest, 'w')
295
296        buf = "x"
297        while buf != "":
298            buf = s.read(size)
299            d.write(buf)
300        s.close()
301        d.close()
302
303    # Call while holding self.state_lock
304    def write_state(self):
305        """
306        Write a new copy of experiment state after copying the existing state
307        to a backup.
308
309        State format is a simple pickling of the state dictionary.
310        """
311        if os.access(self.state_filename, os.W_OK):
312            self.copy_file(self.state_filename, \
313                    "%s.bak" % self.state_filename)
314        try:
315            f = open(self.state_filename, 'w')
316            pickle.dump(self.state, f)
317        except IOError, e:
318            self.log.error("Can't write file %s: %s" % \
319                    (self.state_filename, e))
320        except pickle.PicklingError, e:
321            self.log.error("Pickling problem: %s" % e)
322        except TypeError, e:
323            self.log.error("Pickling problem (TypeError): %s" % e)
324
325    # Call while holding self.state_lock
326    def read_state(self):
327        """
328        Read a new copy of experiment state.  Old state is overwritten.
329
330        State format is a simple pickling of the state dictionary.
331        """
332        try:
333            f = open(self.state_filename, "r")
334            self.state = pickle.load(f)
335            self.log.debug("[read_state]: Read state from %s" % \
336                    self.state_filename)
337        except IOError, e:
338            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
339                    % (self.state_filename, e))
340        except pickle.UnpicklingError, e:
341            self.log.warning(("[read_state]: No saved state: " + \
342                    "Unpickling failed: %s") % e)
343
344    def scp_file(self, file, user, host, dest=""):
345        """
346        scp a file to the remote host.  If debug is set the action is only
347        logged.
348        """
349
350        scp_cmd = [self.scp_exec, file, "%s@%s:%s" % (user, host, dest)]
351        rv = 0
352
353        try:
354            dnull = open("/dev/null", "r")
355        except IOError:
356            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
357            dnull = Null
358
359        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
360        if not self.debug:
361            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
362            else: rv = call(scp_cmd)
363
364        return rv == 0
365
366    def ssh_cmd(self, user, host, cmd, wname=None):
367        """
368        Run a remote command on host as user.  If debug is set, the action is
369        only logged.
370        """
371        sh_str = "%s %s@%s %s" % (self.ssh_exec, user, host, cmd)
372
373        try:
374            dnull = open("/dev/null", "r")
375        except IOError:
376            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
377            dnull = Null
378
379        self.log.debug("[ssh_cmd]: %s" % sh_str)
380        if not self.debug:
381            if dnull:
382                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
383            else:
384                sub = Popen(sh_str, shell=True)
385            return sub.wait() == 0
386        else:
387            return True
388
389    def ship_configs(self, host, user, src_dir, dest_dir):
390        """
391        Copy federant-specific configuration files to the federant.
392        """
393        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
394            return False
395        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
396            return False
397
398        for f in os.listdir(src_dir):
399            if os.path.isdir(f):
400                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
401                        "%s/%s" % (dest_dir, f)):
402                    return False
403            else:
404                if not self.scp_file("%s/%s" % (src_dir, f), 
405                        user, host, dest_dir):
406                    return False
407        return True
408
409    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
410        """
411        Start a sub-experiment on a federant.
412
413        Get the current state, modify or create as appropriate, ship data and
414        configs and start the experiment.  There are small ordering differences
415        based on the initial state of the sub-experiment.
416        """
417        # ops node in the federant
418        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
419        user = tbparams[tb]['user']     # federant user
420        pid = tbparams[tb]['project']   # federant project
421        # XXX
422        base_confs = ( "hosts",)
423        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
424        # command to test experiment state
425        expinfo_exec = "/usr/testbed/bin/expinfo" 
426        # Configuration directories on the remote machine
427        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
428        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
429        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
430        # Regular expressions to parse the expinfo response
431        state_re = re.compile("State:\s+(\w+)")
432        no_exp_re = re.compile("^No\s+such\s+experiment")
433        state = None    # Experiment state parsed from expinfo
434        # The expinfo ssh command
435        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
436
437        # Get status
438        self.log.debug("[start_segment]: %s"% " ".join(cmd))
439        dev_null = None
440        try:
441            dev_null = open("/dev/null", "a")
442        except IOError, e:
443            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
444
445        if self.debug:
446            state = 'swapped'
447            rv = 0
448        else:
449            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
450            for line in status.stdout:
451                m = state_re.match(line)
452                if m: state = m.group(1)
453                else:
454                    m = no_exp_re.match(line)
455                    if m: state = "none"
456            rv = status.wait()
457
458        # If the experiment is not present the subcommand returns a non-zero
459        # return value.  If we successfully parsed a "none" outcome, ignore the
460        # return code.
461        if rv != 0 and state != "none":
462            raise service_error(service_error.internal,
463                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
464
465        self.log.debug("[start_segment]: %s: %s" % (tb, state))
466        self.log.info("[start_segment]:transferring experiment to %s" % tb)
467
468        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
469            return False
470        # Clear the federation files
471        if not self.ssh_cmd(user, host, 
472                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
473            return False
474        if not self.ssh_cmd(user, host, 
475                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
476            return False
477        # Clear and create the tarfiles and rpm directories
478        for d in (tarfiles_dir, rpms_dir):
479            if not self.ssh_cmd(user, host, 
480                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
481                return False
482            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
483                    "create tarfiles"):
484                return False
485       
486        if state == 'active':
487            # Remote experiment is active.  Modify it.
488            for f in base_confs:
489                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
490                        "%s/%s" % (proj_dir, f)):
491                    return False
492            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
493                    proj_dir):
494                return False
495            if os.path.isdir("%s/tarfiles" % tmpdir):
496                if not self.ship_configs(host, user,
497                        "%s/tarfiles" % tmpdir, tarfiles_dir):
498                    return False
499            if os.path.isdir("%s/rpms" % tmpdir):
500                if not self.ship_configs(host, user,
501                        "%s/rpms" % tmpdir, tarfiles_dir):
502                    return False
503            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
504            if not self.ssh_cmd(user, host,
505                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
506                            (pid, eid, tclfile), "modexp"):
507                return False
508            return True
509        elif state == "swapped":
510            # Remote experiment swapped out.  Modify it and swap it in.
511            for f in base_confs:
512                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
513                        "%s/%s" % (proj_dir, f)):
514                    return False
515            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
516                    proj_dir):
517                return False
518            if os.path.isdir("%s/tarfiles" % tmpdir):
519                if not self.ship_configs(host, user,
520                        "%s/tarfiles" % tmpdir, tarfiles_dir):
521                    return False
522            if os.path.isdir("%s/rpms" % tmpdir):
523                if not self.ship_configs(host, user,
524                        "%s/rpms" % tmpdir, tarfiles_dir):
525                    return False
526            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
527            if not self.ssh_cmd(user, host,
528                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
529                    "modexp"):
530                return False
531            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
532            if not self.ssh_cmd(user, host,
533                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
534                    "swapexp"):
535                return False
536            return True
537        elif state == "none":
538            # No remote experiment.  Create one.  We do this in 2 steps so we
539            # can put the configuration files and scripts into the new
540            # experiment directories.
541
542            # Tarfiles must be present for creation to work
543            if os.path.isdir("%s/tarfiles" % tmpdir):
544                if not self.ship_configs(host, user,
545                        "%s/tarfiles" % tmpdir, tarfiles_dir):
546                    return False
547            if os.path.isdir("%s/rpms" % tmpdir):
548                if not self.ship_configs(host, user,
549                        "%s/rpms" % tmpdir, tarfiles_dir):
550                    return False
551            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
552            if not self.ssh_cmd(user, host,
553                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
554                            (pid, eid, tclfile), "startexp"):
555                return False
556            # After startexp the per-experiment directories exist
557            for f in base_confs:
558                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
559                        "%s/%s" % (proj_dir, f)):
560                    return False
561            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
562                    proj_dir):
563                return False
564            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
565            if not self.ssh_cmd(user, host,
566                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
567                    "swapexp"):
568                return False
569            return True
570        else:
571            self.log.debug("[start_segment]:unknown state %s" % state)
572            return False
573
574    def stop_segment(self, tb, eid, tbparams):
575        """
576        Stop a sub experiment by calling swapexp on the federant
577        """
578        user = tbparams[tb]['user']
579        host = tbparams[tb]['host']
580        pid = tbparams[tb]['project']
581
582        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
583        return self.ssh_cmd(user, host,
584                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
585
586       
587    def generate_ssh_keys(self, dest, type="rsa" ):
588        """
589        Generate a set of keys for the gateways to use to talk.
590
591        Keys are of type type and are stored in the required dest file.
592        """
593        valid_types = ("rsa", "dsa")
594        t = type.lower();
595        if t not in valid_types: raise ValueError
596        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
597
598        try:
599            trace = open("/dev/null", "w")
600        except IOError:
601            raise service_error(service_error.internal,
602                    "Cannot open /dev/null??");
603
604        # May raise CalledProcessError
605        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
606        rv = call(cmd, stdout=trace, stderr=trace)
607        if rv != 0:
608            raise service_error(service_error.internal, 
609                    "Cannot generate nonce ssh keys.  %s return code %d" \
610                            % (self.ssh_keygen, rv))
611
612    def gentopo(self, str):
613        """
614        Generate the topology dtat structure from the splitter's XML
615        representation of it.
616
617        The topology XML looks like:
618            <experiment>
619                <nodes>
620                    <node><vname></vname><ips>ip1:ip2</ips></node>
621                </nodes>
622                <lans>
623                    <lan>
624                        <vname></vname><vnode></vnode><ip></ip>
625                        <bandwidth></bandwidth><member>node:port</member>
626                    </lan>
627                </lans>
628        """
629        class topo_parse:
630            """
631            Parse the topology XML and create the dats structure.
632            """
633            def __init__(self):
634                # Typing of the subelements for data conversion
635                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
636                self.int_subelements = ( 'bandwidth',)
637                self.float_subelements = ( 'delay',)
638                # The final data structure
639                self.nodes = [ ]
640                self.lans =  [ ]
641                self.topo = { \
642                        'node': self.nodes,\
643                        'lan' : self.lans,\
644                    }
645                self.element = { }  # Current element being created
646                self.chars = ""     # Last text seen
647
648            def end_element(self, name):
649                # After each sub element the contents is added to the current
650                # element or to the appropriate list.
651                if name == 'node':
652                    self.nodes.append(self.element)
653                    self.element = { }
654                elif name == 'lan':
655                    self.lans.append(self.element)
656                    self.element = { }
657                elif name in self.str_subelements:
658                    self.element[name] = self.chars
659                    self.chars = ""
660                elif name in self.int_subelements:
661                    self.element[name] = int(self.chars)
662                    self.chars = ""
663                elif name in self.float_subelements:
664                    self.element[name] = float(self.chars)
665                    self.chars = ""
666
667            def found_chars(self, data):
668                self.chars += data.rstrip()
669
670
671        tp = topo_parse();
672        parser = xml.parsers.expat.ParserCreate()
673        parser.EndElementHandler = tp.end_element
674        parser.CharacterDataHandler = tp.found_chars
675
676        parser.Parse(str)
677
678        return tp.topo
679       
680
681    def genviz(self, topo):
682        """
683        Generate the visualization the virtual topology
684        """
685
686        neato = "/usr/local/bin/neato"
687        # These are used to parse neato output and to create the visualization
688        # file.
689        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
690        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
691                "%s</type></node>"
692
693        try:
694            # Node names
695            nodes = [ n['vname'] for n in topo['node'] ]
696            topo_lans = topo['lan']
697        except KeyError:
698            raise service_error(service_error.internal, "Bad topology")
699
700        lans = { }
701        links = { }
702
703        # Walk through the virtual topology, organizing the connections into
704        # 2-node connections (links) and more-than-2-node connections (lans).
705        # When a lan is created, it's added to the list of nodes (there's a
706        # node in the visualization for the lan).
707        for l in topo_lans:
708            if links.has_key(l['vname']):
709                if len(links[l['vname']]) < 2:
710                    links[l['vname']].append(l['vnode'])
711                else:
712                    nodes.append(l['vname'])
713                    lans[l['vname']] = links[l['vname']]
714                    del links[l['vname']]
715                    lans[l['vname']].append(l['vnode'])
716            elif lans.has_key(l['vname']):
717                lans[l['vname']].append(l['vnode'])
718            else:
719                links[l['vname']] = [ l['vnode'] ]
720
721
722        # Open up a temporary file for dot to turn into a visualization
723        try:
724            df, dotname = tempfile.mkstemp()
725            dotfile = os.fdopen(df, 'w')
726        except IOError:
727            raise service_error(service_error.internal,
728                    "Failed to open file in genviz")
729
730        # Generate a dot/neato input file from the links, nodes and lans
731        try:
732            print >>dotfile, "graph G {"
733            for n in nodes:
734                print >>dotfile, '\t"%s"' % n
735            for l in links.keys():
736                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
737            for l in lans.keys():
738                for n in lans[l]:
739                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
740            print >>dotfile, "}"
741            dotfile.close()
742        except TypeError:
743            raise service_error(service_error.internal,
744                    "Single endpoint link in vtopo")
745        except IOError:
746            raise service_error(service_error.internal, "Cannot write dot file")
747
748        # Use dot to create a visualization
749        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
750                '-Gpack=true', dotname], stdout=PIPE)
751
752        # Translate dot to vis format
753        vis_nodes = [ ]
754        vis = { 'node': vis_nodes }
755        for line in dot.stdout:
756            m = vis_re.match(line)
757            if m:
758                vn = m.group(1)
759                vis_node = {'name': vn, \
760                        'x': float(m.group(2)),\
761                        'y' : float(m.group(3)),\
762                    }
763                if vn in links.keys() or vn in lans.keys():
764                    vis_node['type'] = 'lan'
765                else:
766                    vis_node['type'] = 'node'
767                vis_nodes.append(vis_node)
768        rv = dot.wait()
769
770        os.remove(dotname)
771        if rv == 0 : return vis
772        else: return None
773
774    def get_access(self, tb, nodes, user, tbparam):
775        """
776        Get access to testbed through fedd and set the parameters for that tb
777        """
778
779        translate_attr = {
780            'slavenodestartcmd': 'expstart',
781            'slaveconnectorstartcmd': 'gwstart',
782            'masternodestartcmd': 'mexpstart',
783            'masterconnectorstartcmd': 'mgwstart',
784            'connectorimage': 'gwimage',
785            'connectortype': 'gwtype',
786            'tunnelcfg': 'tun',
787            'smbshare': 'smbshare',
788        }
789
790        uri = self.tbmap.get(tb, None)
791        if not uri:
792            raise service_error(serice_error.server_config, 
793                    "Unknown testbed: %s" % tb)
794
795        # The basic request
796        req = {\
797                'destinationTestbed' : { 'uri' : uri },
798                'user':  user,
799                'allocID' : { 'localname': 'test' },
800                # XXX: need to get service access stright
801                'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
802                'serviceAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ]
803            }
804       
805        # node resources if any
806        if nodes != None and len(nodes) > 0:
807            rnodes = [ ]
808            for n in nodes:
809                rn = { }
810                image, hw, count = n.split(":")
811                if image: rn['image'] = [ image ]
812                if hw: rn['hardware'] = [ hw ]
813                if count: rn['count'] = int(count)
814                rnodes.append(rn)
815            req['resources']= { }
816            req['resources']['node'] = rnodes
817
818        # No retry loop here.  Proxy servers must correctly authenticate
819        # themselves without help
820
821        try:
822            ctx = fedd_ssl_context(self.cert_file, 
823                    self.trusted_certs, password=self.cert_pwd)
824        except SSL.SSLError:
825            raise service_error(service_error.server_config, 
826                    "Server certificates misconfigured")
827
828        loc = feddServiceLocator();
829        port = loc.getfeddPortType(uri,
830                transport=M2Crypto.httpslib.HTTPSConnection, 
831                transdict={ 'ssl_context' : ctx })
832
833        # Reconstruct the full request message
834        msg = RequestAccessRequestMessage()
835        msg.set_element_RequestAccessRequestBody(
836                pack_soap(msg, "RequestAccessRequestBody", req))
837
838        try:
839            resp = port.RequestAccess(msg)
840        except ZSI.ParseException, e:
841            raise service_error(service_error.req,
842                    "Bad format message (XMLRPC??): %s" % str(e))
843        except ZSI.FaultException, e:
844            resp = e.fault.detail[0]
845
846        # Again, weird incompatibilities rear their head.  userRoles, which are
847        # restricted strings, seem to be encoded by ZSI as non-unicode strings
848        # in a way that confuses the pickling and XMLRPC sending systems.
849        # Explicitly unicoding them seems to fix this, though it concerns me
850        # some.  It may be that these things are actually a ZSI string
851        # subclass, and the character encoding is not the major issue.  In any
852        # case, making all the subclasses of basestring into unicode strings
853        # unifies the response format and solves the problem.
854        r = make_unicode(unpack_soap(resp))
855
856        if r.has_key('RequestAccessResponseBody'):
857            r = r['RequestAccessResponseBody']
858        else:
859            raise service_error(service_error.proxy,
860                    "Bad proxy response")
861
862        e = r['emulab']
863        p = e['project']
864        tbparam[tb] = { 
865                "boss": e['boss'],
866                "host": e['ops'],
867                "domain": e['domain'],
868                "fs": e['fileServer'],
869                "eventserver": e['eventServer'],
870                "project": unpack_id(p['name']),
871                "emulab" : e,
872                "allocID" : r['allocID'],
873                }
874        # Make the testbed name be the label the user applied
875        p['testbed'] = {'localname': tb }
876
877        for u in p['user']:
878            tbparam[tb]['user'] = unpack_id(u['userID'])
879
880        for a in e['fedAttr']:
881            if a['attribute']:
882                key = translate_attr.get(a['attribute'].lower(), None)
883                if key:
884                    tbparam[tb][key]= a['value']
885       
886    def release_access(self, tb, aid):
887        """
888        Release access to testbed through fedd
889        """
890
891        uri = self.tbmap.get(tb, None)
892        if not uri:
893            raise service_error(serice_error.server_config, 
894                    "Unknown testbed: %s" % tb)
895
896        # The basic request
897        req = { 'allocID' : aid }
898       
899        # No retry loop here.  Proxy servers must correctly authenticate
900        # themselves without help
901
902        try:
903            ctx = fedd_ssl_context(self.cert_file, 
904                    self.trusted_certs, password=self.cert_pwd)
905        except SSL.SSLError:
906            raise service_error(service_error.server_config, 
907                    "Server certificates misconfigured")
908
909        loc = feddServiceLocator();
910        port = loc.getfeddPortType(uri,
911                transport=M2Crypto.httpslib.HTTPSConnection, 
912                transdict={ 'ssl_context' : ctx })
913
914        # Reconstruct the full request message
915        msg = ReleaseAccessRequestMessage()
916        msg.set_element_ReleaseAccessRequestBody(
917                pack_soap(msg, "ReleaseAccessRequestBody", req))
918
919        try:
920            resp = port.ReleaseAccess(msg)
921        except ZSI.ParseException, e:
922            raise service_error(service_error.req,
923                    "Bad format message (XMLRPC??): %s" % str(e))
924        except ZSI.FaultException, e:
925            resp = e.fault.detail[0]
926
927        # better error coding
928
929
930
931    def remote_splitter(self, uri, desc, master):
932
933        req = {
934                'description' : { 'ns2description': desc },
935                'master': master,
936                'include_fedkit': bool(self.fedkit)
937            }
938
939        # No retry loop here.  Proxy servers must correctly authenticate
940        # themselves without help
941        try:
942            ctx = fedd_ssl_context(self.cert_file, 
943                    self.trusted_certs, password=self.cert_pwd)
944        except SSL.SSLError:
945            raise service_error(service_error.server_config, 
946                    "Server certificates misconfigured")
947
948        loc = feddInternalServiceLocator();
949        port = loc.getfeddInternalPortType(uri,
950                transport=M2Crypto.httpslib.HTTPSConnection, 
951                transdict={ 'ssl_context' : ctx })
952
953        # Reconstruct the full request message
954        msg = Ns2SplitRequestMessage()
955        msg.set_element_Ns2SplitRequestBody(
956                pack_soap(msg, "Ns2SplitRequestBody", req))
957
958        try:
959            resp = port.Ns2Split(msg)
960        except ZSI.ParseException, e:
961            raise service_error(service_error.req,
962                    "Bad format message (XMLRPC??): %s" %
963                    str(e))
964        r = unpack_soap(resp)
965        if r.has_key('Ns2SplitResponseBody'):
966            r = r['Ns2SplitResponseBody']
967            if r.has_key('output'):
968                return r['output'].splitlines()
969            else:
970                raise service_error(service_error.proxy, 
971                        "Bad splitter response (no output)")
972        else:
973            raise service_error(service_error.proxy, "Bad splitter response")
974       
975    class current_testbed:
976        """
977        Object for collecting the current testbed description.  The testbed
978        description is saved to a file with the local testbed variables
979        subsittuted line by line.
980        """
981        def __init__(self, eid, tmpdir, fedkit):
982            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
983            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
984            self.current_testbed = None
985            self.testbed_file = None
986
987            self.def_expstart = \
988                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
989            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
990            self.def_gwstart = \
991                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
992            self.def_mgwstart = \
993                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
994            self.def_gwimage = "FBSD61-TUNNEL2";
995            self.def_gwtype = "pc";
996
997            self.eid = eid
998            self.tmpdir = tmpdir
999            self.fedkit = fedkit
1000
1001        def __call__(self, line, master, allocated, tbparams):
1002            # Capture testbed topology descriptions
1003            if self.current_testbed == None:
1004                m = self.begin_testbed.match(line)
1005                if m != None:
1006                    self.current_testbed = m.group(1)
1007                    if self.current_testbed == None:
1008                        raise service_error(service_error.req,
1009                                "Bad request format (unnamed testbed)")
1010                    allocated[self.current_testbed] = \
1011                            allocated.get(self.current_testbed,0) + 1
1012                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1013                    if not os.path.exists(tb_dir):
1014                        try:
1015                            os.mkdir(tb_dir)
1016                        except IOError:
1017                            raise service_error(service_error.internal,
1018                                    "Cannot create %s" % tb_dir)
1019                    try:
1020                        self.testbed_file = open("%s/%s.%s.tcl" %
1021                                (tb_dir, self.eid, self.current_testbed), 'w')
1022                    except IOError:
1023                        self.testbed_file = None
1024                    return True
1025                else: return False
1026            else:
1027                m = self.end_testbed.match(line)
1028                if m != None:
1029                    if m.group(1) != self.current_testbed:
1030                        raise service_error(service_error.internal, 
1031                                "Mismatched testbed markers!?")
1032                    if self.testbed_file != None: 
1033                        self.testbed_file.close()
1034                        self.testbed_file = None
1035                    self.current_testbed = None
1036                elif self.testbed_file:
1037                    # Substitute variables and put the line into the local
1038                    # testbed file.
1039                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1040                            self.def_gwtype)
1041                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1042                            self.def_gwimage)
1043                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1044                            self.def_mgwstart)
1045                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1046                            self.def_mexpstart)
1047                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1048                            self.def_gwstart)
1049                    expstart = tbparams[self.current_testbed].get('expstart', 
1050                            self.def_expstart)
1051                    project = tbparams[self.current_testbed].get('project')
1052                    line = re.sub("GWTYPE", gwtype, line)
1053                    line = re.sub("GWIMAGE", gwimage, line)
1054                    if self.current_testbed == master:
1055                        line = re.sub("GWSTART", mgwstart, line)
1056                        line = re.sub("EXPSTART", mexpstart, line)
1057                    else:
1058                        line = re.sub("GWSTART", gwstart, line)
1059                        line = re.sub("EXPSTART", expstart, line)
1060                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1061                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1062                    line = re.sub("EID", self.eid, line)
1063                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1064                            (project, self.eid), line)
1065                    if self.fedkit:
1066                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1067                                line)
1068                    print >>self.testbed_file, line
1069                return True
1070
1071    class allbeds:
1072        """
1073        Process the Allbeds section.  Get access to each federant and save the
1074        parameters in tbparams
1075        """
1076        def __init__(self, get_access):
1077            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1078            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1079            self.in_allbeds = False
1080            self.get_access = get_access
1081
1082        def __call__(self, line, user, tbparams):
1083            # Testbed access parameters
1084            if not self.in_allbeds:
1085                if self.begin_allbeds.match(line):
1086                    self.in_allbeds = True
1087                    return True
1088                else:
1089                    return False
1090            else:
1091                if self.end_allbeds.match(line):
1092                    self.in_allbeds = False
1093                else:
1094                    nodes = line.split('|')
1095                    tb = nodes.pop(0)
1096                    self.get_access(tb, nodes, user, tbparams)
1097                return True
1098
1099    class gateways:
1100        def __init__(self, eid, master, tmpdir, gw_pubkey,
1101                gw_secretkey, copy_file, fedkit):
1102            self.begin_gateways = \
1103                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1104            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1105            self.current_gateways = None
1106            self.control_gateway = None
1107            self.active_end = { }
1108
1109            self.eid = eid
1110            self.master = master
1111            self.tmpdir = tmpdir
1112            self.gw_pubkey_base = gw_pubkey
1113            self.gw_secretkey_base = gw_secretkey
1114
1115            self.copy_file = copy_file
1116            self.fedkit = fedkit
1117
1118
1119        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1120                active_end, tbparams, dtb, myname, desthost, type):
1121            """
1122            Produce a gateway configuration file from a gateways line.
1123            """
1124
1125            sproject = tbparams[gw].get('project', 'project')
1126            dproject = tbparams[dtb].get('project', 'project')
1127            sdomain = ".%s.%s%s" % (eid, sproject,
1128                    tbparams[gw].get('domain', ".example.com"))
1129            ddomain = ".%s.%s%s" % (eid, dproject,
1130                    tbparams[dtb].get('domain', ".example.com"))
1131            boss = tbparams[master].get('boss', "boss")
1132            fs = tbparams[master].get('fs', "fs")
1133            event_server = "%s%s" % \
1134                    (tbparams[gw].get('eventserver', "event_server"),
1135                            tbparams[gw].get('domain', "example.com"))
1136            remote_event_server = "%s%s" % \
1137                    (tbparams[dtb].get('eventserver', "event_server"),
1138                            tbparams[dtb].get('domain', "example.com"))
1139            seer_control = "%s%s" % \
1140                    (tbparams[gw].get('control', "control"), sdomain)
1141
1142            if self.fedkit:
1143                remote_script_dir = "/usr/local/federation/bin"
1144                local_script_dir = "/usr/local/federation/bin"
1145            else:
1146                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1147                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1148
1149            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1150            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1151            tunnel_cfg = tbparams[gw].get("tun", "false")
1152
1153            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1154            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1155
1156            # translate to lower case so the `hostname` hack for specifying
1157            # configuration files works.
1158            conf_file = conf_file.lower();
1159            remote_conf_file = remote_conf_file.lower();
1160
1161            if dtb == master:
1162                active = "false"
1163            elif gw == master:
1164                active = "true"
1165            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1166                active = "false"
1167            else:
1168                active_end['%s-%s' % (gw, dtb)] = 1
1169                active = "true"
1170
1171            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1172            print >>gwconfig, "Active: %s" % active
1173            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1174            print >>gwconfig, "BossName: %s" % boss
1175            print >>gwconfig, "FsName: %s" % fs
1176            print >>gwconfig, "EventServerName: %s" % event_server
1177            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1178            print >>gwconfig, "SeerControl: %s" % seer_control
1179            print >>gwconfig, "Type: %s" % type
1180            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1181            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1182                    local_script_dir
1183            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1184            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1185            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1186                    (remote_conf_dir, remote_conf_file)
1187            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1188            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1189            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1190            gwconfig.close()
1191
1192            return active == "true"
1193
1194        def __call__(self, line, allocated, tbparams):
1195            # Process gateways
1196            if not self.current_gateways:
1197                m = self.begin_gateways.match(line)
1198                if m:
1199                    self.current_gateways = m.group(1)
1200                    if allocated.has_key(self.current_gateways):
1201                        # This test should always succeed
1202                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1203                        if not os.path.exists(tb_dir):
1204                            try:
1205                                os.mkdir(tb_dir)
1206                            except IOError:
1207                                raise service_error(service_error.internal,
1208                                        "Cannot create %s" % tb_dir)
1209                    else:
1210                        # XXX
1211                        self.log.error("[gateways]: Ignoring gateways for " + \
1212                                "unknown testbed %s" % self.current_gateways)
1213                        self.current_gateways = None
1214                    return True
1215                else:
1216                    return False
1217            else:
1218                m = self.end_gateways.match(line)
1219                if m :
1220                    if m.group(1) != self.current_gateways:
1221                        raise service_error(service_error.internal,
1222                                "Mismatched gateway markers!?")
1223                    if self.control_gateway:
1224                        try:
1225                            cc = open("%s/%s/client.conf" %
1226                                    (self.tmpdir, self.current_gateways), 'w')
1227                            print >>cc, "ControlGateway: %s" % \
1228                                    self.control_gateway
1229                            if tbparams[self.master].has_key('smbshare'):
1230                                print >>cc, "SMBSHare: %s" % \
1231                                        tbparams[self.master]['smbshare']
1232                            print >>cc, "ProjectUser: %s" % \
1233                                    tbparams[self.master]['user']
1234                            print >>cc, "ProjectName: %s" % \
1235                                    tbparams[self.master]['project']
1236                            cc.close()
1237                        except IOError:
1238                            raise service_error(service_error.internal,
1239                                    "Error creating client config")
1240                        try:
1241                            cc = open("%s/%s/seer.conf" %
1242                                    (self.tmpdir, self.current_gateways),
1243                                    'w')
1244                            if self.current_gateways != self.master:
1245                                print >>cc, "ControlNode: %s" % \
1246                                        self.control_gateway
1247                            print >>cc, "ExperimentID: %s/%s" % \
1248                                    ( tbparams[self.master]['project'], \
1249                                    self.eid )
1250                            cc.close()
1251                        except IOError:
1252                            raise service_error(service_error.internal,
1253                                    "Error creating seer config")
1254                    else:
1255                        debug.error("[gateways]: No control gateway for %s" %\
1256                                    self.current_gateways)
1257                    self.current_gateways = None
1258                else:
1259                    dtb, myname, desthost, type = line.split(" ")
1260
1261                    if type == "control" or type == "both":
1262                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1263                                self.eid, 
1264                                tbparams[self.current_gateways]['project'],
1265                                tbparams[self.current_gateways]['domain'])
1266                    try:
1267                        active = self.gateway_conf_file(self.current_gateways,
1268                                self.master, self.eid, self.gw_pubkey_base,
1269                                self.gw_secretkey_base,
1270                                self.active_end, tbparams, dtb, myname,
1271                                desthost, type)
1272                    except IOError, e:
1273                        raise service_error(service_error.internal,
1274                                "Failed to write config file for %s" % \
1275                                        self.current_gateway)
1276           
1277                    gw_pubkey = "%s/keys/%s" % \
1278                            (self.tmpdir, self.gw_pubkey_base)
1279                    gw_secretkey = "%s/keys/%s" % \
1280                            (self.tmpdir, self.gw_secretkey_base)
1281
1282                    pkfile = "%s/%s/%s" % \
1283                            ( self.tmpdir, self.current_gateways, 
1284                                    self.gw_pubkey_base)
1285                    skfile = "%s/%s/%s" % \
1286                            ( self.tmpdir, self.current_gateways, 
1287                                    self.gw_secretkey_base)
1288
1289                    if not os.path.exists(pkfile):
1290                        try:
1291                            self.copy_file(gw_pubkey, pkfile)
1292                        except IOError:
1293                            service_error(service_error.internal,
1294                                    "Failed to copy pubkey file")
1295
1296                    if active and not os.path.exists(skfile):
1297                        try:
1298                            self.copy_file(gw_secretkey, skfile)
1299                        except IOError:
1300                            service_error(service_error.internal,
1301                                    "Failed to copy secretkey file")
1302                return True
1303
1304    class shunt_to_file:
1305        """
1306        Simple class to write data between two regexps to a file.
1307        """
1308        def __init__(self, begin, end, filename):
1309            """
1310            Begin shunting on a match of begin, stop on end, send data to
1311            filename.
1312            """
1313            self.begin = re.compile(begin)
1314            self.end = re.compile(end)
1315            self.in_shunt = False
1316            self.file = None
1317            self.filename = filename
1318
1319        def __call__(self, line):
1320            """
1321            Call this on each line in the input that may be shunted.
1322            """
1323            if not self.in_shunt:
1324                if self.begin.match(line):
1325                    self.in_shunt = True
1326                    try:
1327                        self.file = open(self.filename, "w")
1328                    except:
1329                        self.file = None
1330                        raise
1331                    return True
1332                else:
1333                    return False
1334            else:
1335                if self.end.match(line):
1336                    if self.file: 
1337                        self.file.close()
1338                        self.file = None
1339                    self.in_shunt = False
1340                else:
1341                    if self.file:
1342                        print >>self.file, line
1343                return True
1344
1345    class shunt_to_list:
1346        """
1347        Same interface as shunt_to_file.  Data collected in self.list, one list
1348        element per line.
1349        """
1350        def __init__(self, begin, end):
1351            self.begin = re.compile(begin)
1352            self.end = re.compile(end)
1353            self.in_shunt = False
1354            self.list = [ ]
1355       
1356        def __call__(self, line):
1357            if not self.in_shunt:
1358                if self.begin.match(line):
1359                    self.in_shunt = True
1360                    return True
1361                else:
1362                    return False
1363            else:
1364                if self.end.match(line):
1365                    self.in_shunt = False
1366                else:
1367                    self.list.append(line)
1368                return True
1369
1370    class shunt_to_string:
1371        """
1372        Same interface as shunt_to_file.  Data collected in self.str, all in
1373        one string.
1374        """
1375        def __init__(self, begin, end):
1376            self.begin = re.compile(begin)
1377            self.end = re.compile(end)
1378            self.in_shunt = False
1379            self.str = ""
1380       
1381        def __call__(self, line):
1382            if not self.in_shunt:
1383                if self.begin.match(line):
1384                    self.in_shunt = True
1385                    return True
1386                else:
1387                    return False
1388            else:
1389                if self.end.match(line):
1390                    self.in_shunt = False
1391                else:
1392                    self.str += line
1393                return True
1394
1395    def create_experiment(self, req, fid):
1396        """
1397        The external interface to experiment creation called from the
1398        dispatcher.
1399
1400        Creates a working directory, splits the incoming description using the
1401        splitter script and parses out the avrious subsections using the
1402        lcasses above.  Once each sub-experiment is created, use pooled threads
1403        to instantiate them and start it all up.
1404        """
1405        try:
1406            tmpdir = tempfile.mkdtemp(prefix="split-")
1407        except IOError:
1408            raise service_error(service_error.internal, "Cannot create tmp dir")
1409
1410        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1411        gw_secretkey_base = "fed.%s" % self.ssh_type
1412        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1413        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1414        tclfile = tmpdir + "/experiment.tcl"
1415        tbparams = { }
1416
1417        pid = "dummy"
1418        gid = "dummy"
1419        # XXX
1420        fail_soft = False
1421
1422        try:
1423            os.mkdir(tmpdir+"/keys")
1424        except OSError:
1425            raise service_error(service_error.internal,
1426                    "Can't make temporary dir")
1427
1428        req = req.get('CreateRequestBody', None)
1429        if not req:
1430            raise service_error(service_error.req,
1431                    "Bad request format (no CreateRequestBody)")
1432        # The tcl parser needs to read a file so put the content into that file
1433        descr=req.get('experimentdescription', None)
1434        if descr:
1435            file_content=descr.get('ns2description', None)
1436            if file_content:
1437                try:
1438                    f = open(tclfile, 'w')
1439                    f.write(file_content)
1440                    f.close()
1441                except IOError:
1442                    raise service_error(service_error.internal,
1443                            "Cannot write temp experiment description")
1444            else:
1445                raise service_error(service_error.req, 
1446                        "Only ns2descriptions supported")
1447        else:
1448            raise service_error(service_error.req, "No experiment description")
1449
1450        if req.has_key('experimentID') and \
1451                req['experimentID'].has_key('localname'):
1452            eid = req['experimentID']['localname']
1453            self.state_lock.acquire()
1454            while (self.state.has_key(eid)):
1455                eid += random.choice(string.ascii_letters)
1456            # To avoid another thread picking this localname
1457            self.state[eid] = "placeholder"
1458            self.state_lock.release()
1459        else:
1460            eid = self.exp_stem
1461            for i in range(0,5):
1462                eid += random.choice(string.ascii_letters)
1463            self.state_lock.acquire()
1464            while (self.state.has_key(eid)):
1465                eid = self.exp_stem
1466                for i in range(0,5):
1467                    eid += random.choice(string.ascii_letters)
1468            # To avoid another thread picking this localname
1469            self.state[eid] = "placeholder"
1470            self.state_lock.release()
1471
1472        try:
1473            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1474        except ValueError:
1475            raise service_error(service_error.server_config, 
1476                    "Bad key type (%s)" % self.ssh_type)
1477
1478        user = req.get('user', None)
1479        if user == None:
1480            raise service_error(service_error.req, "No user")
1481
1482        master = req.get('master', None)
1483        if master == None:
1484            raise service_error(service_error.req, "No master testbed label")
1485       
1486        if self.splitter_url:
1487            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1488            split_data = self.remote_splitter(self.splitter_url, file_content,
1489                    master)
1490        else:
1491            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1492                str(self.muxmax), '-m', master]
1493
1494            if self.fedkit:
1495                tclcmd.append('-k')
1496
1497            tclcmd.extend([pid, gid, eid, tclfile])
1498
1499            self.log.debug("running local splitter %s", " ".join(tclcmd))
1500            tclparser = Popen(tclcmd, stdout=PIPE)
1501            split_data = tclparser.stdout
1502
1503        allocated = { }     # Testbeds we can access
1504        started = { }       # Testbeds where a sub-experiment started
1505                            # successfully
1506
1507        # Objects to parse the splitter output (defined above)
1508        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1509        parse_allbeds = self.allbeds(self.get_access)
1510        parse_gateways = self.gateways(eid, master, tmpdir,
1511                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1512        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1513                    "^#\s+End\s+Vtopo")
1514        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1515                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1516        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1517                "^#\s+End\s+tarfiles")
1518        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1519                "^#\s+End\s+rpms")
1520
1521        # Worling on the split data
1522        for line in split_data:
1523            line = line.rstrip()
1524            if parse_current_testbed(line, master, allocated, tbparams):
1525                continue
1526            elif parse_allbeds(line, user, tbparams):
1527                continue
1528            elif parse_gateways(line, allocated, tbparams):
1529                continue
1530            elif parse_vtopo(line):
1531                continue
1532            elif parse_hostnames(line):
1533                continue
1534            elif parse_tarfiles(line):
1535                continue
1536            elif parse_rpms(line):
1537                continue
1538            else:
1539                raise service_error(service_error.internal, 
1540                        "Bad tcl parse? %s" % line)
1541
1542        # Virtual topology and visualization
1543        vtopo = self.gentopo(parse_vtopo.str)
1544        if not vtopo:
1545            raise service_error(service_error.internal, 
1546                    "Failed to generate virtual topology")
1547
1548        vis = self.genviz(vtopo)
1549        if not vis:
1550            raise service_error(service_error.internal, 
1551                    "Failed to generate visualization")
1552
1553        # save federant information
1554        for k in allocated.keys():
1555            tbparams[k]['federant'] = {\
1556                    'name': [ { 'localname' : eid} ],\
1557                    'emulab': tbparams[k]['emulab'],\
1558                    'allocID' : tbparams[k]['allocID'],\
1559                    'master' : k == master,\
1560                }
1561
1562
1563        # Copy tarfiles and rpms needed at remote sites into a staging area
1564        try:
1565            if self.fedkit:
1566                parse_tarfiles.list.append(self.fedkit)
1567            for t in parse_tarfiles.list:
1568                if not os.path.exists("%s/tarfiles" % tmpdir):
1569                    os.mkdir("%s/tarfiles" % tmpdir)
1570                self.copy_file(t, "%s/tarfiles/%s" % \
1571                        (tmpdir, os.path.basename(t)))
1572            for r in parse_rpms.list:
1573                if not os.path.exists("%s/rpms" % tmpdir):
1574                    os.mkdir("%s/rpms" % tmpdir)
1575                self.copy_file(r, "%s/rpms/%s" % \
1576                        (tmpdir, os.path.basename(r)))
1577        except IOError, e:
1578            raise service_error(service_error.internal, 
1579                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1580
1581        thread_pool_info = self.thread_pool()
1582        threads = [ ]
1583
1584        for tb in [ k for k in allocated.keys() if k != master]:
1585            # Wait until we have a free slot to start the next testbed load
1586            thread_pool_info.acquire()
1587            while thread_pool_info.started - \
1588                    thread_pool_info.terminated >= self.nthreads:
1589                thread_pool_info.wait()
1590            thread_pool_info.release()
1591
1592            # Create and start a thread to start the segment, and save it to
1593            # get the return value later
1594            t  = self.pooled_thread(target=self.start_segment, 
1595                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1596                    pdata=thread_pool_info, trace_file=self.trace_file)
1597            threads.append(t)
1598            t.start()
1599
1600        # Wait until all finish (the first clause of the while is to make sure
1601        # one starts)
1602        thread_pool_info.acquire()
1603        while thread_pool_info.started == 0 or \
1604                thread_pool_info.started > thread_pool_info.terminated:
1605            thread_pool_info.wait()
1606        thread_pool_info.release()
1607
1608        # If none failed, start the master
1609        failed = [ t.getName() for t in threads if not t.rv ]
1610
1611        if len(failed) == 0:
1612            if not self.start_segment(master, eid, tbparams, tmpdir):
1613                failed.append(master)
1614
1615        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1616        # If one failed clean up, unless fail_soft is set
1617        if failed:
1618            if not fail_soft:
1619                for tb in succeeded:
1620                    self.stop_segment(tb, eid, tbparams)
1621                # Remove the placeholder
1622                self.state_lock.acquire()
1623                del self.state[eid]
1624                self.state_lock.release()
1625
1626                raise service_error(service_error.federant,
1627                    "Swap in failed on %s" % ",".join(failed))
1628        else:
1629            self.log.info("[start_segment]: Experiment %s started" % eid)
1630
1631        # Generate an ID for the experiment (slice) and a certificate that the
1632        # allocator can use to prove they own it.  We'll ship it back through
1633        # the encrypted connection.
1634        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1635
1636        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1637
1638        # Walk up tmpdir, deleting as we go
1639        for path, dirs, files in os.walk(tmpdir, topdown=False):
1640            for f in files:
1641                os.remove(os.path.join(path, f))
1642            for d in dirs:
1643                os.rmdir(os.path.join(path, d))
1644        os.rmdir(tmpdir)
1645
1646        resp = { 'federant' : [ tbparams[tb]['federant'] \
1647                for tb in tbparams.keys() \
1648                    if tbparams[tb].has_key('federant') ],\
1649                    'vtopo': vtopo,\
1650                    'vis' : vis,
1651                    'experimentID' : [\
1652                            { 'fedid': copy.copy(expid) }, \
1653                            { 'localname': eid },\
1654                        ],\
1655                    'experimentAccess': { 'X509' : expcert },\
1656                }
1657
1658        # Insert the experiment into our state and update the disk copy
1659        self.state_lock.acquire()
1660        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1661                for tb in tbparams.keys() \
1662                    if tbparams[tb].has_key('federant') ],\
1663                    'vtopo': vtopo,\
1664                    'vis' : vis,
1665                    'experimentID' : [\
1666                            { 'fedid': expid }, { 'localname': eid },\
1667                        ],\
1668                }
1669        self.state[eid] = self.state[expid]
1670        if self.state_filename: self.write_state()
1671        self.state_lock.release()
1672
1673        if not failed:
1674            return resp
1675        else:
1676            raise service_error(service_error.partial, \
1677                    "Partial swap in on %s" % ",".join(succeeded))
1678
1679
1680    def get_vtopo(self, req, fid):
1681        """
1682        Return the stored virtual topology for this experiment
1683        """
1684        rv = None
1685
1686        req = req.get('VtopoRequestBody', None)
1687        if not req:
1688            raise service_error(service_error.req,
1689                    "Bad request format (no VtopoRequestBody)")
1690        exp = req.get('experiment', None)
1691        if exp:
1692            if exp.has_key('fedid'):
1693                key = fedid(bits=exp['fedid'])
1694                keytype = "fedid"
1695            elif exp.has_key('localname'):
1696                key = exp['localname']
1697                keytype = "localname"
1698            else:
1699                raise service_error(service_error.req, "Unknown lookup type")
1700        else:
1701            raise service_error(service_error.req, "No request?")
1702
1703        self.state_lock.acquire()
1704        if self.state.has_key(key):
1705            rv = { 'experiment' : {keytype: key },\
1706                    'vtopo': self.state[key]['vtopo'],\
1707                }
1708        self.state_lock.release()
1709
1710        if rv: return rv
1711        else: raise service_error(service_error.req, "No such experiment")
1712
1713    def get_vis(self, req, fid):
1714        """
1715        Return the stored visualization for this experiment
1716        """
1717        rv = None
1718
1719        req = req.get('VisRequestBody', None)
1720        if not req:
1721            raise service_error(service_error.req,
1722                    "Bad request format (no VisRequestBody)")
1723        exp = req.get('experiment', None)
1724        if exp:
1725            if exp.has_key('fedid'):
1726                key = fedid(bits=exp['fedid'])
1727                keytype = "fedid"
1728            elif exp.has_key('localname'):
1729                key = exp['localname']
1730                keytype = "localname"
1731            else:
1732                raise service_error(service_error.req, "Unknown lookup type")
1733        else:
1734            raise service_error(service_error.req, "No request?")
1735
1736        self.state_lock.acquire()
1737        if self.state.has_key(key):
1738            rv =  { 'experiment' : {keytype: key },\
1739                    'vis': self.state[key]['vis'],\
1740                    }
1741        self.state_lock.release()
1742
1743        if rv: return rv
1744        else: raise service_error(service_error.req, "No such experiment")
1745
1746    def get_info(self, req, fid):
1747        """
1748        Return all the stored info about this experiment
1749        """
1750        rv = None
1751
1752        req = req.get('InfoRequestBody', None)
1753        if not req:
1754            raise service_error(service_error.req,
1755                    "Bad request format (no VisRequestBody)")
1756        exp = req.get('experiment', None)
1757        if exp:
1758            if exp.has_key('fedid'):
1759                key = fedid(bits=exp['fedid'])
1760                keytype = "fedid"
1761            elif exp.has_key('localname'):
1762                key = exp['localname']
1763                keytype = "localname"
1764            else:
1765                raise service_error(service_error.req, "Unknown lookup type")
1766        else:
1767            raise service_error(service_error.req, "No request?")
1768
1769        # The state may be massaged by the service function that called
1770        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1771        # state.
1772        self.state_lock.acquire()
1773        if self.state.has_key(key):
1774            rv = copy.deepcopy(self.state[key])
1775        self.state_lock.release()
1776
1777        if rv: return rv
1778        else: raise service_error(service_error.req, "No such experiment")
1779
1780
1781    def terminate_experiment(self, req, fid):
1782        """
1783        Swap this experiment out on the federants and delete the shared
1784        information
1785        """
1786        tbparams = { }
1787        req = req.get('TerminateRequestBody', None)
1788        if not req:
1789            raise service_error(service_error.req,
1790                    "Bad request format (no TerminateRequestBody)")
1791        exp = req.get('experiment', None)
1792        if exp:
1793            if exp.has_key('fedid'):
1794                key = fedid(bits=exp['fedid'])
1795                keytype = "fedid"
1796            elif exp.has_key('localname'):
1797                key = exp['localname']
1798                keytype = "localname"
1799            else:
1800                raise service_error(service_error.req, "Unknown lookup type")
1801        else:
1802            raise service_error(service_error.req, "No request?")
1803
1804        self.state_lock.acquire()
1805        fed_exp = self.state.get(key, None)
1806
1807        if fed_exp:
1808            # This branch of the conditional holds the lock to generate a
1809            # consistent temporary tbparams variable to deallocate experiments.
1810            # It releases the lock to do the deallocations and reacquires it to
1811            # remove the experiment state when the termination is complete.
1812            ids = []
1813            #  experimentID is a list of dicts that are self-describing
1814            #  identifiers.  This finds all the fedids and localnames - the
1815            #  keys of self.state - and puts them into ids.
1816            for id in fed_exp.get('experimentID', []):
1817                if id.has_key('fedid'): ids.append(id['fedid'])
1818                if id.has_key('localname'): ids.append(id['localname'])
1819
1820            # Construct enough of the tbparams to make the stop_segment calls
1821            # work
1822            for fed in fed_exp['federant']:
1823                try:
1824                    for e in fed['name']:
1825                        eid = e.get('localname', None)
1826                        if eid: break
1827                    else:
1828                        continue
1829
1830                    p = fed['emulab']['project']
1831
1832                    project = p['name']['localname']
1833                    tb = p['testbed']['localname']
1834                    user = p['user'][0]['userID']['localname']
1835
1836                    domain = fed['emulab']['domain']
1837                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1838                    aid = fed['allocID']
1839                except KeyError, e:
1840                    continue
1841                tbparams[tb] = {\
1842                        'user': user,\
1843                        'domain': domain,\
1844                        'project': project,\
1845                        'host': host,\
1846                        'eid': eid,\
1847                        'aid': aid,\
1848                    }
1849            self.state_lock.release()
1850
1851            # Stop everyone.
1852            for tb in tbparams.keys():
1853                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1854
1855            # release the allocations
1856            for tb in tbparams.keys():
1857                self.release_access(tb, tbparams[tb]['aid'])
1858
1859            # Remove the terminated experiment
1860            self.state_lock.acquire()
1861            for id in ids:
1862                if self.state.has_key(id): del self.state[id]
1863
1864            if self.state_filename: self.write_state()
1865            self.state_lock.release()
1866
1867            return { 'experiment': exp }
1868        else:
1869            # Don't forget to release the lock
1870            self.state_lock.release()
1871            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.