source: fedd/federation/experiment_control.py @ 5a6b75b

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 5a6b75b was 5fffd82, checked in by Ted Faber <faber@…>, 16 years ago

Call access control routines locally rather than trying to remotely access this
process. Currently calling a remote service on ourselves seems to tangle the
threading up in knots. This should remove that problem.

  • Property mode set to 100644
File size: 60.1 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = None
152        self.cert_pwd = None
153        self.trusted_certs = None
154
155        # Walk through the various relevant certificat specifying config
156        # attributes until the local certificate attributes can be resolved.
157        # The walk is from most specific to most general specification.
158        for s in ("experiment_control", "globals"):
159            if config.has_section(s): 
160                if config.has_option(s, "cert_file"):
161                    if not self.cert_file:
162                        self.cert_file = config.get(s, "cert_file")
163                        self.cert_pwd = config.get(s, "cert_pwd")
164
165                if config.has_option(s, "trusted_certs"):
166                    if not self.trusted_certs:
167                        self.trusted_certs = config.get(s, "trusted_certs")
168
169
170        self.exp_stem = "fed-stem"
171        self.log = logging.getLogger("fedd.experiment_control")
172        set_log_level(config, "experiment_control", self.log)
173        self.muxmax = 2
174        self.nthreads = 2
175        self.randomize_experiments = False
176
177        self.scp_exec = "/usr/bin/scp"
178        self.splitter = None
179        self.ssh_exec="/usr/bin/ssh"
180        self.ssh_keygen = "/usr/bin/ssh-keygen"
181        self.ssh_identity_file = None
182
183
184        self.debug = config.getboolean("experiment_control", "create_debug")
185        self.state_filename = config.get("experiment_control", 
186                "experiment_state_file")
187        self.splitter_url = config.get("experiment_control", "splitter_url")
188        self.fedkit = config.get("experiment_control", "fedkit")
189        accessdb_file = config.get("experiment_control", "accessdb")
190
191        self.ssh_pubkey_file = config.get("experiment_control", 
192                "ssh_pubkey_file")
193        self.ssh_privkey_file = config.get("experiment_control",
194                "ssh_privkey_file")
195        # NB for internal master/slave ops, not experiment setup
196        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
197        self.state = { }
198        self.state_lock = Lock()
199        self.tclsh = "/usr/local/bin/otclsh"
200        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
201                config.get("experiment_control", "tcl_splitter",
202                        "/usr/testbed/lib/ns2ir/parse.tcl")
203        mapdb_file = config.get("experiment_control", "mapdb")
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
208                "/tmp/federate";
209        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
210                "FEDDIR/hosts";
211        self.def_gwstart = \
212                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
213                "/tmp/bridge.log";
214        self.def_mgwstart = \
215                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
216                "/tmp/bridge.log";
217        self.def_gwimage = "FBSD61-TUNNEL2";
218        self.def_gwtype = "pc";
219        self.local_access = { }
220
221        if auth:
222            self.auth = auth
223        else:
224            self.log.error(\
225                    "[access]: No authorizer initialized, creating local one.")
226            auth = authorizer()
227
228
229        if self.ssh_pubkey_file:
230            try:
231                f = open(self.ssh_pubkey_file, 'r')
232                self.ssh_pubkey = f.read()
233                f.close()
234            except IOError:
235                raise service_error(service_error.internal,
236                        "Cannot read sshpubkey")
237        else:
238            raise service_error(service_error.internal, 
239                    "No SSH public key file?")
240
241        if not self.ssh_privkey_file:
242            raise service_error(service_error.internal, 
243                    "No SSH public key file?")
244
245
246        if mapdb_file:
247            self.read_mapdb(mapdb_file)
248        else:
249            self.log.warn("[experiment_control] No testbed map, using defaults")
250            self.tbmap = { 
251                    'deter':'https://users.isi.deterlab.net:23235',
252                    'emulab':'https://users.isi.deterlab.net:23236',
253                    'ucb':'https://users.isi.deterlab.net:23237',
254                    }
255
256        if accessdb_file:
257                self.read_accessdb(accessdb_file)
258        else:
259            raise service_error(service_error.internal,
260                    "No accessdb specified in config")
261
262        # Grab saved state.  OK to do this w/o locking because it's read only
263        # and only one thread should be in existence that can see self.state at
264        # this point.
265        if self.state_filename:
266            self.read_state()
267
268        # Dispatch tables
269        self.soap_services = {\
270                'Create': soap_handler('Create', self.create_experiment),
271                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
272                'Vis': soap_handler('Vis', self.get_vis),
273                'Info': soap_handler('Info', self.get_info),
274                'Terminate': soap_handler('Terminate', 
275                    self.terminate_experiment),
276        }
277
278        self.xmlrpc_services = {\
279                'Create': xmlrpc_handler('Create', self.create_experiment),
280                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
281                'Vis': xmlrpc_handler('Vis', self.get_vis),
282                'Info': xmlrpc_handler('Info', self.get_info),
283                'Terminate': xmlrpc_handler('Terminate',
284                    self.terminate_experiment),
285        }
286
287    def copy_file(self, src, dest, size=1024):
288        """
289        Exceedingly simple file copy.
290        """
291        s = open(src,'r')
292        d = open(dest, 'w')
293
294        buf = "x"
295        while buf != "":
296            buf = s.read(size)
297            d.write(buf)
298        s.close()
299        d.close()
300
301    # Call while holding self.state_lock
302    def write_state(self):
303        """
304        Write a new copy of experiment state after copying the existing state
305        to a backup.
306
307        State format is a simple pickling of the state dictionary.
308        """
309        if os.access(self.state_filename, os.W_OK):
310            self.copy_file(self.state_filename, \
311                    "%s.bak" % self.state_filename)
312        try:
313            f = open(self.state_filename, 'w')
314            pickle.dump(self.state, f)
315        except IOError, e:
316            self.log.error("Can't write file %s: %s" % \
317                    (self.state_filename, e))
318        except pickle.PicklingError, e:
319            self.log.error("Pickling problem: %s" % e)
320        except TypeError, e:
321            self.log.error("Pickling problem (TypeError): %s" % e)
322
323    # Call while holding self.state_lock
324    def read_state(self):
325        """
326        Read a new copy of experiment state.  Old state is overwritten.
327
328        State format is a simple pickling of the state dictionary.
329        """
330        try:
331            f = open(self.state_filename, "r")
332            self.state = pickle.load(f)
333            self.log.debug("[read_state]: Read state from %s" % \
334                    self.state_filename)
335        except IOError, e:
336            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
337                    % (self.state_filename, e))
338        except pickle.UnpicklingError, e:
339            self.log.warning(("[read_state]: No saved state: " + \
340                    "Unpickling failed: %s") % e)
341       
342        for k in self.state.keys():
343            try:
344                # This list should only have one element in it, but phrasing it
345                # as a for loop doesn't cost much, really.  We have to find the
346                # fedid elements anyway.
347                for eid in [ f['fedid'] \
348                        for f in self.state[k]['experimentID']\
349                            if f.has_key('fedid') ]:
350                    self.auth.set_attribute(self.state[k]['owner'], eid)
351            except KeyError, e:
352                self.log.warning("[read_state]: State ownership or identity " +\
353                        "misformatted in %s: %s" % (self.state_filename, e))
354
355
356    def read_accessdb(self, accessdb_file):
357        """
358        Read the mapping from fedids that can create experiments to their name
359        in the 3-level access namespace.  All will be asserted from this
360        testbed and can include the local username and porject that will be
361        asserted on their behalf by this fedd.  Each fedid is also added to the
362        authorization system with the "create" attribute.
363        """
364        self.accessdb = {}
365        # These are the regexps for parsing the db
366        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
367        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
368                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
369        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
370                "\s*->\s*(" + name_expr + ")\s*$")
371        lineno = 0
372
373        # Parse the mappings and store in self.authdb, a dict of
374        # fedid -> (proj, user)
375        try:
376            f = open(accessdb_file, "r")
377            for line in f:
378                lineno += 1
379                line = line.strip()
380                if len(line) == 0 or line.startswith('#'):
381                    continue
382                m = project_line.match(line)
383                if m:
384                    fid = fedid(hexstr=m.group(1))
385                    project, user = m.group(2,3)
386                    if not self.accessdb.has_key(fid):
387                        self.accessdb[fid] = []
388                    self.accessdb[fid].append((project, user))
389                    continue
390
391                m = user_line.match(line)
392                if m:
393                    fid = fedid(hexstr=m.group(1))
394                    project = None
395                    user = m.group(2)
396                    if not self.accessdb.has_key(fid):
397                        self.accessdb[fid] = []
398                    self.accessdb[fid].append((project, user))
399                    continue
400                self.log.warn("[experiment_control] Error parsing access " +\
401                        "db %s at line %d" %  (accessdb_file, lineno))
402        except IOError:
403            raise service_error(service_error.internal,
404                    "Error opening/reading %s as experiment " +\
405                            "control accessdb" %  accessdb_file)
406        f.close()
407
408        # Initialize the authorization attributes
409        for fid in self.accessdb.keys():
410            self.auth.set_attribute(fid, 'create')
411
412    def read_mapdb(self, file):
413        """
414        Read a simple colon separated list of mappings for the
415        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
416        """
417
418        self.tbmap = { }
419        lineno =0
420        try:
421            f = open(file, "r")
422            for line in f:
423                lineno += 1
424                line = line.strip()
425                if line.startswith('#') or len(line) == 0:
426                    continue
427                try:
428                    label, url = line.split(':', 1)
429                    self.tbmap[label] = url
430                except ValueError, e:
431                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
432                            "map db: %s %s" % (lineno, line, e))
433        except IOError, e:
434            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
435                    "open %s: %s" % (file, e))
436        f.close()
437
438    def scp_file(self, file, user, host, dest=""):
439        """
440        scp a file to the remote host.  If debug is set the action is only
441        logged.
442        """
443
444        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
445                "%s@%s:%s" % (user, host, dest)]
446        rv = 0
447
448        try:
449            dnull = open("/dev/null", "r")
450        except IOError:
451            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
452            dnull = Null
453
454        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
455        if not self.debug:
456            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
457            else: rv = call(scp_cmd)
458
459        return rv == 0
460
461    def ssh_cmd(self, user, host, cmd, wname=None):
462        """
463        Run a remote command on host as user.  If debug is set, the action is
464        only logged.
465        """
466        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
467                user, host, cmd)
468
469        try:
470            dnull = open("/dev/null", "r")
471        except IOError:
472            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
473            dnull = Null
474
475        self.log.debug("[ssh_cmd]: %s" % sh_str)
476        if not self.debug:
477            if dnull:
478                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
479            else:
480                sub = Popen(sh_str, shell=True)
481            return sub.wait() == 0
482        else:
483            return True
484
485    def ship_configs(self, host, user, src_dir, dest_dir):
486        """
487        Copy federant-specific configuration files to the federant.
488        """
489        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
490            return False
491        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
492            return False
493
494        for f in os.listdir(src_dir):
495            if os.path.isdir(f):
496                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
497                        "%s/%s" % (dest_dir, f)):
498                    return False
499            else:
500                if not self.scp_file("%s/%s" % (src_dir, f), 
501                        user, host, dest_dir):
502                    return False
503        return True
504
505    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
506        """
507        Start a sub-experiment on a federant.
508
509        Get the current state, modify or create as appropriate, ship data and
510        configs and start the experiment.  There are small ordering differences
511        based on the initial state of the sub-experiment.
512        """
513        # ops node in the federant
514        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
515        user = tbparams[tb]['user']     # federant user
516        pid = tbparams[tb]['project']   # federant project
517        # XXX
518        base_confs = ( "hosts",)
519        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
520        # command to test experiment state
521        expinfo_exec = "/usr/testbed/bin/expinfo" 
522        # Configuration directories on the remote machine
523        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
524        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
525        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
526        # Regular expressions to parse the expinfo response
527        state_re = re.compile("State:\s+(\w+)")
528        no_exp_re = re.compile("^No\s+such\s+experiment")
529        state = None    # Experiment state parsed from expinfo
530        # The expinfo ssh command
531        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
532
533        # Get status
534        self.log.debug("[start_segment]: %s"% " ".join(cmd))
535        dev_null = None
536        try:
537            dev_null = open("/dev/null", "a")
538        except IOError, e:
539            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
540
541        if self.debug:
542            state = 'swapped'
543            rv = 0
544        else:
545            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
546            for line in status.stdout:
547                m = state_re.match(line)
548                if m: state = m.group(1)
549                else:
550                    m = no_exp_re.match(line)
551                    if m: state = "none"
552            rv = status.wait()
553
554        # If the experiment is not present the subcommand returns a non-zero
555        # return value.  If we successfully parsed a "none" outcome, ignore the
556        # return code.
557        if rv != 0 and state != "none":
558            raise service_error(service_error.internal,
559                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
560
561        self.log.debug("[start_segment]: %s: %s" % (tb, state))
562        self.log.info("[start_segment]:transferring experiment to %s" % tb)
563
564        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
565            return False
566        # Clear the federation files
567        if not self.ssh_cmd(user, host, 
568                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
569            return False
570        if not self.ssh_cmd(user, host, 
571                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
572            return False
573        # Clear and create the tarfiles and rpm directories
574        for d in (tarfiles_dir, rpms_dir):
575            if not self.ssh_cmd(user, host, 
576                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
577                return False
578            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
579                    "create tarfiles"):
580                return False
581       
582        if state == 'active':
583            # Remote experiment is active.  Modify it.
584            for f in base_confs:
585                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
586                        "%s/%s" % (proj_dir, f)):
587                    return False
588            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
589                    proj_dir):
590                return False
591            if os.path.isdir("%s/tarfiles" % tmpdir):
592                if not self.ship_configs(host, user,
593                        "%s/tarfiles" % tmpdir, tarfiles_dir):
594                    return False
595            if os.path.isdir("%s/rpms" % tmpdir):
596                if not self.ship_configs(host, user,
597                        "%s/rpms" % tmpdir, tarfiles_dir):
598                    return False
599            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
600            if not self.ssh_cmd(user, host,
601                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
602                            (pid, eid, tclfile), "modexp"):
603                return False
604            return True
605        elif state == "swapped":
606            # Remote experiment swapped out.  Modify it and swap it in.
607            for f in base_confs:
608                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
609                        "%s/%s" % (proj_dir, f)):
610                    return False
611            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
612                    proj_dir):
613                return False
614            if os.path.isdir("%s/tarfiles" % tmpdir):
615                if not self.ship_configs(host, user,
616                        "%s/tarfiles" % tmpdir, tarfiles_dir):
617                    return False
618            if os.path.isdir("%s/rpms" % tmpdir):
619                if not self.ship_configs(host, user,
620                        "%s/rpms" % tmpdir, tarfiles_dir):
621                    return False
622            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
623            if not self.ssh_cmd(user, host,
624                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
625                    "modexp"):
626                return False
627            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
628            if not self.ssh_cmd(user, host,
629                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
630                    "swapexp"):
631                return False
632            return True
633        elif state == "none":
634            # No remote experiment.  Create one.  We do this in 2 steps so we
635            # can put the configuration files and scripts into the new
636            # experiment directories.
637
638            # Tarfiles must be present for creation to work
639            if os.path.isdir("%s/tarfiles" % tmpdir):
640                if not self.ship_configs(host, user,
641                        "%s/tarfiles" % tmpdir, tarfiles_dir):
642                    return False
643            if os.path.isdir("%s/rpms" % tmpdir):
644                if not self.ship_configs(host, user,
645                        "%s/rpms" % tmpdir, tarfiles_dir):
646                    return False
647            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
648            if not self.ssh_cmd(user, host,
649                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
650                            (pid, eid, tclfile), "startexp"):
651                return False
652            # After startexp the per-experiment directories exist
653            for f in base_confs:
654                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
655                        "%s/%s" % (proj_dir, f)):
656                    return False
657            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
658                    proj_dir):
659                return False
660            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
661            if not self.ssh_cmd(user, host,
662                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
663                    "swapexp"):
664                return False
665            return True
666        else:
667            self.log.debug("[start_segment]:unknown state %s" % state)
668            return False
669
670    def stop_segment(self, tb, eid, tbparams):
671        """
672        Stop a sub experiment by calling swapexp on the federant
673        """
674        user = tbparams[tb]['user']
675        host = tbparams[tb]['host']
676        pid = tbparams[tb]['project']
677
678        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
679        return self.ssh_cmd(user, host,
680                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
681
682       
683    def generate_ssh_keys(self, dest, type="rsa" ):
684        """
685        Generate a set of keys for the gateways to use to talk.
686
687        Keys are of type type and are stored in the required dest file.
688        """
689        valid_types = ("rsa", "dsa")
690        t = type.lower();
691        if t not in valid_types: raise ValueError
692        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
693
694        try:
695            trace = open("/dev/null", "w")
696        except IOError:
697            raise service_error(service_error.internal,
698                    "Cannot open /dev/null??");
699
700        # May raise CalledProcessError
701        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
702        rv = call(cmd, stdout=trace, stderr=trace)
703        if rv != 0:
704            raise service_error(service_error.internal, 
705                    "Cannot generate nonce ssh keys.  %s return code %d" \
706                            % (self.ssh_keygen, rv))
707
708    def gentopo(self, str):
709        """
710        Generate the topology dtat structure from the splitter's XML
711        representation of it.
712
713        The topology XML looks like:
714            <experiment>
715                <nodes>
716                    <node><vname></vname><ips>ip1:ip2</ips></node>
717                </nodes>
718                <lans>
719                    <lan>
720                        <vname></vname><vnode></vnode><ip></ip>
721                        <bandwidth></bandwidth><member>node:port</member>
722                    </lan>
723                </lans>
724        """
725        class topo_parse:
726            """
727            Parse the topology XML and create the dats structure.
728            """
729            def __init__(self):
730                # Typing of the subelements for data conversion
731                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
732                self.int_subelements = ( 'bandwidth',)
733                self.float_subelements = ( 'delay',)
734                # The final data structure
735                self.nodes = [ ]
736                self.lans =  [ ]
737                self.topo = { \
738                        'node': self.nodes,\
739                        'lan' : self.lans,\
740                    }
741                self.element = { }  # Current element being created
742                self.chars = ""     # Last text seen
743
744            def end_element(self, name):
745                # After each sub element the contents is added to the current
746                # element or to the appropriate list.
747                if name == 'node':
748                    self.nodes.append(self.element)
749                    self.element = { }
750                elif name == 'lan':
751                    self.lans.append(self.element)
752                    self.element = { }
753                elif name in self.str_subelements:
754                    self.element[name] = self.chars
755                    self.chars = ""
756                elif name in self.int_subelements:
757                    self.element[name] = int(self.chars)
758                    self.chars = ""
759                elif name in self.float_subelements:
760                    self.element[name] = float(self.chars)
761                    self.chars = ""
762
763            def found_chars(self, data):
764                self.chars += data.rstrip()
765
766
767        tp = topo_parse();
768        parser = xml.parsers.expat.ParserCreate()
769        parser.EndElementHandler = tp.end_element
770        parser.CharacterDataHandler = tp.found_chars
771
772        parser.Parse(str)
773
774        return tp.topo
775       
776
777    def genviz(self, topo):
778        """
779        Generate the visualization the virtual topology
780        """
781
782        neato = "/usr/local/bin/neato"
783        # These are used to parse neato output and to create the visualization
784        # file.
785        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
786        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
787                "%s</type></node>"
788
789        try:
790            # Node names
791            nodes = [ n['vname'] for n in topo['node'] ]
792            topo_lans = topo['lan']
793        except KeyError:
794            raise service_error(service_error.internal, "Bad topology")
795
796        lans = { }
797        links = { }
798
799        # Walk through the virtual topology, organizing the connections into
800        # 2-node connections (links) and more-than-2-node connections (lans).
801        # When a lan is created, it's added to the list of nodes (there's a
802        # node in the visualization for the lan).
803        for l in topo_lans:
804            if links.has_key(l['vname']):
805                if len(links[l['vname']]) < 2:
806                    links[l['vname']].append(l['vnode'])
807                else:
808                    nodes.append(l['vname'])
809                    lans[l['vname']] = links[l['vname']]
810                    del links[l['vname']]
811                    lans[l['vname']].append(l['vnode'])
812            elif lans.has_key(l['vname']):
813                lans[l['vname']].append(l['vnode'])
814            else:
815                links[l['vname']] = [ l['vnode'] ]
816
817
818        # Open up a temporary file for dot to turn into a visualization
819        try:
820            df, dotname = tempfile.mkstemp()
821            dotfile = os.fdopen(df, 'w')
822        except IOError:
823            raise service_error(service_error.internal,
824                    "Failed to open file in genviz")
825
826        # Generate a dot/neato input file from the links, nodes and lans
827        try:
828            print >>dotfile, "graph G {"
829            for n in nodes:
830                print >>dotfile, '\t"%s"' % n
831            for l in links.keys():
832                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
833            for l in lans.keys():
834                for n in lans[l]:
835                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
836            print >>dotfile, "}"
837            dotfile.close()
838        except TypeError:
839            raise service_error(service_error.internal,
840                    "Single endpoint link in vtopo")
841        except IOError:
842            raise service_error(service_error.internal, "Cannot write dot file")
843
844        # Use dot to create a visualization
845        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
846                '-Gpack=true', dotname], stdout=PIPE)
847
848        # Translate dot to vis format
849        vis_nodes = [ ]
850        vis = { 'node': vis_nodes }
851        for line in dot.stdout:
852            m = vis_re.match(line)
853            if m:
854                vn = m.group(1)
855                vis_node = {'name': vn, \
856                        'x': float(m.group(2)),\
857                        'y' : float(m.group(3)),\
858                    }
859                if vn in links.keys() or vn in lans.keys():
860                    vis_node['type'] = 'lan'
861                else:
862                    vis_node['type'] = 'node'
863                vis_nodes.append(vis_node)
864        rv = dot.wait()
865
866        os.remove(dotname)
867        if rv == 0 : return vis
868        else: return None
869
870    def get_access(self, tb, nodes, user, tbparam, master, export_project,
871            access_user):
872        """
873        Get access to testbed through fedd and set the parameters for that tb
874        """
875
876        translate_attr = {
877            'slavenodestartcmd': 'expstart',
878            'slaveconnectorstartcmd': 'gwstart',
879            'masternodestartcmd': 'mexpstart',
880            'masterconnectorstartcmd': 'mgwstart',
881            'connectorimage': 'gwimage',
882            'connectortype': 'gwtype',
883            'tunnelcfg': 'tun',
884            'smbshare': 'smbshare',
885        }
886
887        uri = self.tbmap.get(tb, None)
888        if not uri:
889            raise service_error(serice_error.server_config, 
890                    "Unknown testbed: %s" % tb)
891
892        # currently this lumps all users into one service access group
893        service_keys = [ a for u in user \
894                for a in u.get('access', []) \
895                    if a.has_key('sshPubkey')]
896
897        if len(service_keys) == 0:
898            raise service_error(service_error.req, 
899                    "Must have at least one SSH pubkey for services")
900
901
902        for p, u in access_user:
903            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
904                    "to %s") %  ((p or "None"), u, uri))
905
906            if p:
907                # Request with user and project specified
908                req = {\
909                        'destinationTestbed' : { 'uri' : uri },
910                        'project': { 
911                            'name': {'localname': p},
912                            'user': [ {'userID': { 'localname': u } } ],
913                            },
914                        'user':  user,
915                        'allocID' : { 'localname': 'test' },
916                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
917                        'serviceAccess' : service_keys
918                    }
919            else:
920                # Request with only user specified
921                req = {\
922                        'destinationTestbed' : { 'uri' : uri },
923                        'user':  [ {'userID': { 'localname': u } } ],
924                        'allocID' : { 'localname': 'test' },
925                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
926                        'serviceAccess' : service_keys
927                    }
928
929            if tb == master:
930                # NB, the export_project parameter is a dict that includes
931                # the type
932                req['exportProject'] = export_project
933
934            # node resources if any
935            if nodes != None and len(nodes) > 0:
936                rnodes = [ ]
937                for n in nodes:
938                    rn = { }
939                    image, hw, count = n.split(":")
940                    if image: rn['image'] = [ image ]
941                    if hw: rn['hardware'] = [ hw ]
942                    if count: rn['count'] = int(count)
943                    rnodes.append(rn)
944                req['resources']= { }
945                req['resources']['node'] = rnodes
946
947            try:
948                if self.local_access.has_key(uri):
949                    # Local access call
950                    req = { 'RequestAccessRequestBody' : req }
951                    r = self.local_access[uri].RequestAccess(req, 
952                            fedid(file=self.cert_file))
953                    r = { 'RequestAccessResponseBody' : r }
954                else:
955                    r = self.call_RequestAccess(uri, req, 
956                            self.cert_file, self.cert_pwd, self.trusted_certs)
957            except service_error, e:
958                if e.code == service_error.access:
959                    self.log.debug("[get_access] Access denied")
960                    r = None
961                    continue
962                else:
963                    raise e
964
965            if r.has_key('RequestAccessResponseBody'):
966                # Through to here we have a valid response, not a fault.
967                # Access denied is a fault, so something better or worse than
968                # access denied has happened.
969                r = r['RequestAccessResponseBody']
970                self.log.debug("[get_access] Access granted")
971                break
972            else:
973                raise service_error(service_error.protocol,
974                        "Bad proxy response")
975       
976        if not r:
977            raise service_error(service_error.access, 
978                    "Access denied by %s (%s)" % (tb, uri))
979
980        e = r['emulab']
981        p = e['project']
982        tbparam[tb] = { 
983                "boss": e['boss'],
984                "host": e['ops'],
985                "domain": e['domain'],
986                "fs": e['fileServer'],
987                "eventserver": e['eventServer'],
988                "project": unpack_id(p['name']),
989                "emulab" : e,
990                "allocID" : r['allocID'],
991                }
992        # Make the testbed name be the label the user applied
993        p['testbed'] = {'localname': tb }
994
995        for u in p['user']:
996            tbparam[tb]['user'] = unpack_id(u['userID'])
997
998        for a in e['fedAttr']:
999            if a['attribute']:
1000                key = translate_attr.get(a['attribute'].lower(), None)
1001                if key:
1002                    tbparam[tb][key]= a['value']
1003       
1004    def release_access(self, tb, aid):
1005        """
1006        Release access to testbed through fedd
1007        """
1008
1009        uri = self.tbmap.get(tb, None)
1010        if not uri:
1011            raise service_error(serice_error.server_config, 
1012                    "Unknown testbed: %s" % tb)
1013
1014        if self.local_access.has_key(uri):
1015            resp = self.local_access[uri].ReleaseAccess(\
1016                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
1017                    fedid(file=self.cert_file))
1018            resp = { 'ReleaseAccessResponseBody': resp } 
1019        else:
1020            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1021                    self.cert_file, self.cert_pwd, self.trusted_certs)
1022
1023        # better error coding
1024
1025    def remote_splitter(self, uri, desc, master):
1026
1027        req = {
1028                'description' : { 'ns2description': desc },
1029                'master': master,
1030                'include_fedkit': bool(self.fedkit)
1031            }
1032
1033        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1034                self.trusted_certs)
1035
1036        if r.has_key('Ns2SplitResponseBody'):
1037            r = r['Ns2SplitResponseBody']
1038            if r.has_key('output'):
1039                return r['output'].splitlines()
1040            else:
1041                raise service_error(service_error.protocol, 
1042                        "Bad splitter response (no output)")
1043        else:
1044            raise service_error(service_error.protocol, "Bad splitter response")
1045       
1046    class current_testbed:
1047        """
1048        Object for collecting the current testbed description.  The testbed
1049        description is saved to a file with the local testbed variables
1050        subsittuted line by line.
1051        """
1052        def __init__(self, eid, tmpdir, fedkit):
1053            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1054            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1055            self.current_testbed = None
1056            self.testbed_file = None
1057
1058            self.def_expstart = \
1059                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1060            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1061            self.def_gwstart = \
1062                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1063            self.def_mgwstart = \
1064                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1065            self.def_gwimage = "FBSD61-TUNNEL2";
1066            self.def_gwtype = "pc";
1067
1068            self.eid = eid
1069            self.tmpdir = tmpdir
1070            self.fedkit = fedkit
1071
1072        def __call__(self, line, master, allocated, tbparams):
1073            # Capture testbed topology descriptions
1074            if self.current_testbed == None:
1075                m = self.begin_testbed.match(line)
1076                if m != None:
1077                    self.current_testbed = m.group(1)
1078                    if self.current_testbed == None:
1079                        raise service_error(service_error.req,
1080                                "Bad request format (unnamed testbed)")
1081                    allocated[self.current_testbed] = \
1082                            allocated.get(self.current_testbed,0) + 1
1083                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1084                    if not os.path.exists(tb_dir):
1085                        try:
1086                            os.mkdir(tb_dir)
1087                        except IOError:
1088                            raise service_error(service_error.internal,
1089                                    "Cannot create %s" % tb_dir)
1090                    try:
1091                        self.testbed_file = open("%s/%s.%s.tcl" %
1092                                (tb_dir, self.eid, self.current_testbed), 'w')
1093                    except IOError:
1094                        self.testbed_file = None
1095                    return True
1096                else: return False
1097            else:
1098                m = self.end_testbed.match(line)
1099                if m != None:
1100                    if m.group(1) != self.current_testbed:
1101                        raise service_error(service_error.internal, 
1102                                "Mismatched testbed markers!?")
1103                    if self.testbed_file != None: 
1104                        self.testbed_file.close()
1105                        self.testbed_file = None
1106                    self.current_testbed = None
1107                elif self.testbed_file:
1108                    # Substitute variables and put the line into the local
1109                    # testbed file.
1110                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1111                            self.def_gwtype)
1112                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1113                            self.def_gwimage)
1114                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1115                            self.def_mgwstart)
1116                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1117                            self.def_mexpstart)
1118                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1119                            self.def_gwstart)
1120                    expstart = tbparams[self.current_testbed].get('expstart', 
1121                            self.def_expstart)
1122                    project = tbparams[self.current_testbed].get('project')
1123                    line = re.sub("GWTYPE", gwtype, line)
1124                    line = re.sub("GWIMAGE", gwimage, line)
1125                    if self.current_testbed == master:
1126                        line = re.sub("GWSTART", mgwstart, line)
1127                        line = re.sub("EXPSTART", mexpstart, line)
1128                    else:
1129                        line = re.sub("GWSTART", gwstart, line)
1130                        line = re.sub("EXPSTART", expstart, line)
1131                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1132                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1133                    line = re.sub("EID", self.eid, line)
1134                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1135                            (project, self.eid), line)
1136                    if self.fedkit:
1137                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1138                                line)
1139                    print >>self.testbed_file, line
1140                return True
1141
1142    class allbeds:
1143        """
1144        Process the Allbeds section.  Get access to each federant and save the
1145        parameters in tbparams
1146        """
1147        def __init__(self, get_access):
1148            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1149            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1150            self.in_allbeds = False
1151            self.get_access = get_access
1152
1153        def __call__(self, line, user, tbparams, master, export_project,
1154                access_user):
1155            # Testbed access parameters
1156            if not self.in_allbeds:
1157                if self.begin_allbeds.match(line):
1158                    self.in_allbeds = True
1159                    return True
1160                else:
1161                    return False
1162            else:
1163                if self.end_allbeds.match(line):
1164                    self.in_allbeds = False
1165                else:
1166                    nodes = line.split('|')
1167                    tb = nodes.pop(0)
1168                    self.get_access(tb, nodes, user, tbparams, master,
1169                            export_project, access_user)
1170                return True
1171
1172    class gateways:
1173        def __init__(self, eid, master, tmpdir, gw_pubkey,
1174                gw_secretkey, copy_file, fedkit):
1175            self.begin_gateways = \
1176                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1177            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1178            self.current_gateways = None
1179            self.control_gateway = None
1180            self.active_end = { }
1181
1182            self.eid = eid
1183            self.master = master
1184            self.tmpdir = tmpdir
1185            self.gw_pubkey_base = gw_pubkey
1186            self.gw_secretkey_base = gw_secretkey
1187
1188            self.copy_file = copy_file
1189            self.fedkit = fedkit
1190
1191
1192        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1193                active_end, tbparams, dtb, myname, desthost, type):
1194            """
1195            Produce a gateway configuration file from a gateways line.
1196            """
1197
1198            sproject = tbparams[gw].get('project', 'project')
1199            dproject = tbparams[dtb].get('project', 'project')
1200            sdomain = ".%s.%s%s" % (eid, sproject,
1201                    tbparams[gw].get('domain', ".example.com"))
1202            ddomain = ".%s.%s%s" % (eid, dproject,
1203                    tbparams[dtb].get('domain', ".example.com"))
1204            boss = tbparams[master].get('boss', "boss")
1205            fs = tbparams[master].get('fs', "fs")
1206            event_server = "%s%s" % \
1207                    (tbparams[gw].get('eventserver', "event_server"),
1208                            tbparams[gw].get('domain', "example.com"))
1209            remote_event_server = "%s%s" % \
1210                    (tbparams[dtb].get('eventserver', "event_server"),
1211                            tbparams[dtb].get('domain', "example.com"))
1212            seer_control = "%s%s" % \
1213                    (tbparams[gw].get('control', "control"), sdomain)
1214
1215            if self.fedkit:
1216                remote_script_dir = "/usr/local/federation/bin"
1217                local_script_dir = "/usr/local/federation/bin"
1218            else:
1219                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1220                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1221
1222            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1223            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1224            tunnel_cfg = tbparams[gw].get("tun", "false")
1225
1226            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1227            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1228
1229            # translate to lower case so the `hostname` hack for specifying
1230            # configuration files works.
1231            conf_file = conf_file.lower();
1232            remote_conf_file = remote_conf_file.lower();
1233
1234            if dtb == master:
1235                active = "false"
1236            elif gw == master:
1237                active = "true"
1238            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1239                active = "false"
1240            else:
1241                active_end['%s-%s' % (gw, dtb)] = 1
1242                active = "true"
1243
1244            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1245            print >>gwconfig, "Active: %s" % active
1246            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1247            print >>gwconfig, "BossName: %s" % boss
1248            print >>gwconfig, "FsName: %s" % fs
1249            print >>gwconfig, "EventServerName: %s" % event_server
1250            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1251            print >>gwconfig, "SeerControl: %s" % seer_control
1252            print >>gwconfig, "Type: %s" % type
1253            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1254            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1255                    local_script_dir
1256            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1257            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1258            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1259                    (remote_conf_dir, remote_conf_file)
1260            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1261            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1262            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1263            gwconfig.close()
1264
1265            return active == "true"
1266
1267        def __call__(self, line, allocated, tbparams):
1268            # Process gateways
1269            if not self.current_gateways:
1270                m = self.begin_gateways.match(line)
1271                if m:
1272                    self.current_gateways = m.group(1)
1273                    if allocated.has_key(self.current_gateways):
1274                        # This test should always succeed
1275                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1276                        if not os.path.exists(tb_dir):
1277                            try:
1278                                os.mkdir(tb_dir)
1279                            except IOError:
1280                                raise service_error(service_error.internal,
1281                                        "Cannot create %s" % tb_dir)
1282                    else:
1283                        # XXX
1284                        self.log.error("[gateways]: Ignoring gateways for " + \
1285                                "unknown testbed %s" % self.current_gateways)
1286                        self.current_gateways = None
1287                    return True
1288                else:
1289                    return False
1290            else:
1291                m = self.end_gateways.match(line)
1292                if m :
1293                    if m.group(1) != self.current_gateways:
1294                        raise service_error(service_error.internal,
1295                                "Mismatched gateway markers!?")
1296                    if self.control_gateway:
1297                        try:
1298                            cc = open("%s/%s/client.conf" %
1299                                    (self.tmpdir, self.current_gateways), 'w')
1300                            print >>cc, "ControlGateway: %s" % \
1301                                    self.control_gateway
1302                            if tbparams[self.master].has_key('smbshare'):
1303                                print >>cc, "SMBSHare: %s" % \
1304                                        tbparams[self.master]['smbshare']
1305                            print >>cc, "ProjectUser: %s" % \
1306                                    tbparams[self.master]['user']
1307                            print >>cc, "ProjectName: %s" % \
1308                                    tbparams[self.master]['project']
1309                            cc.close()
1310                        except IOError:
1311                            raise service_error(service_error.internal,
1312                                    "Error creating client config")
1313                        try:
1314                            cc = open("%s/%s/seer.conf" %
1315                                    (self.tmpdir, self.current_gateways),
1316                                    'w')
1317                            if self.current_gateways != self.master:
1318                                print >>cc, "ControlNode: %s" % \
1319                                        self.control_gateway
1320                            print >>cc, "ExperimentID: %s/%s" % \
1321                                    ( tbparams[self.master]['project'], \
1322                                    self.eid )
1323                            cc.close()
1324                        except IOError:
1325                            raise service_error(service_error.internal,
1326                                    "Error creating seer config")
1327                    else:
1328                        debug.error("[gateways]: No control gateway for %s" %\
1329                                    self.current_gateways)
1330                    self.current_gateways = None
1331                else:
1332                    dtb, myname, desthost, type = line.split(" ")
1333
1334                    if type == "control" or type == "both":
1335                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1336                                self.eid, 
1337                                tbparams[self.current_gateways]['project'],
1338                                tbparams[self.current_gateways]['domain'])
1339                    try:
1340                        active = self.gateway_conf_file(self.current_gateways,
1341                                self.master, self.eid, self.gw_pubkey_base,
1342                                self.gw_secretkey_base,
1343                                self.active_end, tbparams, dtb, myname,
1344                                desthost, type)
1345                    except IOError, e:
1346                        raise service_error(service_error.internal,
1347                                "Failed to write config file for %s" % \
1348                                        self.current_gateway)
1349           
1350                    gw_pubkey = "%s/keys/%s" % \
1351                            (self.tmpdir, self.gw_pubkey_base)
1352                    gw_secretkey = "%s/keys/%s" % \
1353                            (self.tmpdir, self.gw_secretkey_base)
1354
1355                    pkfile = "%s/%s/%s" % \
1356                            ( self.tmpdir, self.current_gateways, 
1357                                    self.gw_pubkey_base)
1358                    skfile = "%s/%s/%s" % \
1359                            ( self.tmpdir, self.current_gateways, 
1360                                    self.gw_secretkey_base)
1361
1362                    if not os.path.exists(pkfile):
1363                        try:
1364                            self.copy_file(gw_pubkey, pkfile)
1365                        except IOError:
1366                            service_error(service_error.internal,
1367                                    "Failed to copy pubkey file")
1368
1369                    if active and not os.path.exists(skfile):
1370                        try:
1371                            self.copy_file(gw_secretkey, skfile)
1372                        except IOError:
1373                            service_error(service_error.internal,
1374                                    "Failed to copy secretkey file")
1375                return True
1376
1377    class shunt_to_file:
1378        """
1379        Simple class to write data between two regexps to a file.
1380        """
1381        def __init__(self, begin, end, filename):
1382            """
1383            Begin shunting on a match of begin, stop on end, send data to
1384            filename.
1385            """
1386            self.begin = re.compile(begin)
1387            self.end = re.compile(end)
1388            self.in_shunt = False
1389            self.file = None
1390            self.filename = filename
1391
1392        def __call__(self, line):
1393            """
1394            Call this on each line in the input that may be shunted.
1395            """
1396            if not self.in_shunt:
1397                if self.begin.match(line):
1398                    self.in_shunt = True
1399                    try:
1400                        self.file = open(self.filename, "w")
1401                    except:
1402                        self.file = None
1403                        raise
1404                    return True
1405                else:
1406                    return False
1407            else:
1408                if self.end.match(line):
1409                    if self.file: 
1410                        self.file.close()
1411                        self.file = None
1412                    self.in_shunt = False
1413                else:
1414                    if self.file:
1415                        print >>self.file, line
1416                return True
1417
1418    class shunt_to_list:
1419        """
1420        Same interface as shunt_to_file.  Data collected in self.list, one list
1421        element per line.
1422        """
1423        def __init__(self, begin, end):
1424            self.begin = re.compile(begin)
1425            self.end = re.compile(end)
1426            self.in_shunt = False
1427            self.list = [ ]
1428       
1429        def __call__(self, line):
1430            if not self.in_shunt:
1431                if self.begin.match(line):
1432                    self.in_shunt = True
1433                    return True
1434                else:
1435                    return False
1436            else:
1437                if self.end.match(line):
1438                    self.in_shunt = False
1439                else:
1440                    self.list.append(line)
1441                return True
1442
1443    class shunt_to_string:
1444        """
1445        Same interface as shunt_to_file.  Data collected in self.str, all in
1446        one string.
1447        """
1448        def __init__(self, begin, end):
1449            self.begin = re.compile(begin)
1450            self.end = re.compile(end)
1451            self.in_shunt = False
1452            self.str = ""
1453       
1454        def __call__(self, line):
1455            if not self.in_shunt:
1456                if self.begin.match(line):
1457                    self.in_shunt = True
1458                    return True
1459                else:
1460                    return False
1461            else:
1462                if self.end.match(line):
1463                    self.in_shunt = False
1464                else:
1465                    self.str += line
1466                return True
1467
1468    def create_experiment(self, req, fid):
1469        """
1470        The external interface to experiment creation called from the
1471        dispatcher.
1472
1473        Creates a working directory, splits the incoming description using the
1474        splitter script and parses out the avrious subsections using the
1475        lcasses above.  Once each sub-experiment is created, use pooled threads
1476        to instantiate them and start it all up.
1477        """
1478
1479        if not self.auth.check_attribute(fid, 'create'):
1480            raise service_error(service_error.access, "Create access denied")
1481
1482        try:
1483            tmpdir = tempfile.mkdtemp(prefix="split-")
1484        except IOError:
1485            raise service_error(service_error.internal, "Cannot create tmp dir")
1486
1487        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1488        gw_secretkey_base = "fed.%s" % self.ssh_type
1489        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1490        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1491        tclfile = tmpdir + "/experiment.tcl"
1492        tbparams = { }
1493        try:
1494            access_user = self.accessdb[fid]
1495        except KeyError:
1496            raise service_error(service_error.internal,
1497                    "Access map and authorizer out of sync in " + \
1498                            "create_experiment for fedid %s"  % fid)
1499
1500        pid = "dummy"
1501        gid = "dummy"
1502        # XXX
1503        fail_soft = False
1504
1505        try:
1506            os.mkdir(tmpdir+"/keys")
1507        except OSError:
1508            raise service_error(service_error.internal,
1509                    "Can't make temporary dir")
1510
1511        req = req.get('CreateRequestBody', None)
1512        if not req:
1513            raise service_error(service_error.req,
1514                    "Bad request format (no CreateRequestBody)")
1515        # The tcl parser needs to read a file so put the content into that file
1516        descr=req.get('experimentdescription', None)
1517        if descr:
1518            file_content=descr.get('ns2description', None)
1519            if file_content:
1520                try:
1521                    f = open(tclfile, 'w')
1522                    f.write(file_content)
1523                    f.close()
1524                except IOError:
1525                    raise service_error(service_error.internal,
1526                            "Cannot write temp experiment description")
1527            else:
1528                raise service_error(service_error.req, 
1529                        "Only ns2descriptions supported")
1530        else:
1531            raise service_error(service_error.req, "No experiment description")
1532
1533        if req.has_key('experimentID') and \
1534                req['experimentID'].has_key('localname'):
1535            eid = req['experimentID']['localname']
1536            self.state_lock.acquire()
1537            while (self.state.has_key(eid)):
1538                eid += random.choice(string.ascii_letters)
1539            # To avoid another thread picking this localname
1540            self.state[eid] = "placeholder"
1541            self.state_lock.release()
1542        else:
1543            eid = self.exp_stem
1544            for i in range(0,5):
1545                eid += random.choice(string.ascii_letters)
1546            self.state_lock.acquire()
1547            while (self.state.has_key(eid)):
1548                eid = self.exp_stem
1549                for i in range(0,5):
1550                    eid += random.choice(string.ascii_letters)
1551            # To avoid another thread picking this localname
1552            self.state[eid] = "placeholder"
1553            self.state_lock.release()
1554
1555        try: 
1556            # This catches exceptions to clear the placeholder if necessary
1557            try:
1558                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1559            except ValueError:
1560                raise service_error(service_error.server_config, 
1561                        "Bad key type (%s)" % self.ssh_type)
1562
1563            user = req.get('user', None)
1564            if user == None:
1565                raise service_error(service_error.req, "No user")
1566
1567            master = req.get('master', None)
1568            if not master:
1569                raise service_error(service_error.req,
1570                        "No master testbed label")
1571            export_project = req.get('exportProject', None)
1572            if not export_project:
1573                raise service_error(service_error.req, "No export project")
1574           
1575            if self.splitter_url:
1576                self.log.debug("Calling remote splitter at %s" % \
1577                        self.splitter_url)
1578                split_data = self.remote_splitter(self.splitter_url,
1579                        file_content, master)
1580            else:
1581                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1582                    str(self.muxmax), '-m', master]
1583
1584                if self.fedkit:
1585                    tclcmd.append('-k')
1586
1587                tclcmd.extend([pid, gid, eid, tclfile])
1588
1589                self.log.debug("running local splitter %s", " ".join(tclcmd))
1590                tclparser = Popen(tclcmd, stdout=PIPE)
1591                split_data = tclparser.stdout
1592
1593            allocated = { }         # Testbeds we can access
1594            started = { }           # Testbeds where a sub-experiment started
1595                                # successfully
1596
1597            # Objects to parse the splitter output (defined above)
1598            parse_current_testbed = self.current_testbed(eid, tmpdir,
1599                    self.fedkit)
1600            parse_allbeds = self.allbeds(self.get_access)
1601            parse_gateways = self.gateways(eid, master, tmpdir,
1602                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1603                    self.fedkit)
1604            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1605                        "^#\s+End\s+Vtopo")
1606            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1607                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1608            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1609                    "^#\s+End\s+tarfiles")
1610            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1611                    "^#\s+End\s+rpms")
1612
1613            # Working on the split data
1614            for line in split_data:
1615                line = line.rstrip()
1616                if parse_current_testbed(line, master, allocated, tbparams):
1617                    continue
1618                elif parse_allbeds(line, user, tbparams, master, export_project,
1619                        access_user):
1620                    continue
1621                elif parse_gateways(line, allocated, tbparams):
1622                    continue
1623                elif parse_vtopo(line):
1624                    continue
1625                elif parse_hostnames(line):
1626                    continue
1627                elif parse_tarfiles(line):
1628                    continue
1629                elif parse_rpms(line):
1630                    continue
1631                else:
1632                    raise service_error(service_error.internal, 
1633                            "Bad tcl parse? %s" % line)
1634            # Virtual topology and visualization
1635            vtopo = self.gentopo(parse_vtopo.str)
1636            if not vtopo:
1637                raise service_error(service_error.internal, 
1638                        "Failed to generate virtual topology")
1639
1640            vis = self.genviz(vtopo)
1641            if not vis:
1642                raise service_error(service_error.internal, 
1643                        "Failed to generate visualization")
1644           
1645            # save federant information
1646            for k in allocated.keys():
1647                tbparams[k]['federant'] = {\
1648                        'name': [ { 'localname' : eid} ],\
1649                        'emulab': tbparams[k]['emulab'],\
1650                        'allocID' : tbparams[k]['allocID'],\
1651                        'master' : k == master,\
1652                    }
1653
1654
1655            # Copy tarfiles and rpms needed at remote sites into a staging area
1656            try:
1657                if self.fedkit:
1658                    parse_tarfiles.list.append(self.fedkit)
1659                for t in parse_tarfiles.list:
1660                    if not os.path.exists("%s/tarfiles" % tmpdir):
1661                        os.mkdir("%s/tarfiles" % tmpdir)
1662                    self.copy_file(t, "%s/tarfiles/%s" % \
1663                            (tmpdir, os.path.basename(t)))
1664                for r in parse_rpms.list:
1665                    if not os.path.exists("%s/rpms" % tmpdir):
1666                        os.mkdir("%s/rpms" % tmpdir)
1667                    self.copy_file(r, "%s/rpms/%s" % \
1668                            (tmpdir, os.path.basename(r)))
1669            except IOError, e:
1670                raise service_error(service_error.internal, 
1671                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1672
1673        except service_error, e:
1674            # If something goes wrong in the parse (usually an access error)
1675            # clear the placeholder state.  From here on out the code delays
1676            # exceptions.
1677            self.state_lock.acquire()
1678            del self.state[eid]
1679            self.state_lock.release()
1680            raise e
1681
1682        thread_pool_info = self.thread_pool()
1683        threads = [ ]
1684
1685        for tb in [ k for k in allocated.keys() if k != master]:
1686            # Wait until we have a free slot to start the next testbed load
1687            thread_pool_info.acquire()
1688            while thread_pool_info.started - \
1689                    thread_pool_info.terminated >= self.nthreads:
1690                thread_pool_info.wait()
1691            thread_pool_info.release()
1692
1693            # Create and start a thread to start the segment, and save it to
1694            # get the return value later
1695            t  = self.pooled_thread(target=self.start_segment, 
1696                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1697                    pdata=thread_pool_info, trace_file=self.trace_file)
1698            threads.append(t)
1699            t.start()
1700
1701        # Wait until all finish (the first clause of the while is to make sure
1702        # one starts)
1703        thread_pool_info.acquire()
1704        while thread_pool_info.started == 0 or \
1705                thread_pool_info.started > thread_pool_info.terminated:
1706            thread_pool_info.wait()
1707        thread_pool_info.release()
1708
1709        # If none failed, start the master
1710        failed = [ t.getName() for t in threads if not t.rv ]
1711
1712        if len(failed) == 0:
1713            if not self.start_segment(master, eid, tbparams, tmpdir):
1714                failed.append(master)
1715
1716        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1717        # If one failed clean up, unless fail_soft is set
1718        if failed:
1719            if not fail_soft:
1720                for tb in succeeded:
1721                    self.stop_segment(tb, eid, tbparams)
1722                # Remove the placeholder
1723                self.state_lock.acquire()
1724                del self.state[eid]
1725                self.state_lock.release()
1726
1727                raise service_error(service_error.federant,
1728                    "Swap in failed on %s" % ",".join(failed))
1729        else:
1730            self.log.info("[start_segment]: Experiment %s started" % eid)
1731
1732        # Generate an ID for the experiment (slice) and a certificate that the
1733        # allocator can use to prove they own it.  We'll ship it back through
1734        # the encrypted connection.
1735        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1736
1737        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1738
1739        # Walk up tmpdir, deleting as we go
1740        for path, dirs, files in os.walk(tmpdir, topdown=False):
1741            for f in files:
1742                os.remove(os.path.join(path, f))
1743            for d in dirs:
1744                os.rmdir(os.path.join(path, d))
1745        os.rmdir(tmpdir)
1746
1747        # The deepcopy prevents the allocation ID and other binaries from being
1748        # translated into other formats
1749        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1750                for tb in tbparams.keys() \
1751                    if tbparams[tb].has_key('federant') ],\
1752                    'vtopo': vtopo,\
1753                    'vis' : vis,
1754                    'experimentID' : [\
1755                            { 'fedid': copy.copy(expid) }, \
1756                            { 'localname': eid },\
1757                        ],\
1758                    'experimentAccess': { 'X509' : expcert },\
1759                }
1760
1761        # Insert the experiment into our state and update the disk copy
1762        self.state_lock.acquire()
1763        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1764                for tb in tbparams.keys() \
1765                    if tbparams[tb].has_key('federant') ],\
1766                    'vtopo': vtopo,\
1767                    'vis' : vis,
1768                    'owner': fid,
1769                    'experimentID' : [\
1770                            { 'fedid': expid }, { 'localname': eid },\
1771                        ],\
1772                }
1773        self.state[eid] = self.state[expid]
1774        if self.state_filename: self.write_state()
1775        self.state_lock.release()
1776
1777        self.auth.set_attribute(fid, expid)
1778        self.auth.set_attribute(expid, expid)
1779
1780        if not failed:
1781            return resp
1782        else:
1783            raise service_error(service_error.partial, \
1784                    "Partial swap in on %s" % ",".join(succeeded))
1785
1786    def check_experiment_access(self, fid, key):
1787        """
1788        Confirm that the fid has access to the experiment.  Though a request
1789        may be made in terms of a local name, the access attribute is always
1790        the experiment's fedid.
1791        """
1792        if not isinstance(key, fedid):
1793            self.state_lock.acquire()
1794            if self.state.has_key(key):
1795                try:
1796                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1797                            if f.has_key('fedid') ]
1798                except KeyError:
1799                    self.state_lock.release()
1800                    raise service_error(service_error.internal, 
1801                            "No fedid for experiment %s when checking " +\
1802                                    "access(!?)" % key)
1803                if len(kl) == 1:
1804                    key = kl[0]
1805                else:
1806                    self.state_lock.release()
1807                    raise service_error(service_error.internal, 
1808                            "multiple fedids for experiment %s when " +\
1809                                    "checking access(!?)" % key)
1810            else:
1811                self.state_lock.release()
1812                raise service_error(service_error.access, "Access Denied")
1813            self.state_lock.release()
1814
1815        if self.auth.check_attribute(fid, key):
1816            return True
1817        else:
1818            raise service_error(service_error.access, "Access Denied")
1819
1820
1821
1822    def get_vtopo(self, req, fid):
1823        """
1824        Return the stored virtual topology for this experiment
1825        """
1826        rv = None
1827
1828        req = req.get('VtopoRequestBody', None)
1829        if not req:
1830            raise service_error(service_error.req,
1831                    "Bad request format (no VtopoRequestBody)")
1832        exp = req.get('experiment', None)
1833        if exp:
1834            if exp.has_key('fedid'):
1835                key = exp['fedid']
1836                keytype = "fedid"
1837            elif exp.has_key('localname'):
1838                key = exp['localname']
1839                keytype = "localname"
1840            else:
1841                raise service_error(service_error.req, "Unknown lookup type")
1842        else:
1843            raise service_error(service_error.req, "No request?")
1844
1845        self.check_experiment_access(fid, key)
1846
1847        self.state_lock.acquire()
1848        if self.state.has_key(key):
1849            rv = { 'experiment' : {keytype: key },\
1850                    'vtopo': self.state[key]['vtopo'],\
1851                }
1852        self.state_lock.release()
1853
1854        if rv: return rv
1855        else: raise service_error(service_error.req, "No such experiment")
1856
1857    def get_vis(self, req, fid):
1858        """
1859        Return the stored visualization for this experiment
1860        """
1861        rv = None
1862
1863        req = req.get('VisRequestBody', None)
1864        if not req:
1865            raise service_error(service_error.req,
1866                    "Bad request format (no VisRequestBody)")
1867        exp = req.get('experiment', None)
1868        if exp:
1869            if exp.has_key('fedid'):
1870                key = exp['fedid']
1871                keytype = "fedid"
1872            elif exp.has_key('localname'):
1873                key = exp['localname']
1874                keytype = "localname"
1875            else:
1876                raise service_error(service_error.req, "Unknown lookup type")
1877        else:
1878            raise service_error(service_error.req, "No request?")
1879
1880        self.check_experiment_access(fid, key)
1881
1882        self.state_lock.acquire()
1883        if self.state.has_key(key):
1884            rv =  { 'experiment' : {keytype: key },\
1885                    'vis': self.state[key]['vis'],\
1886                    }
1887        self.state_lock.release()
1888
1889        if rv: return rv
1890        else: raise service_error(service_error.req, "No such experiment")
1891
1892    def get_info(self, req, fid):
1893        """
1894        Return all the stored info about this experiment
1895        """
1896        rv = None
1897
1898        req = req.get('InfoRequestBody', None)
1899        if not req:
1900            raise service_error(service_error.req,
1901                    "Bad request format (no VisRequestBody)")
1902        exp = req.get('experiment', None)
1903        if exp:
1904            if exp.has_key('fedid'):
1905                key = exp['fedid']
1906                keytype = "fedid"
1907            elif exp.has_key('localname'):
1908                key = exp['localname']
1909                keytype = "localname"
1910            else:
1911                raise service_error(service_error.req, "Unknown lookup type")
1912        else:
1913            raise service_error(service_error.req, "No request?")
1914
1915        self.check_experiment_access(fid, key)
1916
1917        # The state may be massaged by the service function that called
1918        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1919        # state.
1920        self.state_lock.acquire()
1921        if self.state.has_key(key):
1922            rv = copy.deepcopy(self.state[key])
1923        self.state_lock.release()
1924
1925        if rv: return rv
1926        else: raise service_error(service_error.req, "No such experiment")
1927
1928
1929    def terminate_experiment(self, req, fid):
1930        """
1931        Swap this experiment out on the federants and delete the shared
1932        information
1933        """
1934        tbparams = { }
1935        req = req.get('TerminateRequestBody', None)
1936        if not req:
1937            raise service_error(service_error.req,
1938                    "Bad request format (no TerminateRequestBody)")
1939        exp = req.get('experiment', None)
1940        if exp:
1941            if exp.has_key('fedid'):
1942                key = exp['fedid']
1943                keytype = "fedid"
1944            elif exp.has_key('localname'):
1945                key = exp['localname']
1946                keytype = "localname"
1947            else:
1948                raise service_error(service_error.req, "Unknown lookup type")
1949        else:
1950            raise service_error(service_error.req, "No request?")
1951
1952        self.check_experiment_access(fid, key)
1953
1954        self.state_lock.acquire()
1955        fed_exp = self.state.get(key, None)
1956
1957        if fed_exp:
1958            # This branch of the conditional holds the lock to generate a
1959            # consistent temporary tbparams variable to deallocate experiments.
1960            # It releases the lock to do the deallocations and reacquires it to
1961            # remove the experiment state when the termination is complete.
1962            ids = []
1963            #  experimentID is a list of dicts that are self-describing
1964            #  identifiers.  This finds all the fedids and localnames - the
1965            #  keys of self.state - and puts them into ids.
1966            for id in fed_exp.get('experimentID', []):
1967                if id.has_key('fedid'): ids.append(id['fedid'])
1968                if id.has_key('localname'): ids.append(id['localname'])
1969
1970            # Construct enough of the tbparams to make the stop_segment calls
1971            # work
1972            for fed in fed_exp['federant']:
1973                try:
1974                    for e in fed['name']:
1975                        eid = e.get('localname', None)
1976                        if eid: break
1977                    else:
1978                        continue
1979
1980                    p = fed['emulab']['project']
1981
1982                    project = p['name']['localname']
1983                    tb = p['testbed']['localname']
1984                    user = p['user'][0]['userID']['localname']
1985
1986                    domain = fed['emulab']['domain']
1987                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1988                    aid = fed['allocID']
1989                except KeyError, e:
1990                    continue
1991                tbparams[tb] = {\
1992                        'user': user,\
1993                        'domain': domain,\
1994                        'project': project,\
1995                        'host': host,\
1996                        'eid': eid,\
1997                        'aid': aid,\
1998                    }
1999            self.state_lock.release()
2000
2001            # Stop everyone.
2002            for tb in tbparams.keys():
2003                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
2004
2005            # release the allocations
2006            for tb in tbparams.keys():
2007                self.release_access(tb, tbparams[tb]['aid'])
2008
2009            # Remove the terminated experiment
2010            self.state_lock.acquire()
2011            for id in ids:
2012                if self.state.has_key(id): del self.state[id]
2013
2014            if self.state_filename: self.write_state()
2015            self.state_lock.release()
2016
2017            return { 'experiment': exp }
2018        else:
2019            # Don't forget to release the lock
2020            self.state_lock.release()
2021            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.