source: fedd/federation/experiment_control.py @ 5d3f239

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 5d3f239 was 5d3f239, checked in by Ted Faber <faber@…>, 15 years ago

change package name to avoid conflicts with fedd on install

  • Property mode set to 100644
File size: 59.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = None
152        self.cert_pwd = None
153        self.trusted_certs = None
154
155        # Walk through the various relevant certificat specifying config
156        # attributes until the local certificate attributes can be resolved.
157        # The walk is from most specific to most general specification.
158        for s in ("experiment_control", "globals"):
159            if config.has_section(s): 
160                if config.has_option(s, "cert_file"):
161                    if not self.cert_file:
162                        self.cert_file = config.get(s, "cert_file")
163                        self.cert_pwd = config.get(s, "cert_pwd")
164
165                if config.has_option(s, "trusted_certs"):
166                    if not self.trusted_certs:
167                        self.trusted_certs = config.get(s, "trusted_certs")
168
169
170        self.exp_stem = "fed-stem"
171        self.log = logging.getLogger("fedd.experiment_control")
172        set_log_level(config, "experiment_control", self.log)
173        self.muxmax = 2
174        self.nthreads = 2
175        self.randomize_experiments = False
176
177        self.scp_exec = "/usr/bin/scp"
178        self.splitter = None
179        self.ssh_exec="/usr/bin/ssh"
180        self.ssh_keygen = "/usr/bin/ssh-keygen"
181        self.ssh_identity_file = None
182
183
184        self.debug = config.getboolean("experiment_control", "create_debug")
185        self.state_filename = config.get("experiment_control", 
186                "experiment_state_file")
187        self.splitter_url = config.get("experiment_control", "splitter_url")
188        self.fedkit = config.get("experiment_control", "fedkit")
189        accessdb_file = config.get("experiment_control", "accessdb")
190
191        self.ssh_pubkey_file = config.get("experiment_control", 
192                "ssh_pubkey_file")
193        self.ssh_privkey_file = config.get("experiment_control",
194                "ssh_privkey_file")
195        # NB for internal master/slave ops, not experiment setup
196        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
197        self.state = { }
198        self.state_lock = Lock()
199        self.tclsh = "/usr/local/bin/otclsh"
200        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
201        mapdb_file = config.get("experiment_control", "mapdb")
202        self.trace_file = sys.stderr
203
204        self.def_expstart = \
205                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
206                "/tmp/federate";
207        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
208                "FEDDIR/hosts";
209        self.def_gwstart = \
210                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
211                "/tmp/bridge.log";
212        self.def_mgwstart = \
213                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
214                "/tmp/bridge.log";
215        self.def_gwimage = "FBSD61-TUNNEL2";
216        self.def_gwtype = "pc";
217
218        if auth:
219            self.auth = auth
220        else:
221            self.log.error(\
222                    "[access]: No authorizer initialized, creating local one.")
223            auth = authorizer()
224
225
226        if self.ssh_pubkey_file:
227            try:
228                f = open(self.ssh_pubkey_file, 'r')
229                self.ssh_pubkey = f.read()
230                f.close()
231            except IOError:
232                raise service_error(service_error.internal,
233                        "Cannot read sshpubkey")
234        else:
235            raise service_error(service_error.internal, 
236                    "No SSH public key file?")
237
238        if not self.ssh_privkey_file:
239            raise service_error(service_error.internal, 
240                    "No SSH public key file?")
241
242
243        if mapdb_file:
244            self.read_mapdb(mapdb_file)
245        else:
246            self.log.warn("[experiment_control] No testbed map, using defaults")
247            self.tbmap = { 
248                    'deter':'https://users.isi.deterlab.net:23235',
249                    'emulab':'https://users.isi.deterlab.net:23236',
250                    'ucb':'https://users.isi.deterlab.net:23237',
251                    }
252
253        if accessdb_file:
254                self.read_accessdb(accessdb_file)
255        else:
256            raise service_error(service_error.internal,
257                    "No accessdb specified in config")
258
259        # Grab saved state.  OK to do this w/o locking because it's read only
260        # and only one thread should be in existence that can see self.state at
261        # this point.
262        if self.state_filename:
263            self.read_state()
264
265        # Dispatch tables
266        self.soap_services = {\
267                'Create': soap_handler('Create', self.create_experiment),
268                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
269                'Vis': soap_handler('Vis', self.get_vis),
270                'Info': soap_handler('Info', self.get_info),
271                'Terminate': soap_handler('Terminate', 
272                    self.terminate_experiment),
273        }
274
275        self.xmlrpc_services = {\
276                'Create': xmlrpc_handler('Create', self.create_experiment),
277                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
278                'Vis': xmlrpc_handler('Vis', self.get_vis),
279                'Info': xmlrpc_handler('Info', self.get_info),
280                'Terminate': xmlrpc_handler('Terminate',
281                    self.terminate_experiment),
282        }
283
284    def copy_file(self, src, dest, size=1024):
285        """
286        Exceedingly simple file copy.
287        """
288        s = open(src,'r')
289        d = open(dest, 'w')
290
291        buf = "x"
292        while buf != "":
293            buf = s.read(size)
294            d.write(buf)
295        s.close()
296        d.close()
297
298    # Call while holding self.state_lock
299    def write_state(self):
300        """
301        Write a new copy of experiment state after copying the existing state
302        to a backup.
303
304        State format is a simple pickling of the state dictionary.
305        """
306        if os.access(self.state_filename, os.W_OK):
307            self.copy_file(self.state_filename, \
308                    "%s.bak" % self.state_filename)
309        try:
310            f = open(self.state_filename, 'w')
311            pickle.dump(self.state, f)
312        except IOError, e:
313            self.log.error("Can't write file %s: %s" % \
314                    (self.state_filename, e))
315        except pickle.PicklingError, e:
316            self.log.error("Pickling problem: %s" % e)
317        except TypeError, e:
318            self.log.error("Pickling problem (TypeError): %s" % e)
319
320    # Call while holding self.state_lock
321    def read_state(self):
322        """
323        Read a new copy of experiment state.  Old state is overwritten.
324
325        State format is a simple pickling of the state dictionary.
326        """
327        try:
328            f = open(self.state_filename, "r")
329            self.state = pickle.load(f)
330            self.log.debug("[read_state]: Read state from %s" % \
331                    self.state_filename)
332        except IOError, e:
333            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
334                    % (self.state_filename, e))
335        except pickle.UnpicklingError, e:
336            self.log.warning(("[read_state]: No saved state: " + \
337                    "Unpickling failed: %s") % e)
338       
339        for k in self.state.keys():
340            try:
341                # This list should only have one element in it, but phrasing it
342                # as a for loop doesn't cost much, really.  We have to find the
343                # fedid elements anyway.
344                for eid in [ f['fedid'] \
345                        for f in self.state[k]['experimentID']\
346                            if f.has_key('fedid') ]:
347                    self.auth.set_attribute(self.state[k]['owner'], eid)
348            except KeyError, e:
349                self.log.warning("[read_state]: State ownership or identity " +\
350                        "misformatted in %s: %s" % (self.state_filename, e))
351
352
353    def read_accessdb(self, accessdb_file):
354        """
355        Read the mapping from fedids that can create experiments to their name
356        in the 3-level access namespace.  All will be asserted from this
357        testbed and can include the local username and porject that will be
358        asserted on their behalf by this fedd.  Each fedid is also added to the
359        authorization system with the "create" attribute.
360        """
361        self.accessdb = {}
362        # These are the regexps for parsing the db
363        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
364        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
365                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
366        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
367                "\s*->\s*(" + name_expr + ")\s*$")
368        lineno = 0
369
370        # Parse the mappings and store in self.authdb, a dict of
371        # fedid -> (proj, user)
372        try:
373            f = open(accessdb_file, "r")
374            for line in f:
375                lineno += 1
376                line = line.strip()
377                if len(line) == 0 or line.startswith('#'):
378                    continue
379                m = project_line.match(line)
380                if m:
381                    fid = fedid(hexstr=m.group(1))
382                    project, user = m.group(2,3)
383                    if not self.accessdb.has_key(fid):
384                        self.accessdb[fid] = []
385                    self.accessdb[fid].append((project, user))
386                    continue
387
388                m = user_line.match(line)
389                if m:
390                    fid = fedid(hexstr=m.group(1))
391                    project = None
392                    user = m.group(2)
393                    if not self.accessdb.has_key(fid):
394                        self.accessdb[fid] = []
395                    self.accessdb[fid].append((project, user))
396                    continue
397                self.log.warn("[experiment_control] Error parsing access " +\
398                        "db %s at line %d" %  (accessdb_file, lineno))
399        except IOError:
400            raise service_error(service_error.internal,
401                    "Error opening/reading %s as experiment " +\
402                            "control accessdb" %  accessdb_file)
403        f.close()
404
405        # Initialize the authorization attributes
406        for fid in self.accessdb.keys():
407            self.auth.set_attribute(fid, 'create')
408
409    def read_mapdb(self, file):
410        """
411        Read a simple colon separated list of mappings for the
412        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
413        """
414
415        self.tbmap = { }
416        lineno =0
417        try:
418            f = open(file, "r")
419            for line in f:
420                lineno += 1
421                line = line.strip()
422                if line.startswith('#') or len(line) == 0:
423                    continue
424                try:
425                    label, url = line.split(':', 1)
426                    self.tbmap[label] = url
427                except ValueError, e:
428                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
429                            "map db: %s %s" % (lineno, line, e))
430        except IOError, e:
431            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
432                    "open %s: %s" % (file, e))
433        f.close()
434
435    def scp_file(self, file, user, host, dest=""):
436        """
437        scp a file to the remote host.  If debug is set the action is only
438        logged.
439        """
440
441        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
442                "%s@%s:%s" % (user, host, dest)]
443        rv = 0
444
445        try:
446            dnull = open("/dev/null", "r")
447        except IOError:
448            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
449            dnull = Null
450
451        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
452        if not self.debug:
453            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
454            else: rv = call(scp_cmd)
455
456        return rv == 0
457
458    def ssh_cmd(self, user, host, cmd, wname=None):
459        """
460        Run a remote command on host as user.  If debug is set, the action is
461        only logged.
462        """
463        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
464                user, host, cmd)
465
466        try:
467            dnull = open("/dev/null", "r")
468        except IOError:
469            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
470            dnull = Null
471
472        self.log.debug("[ssh_cmd]: %s" % sh_str)
473        if not self.debug:
474            if dnull:
475                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
476            else:
477                sub = Popen(sh_str, shell=True)
478            return sub.wait() == 0
479        else:
480            return True
481
482    def ship_configs(self, host, user, src_dir, dest_dir):
483        """
484        Copy federant-specific configuration files to the federant.
485        """
486        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
487            return False
488        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
489            return False
490
491        for f in os.listdir(src_dir):
492            if os.path.isdir(f):
493                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
494                        "%s/%s" % (dest_dir, f)):
495                    return False
496            else:
497                if not self.scp_file("%s/%s" % (src_dir, f), 
498                        user, host, dest_dir):
499                    return False
500        return True
501
502    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
503        """
504        Start a sub-experiment on a federant.
505
506        Get the current state, modify or create as appropriate, ship data and
507        configs and start the experiment.  There are small ordering differences
508        based on the initial state of the sub-experiment.
509        """
510        # ops node in the federant
511        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
512        user = tbparams[tb]['user']     # federant user
513        pid = tbparams[tb]['project']   # federant project
514        # XXX
515        base_confs = ( "hosts",)
516        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
517        # command to test experiment state
518        expinfo_exec = "/usr/testbed/bin/expinfo" 
519        # Configuration directories on the remote machine
520        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
521        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
522        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
523        # Regular expressions to parse the expinfo response
524        state_re = re.compile("State:\s+(\w+)")
525        no_exp_re = re.compile("^No\s+such\s+experiment")
526        state = None    # Experiment state parsed from expinfo
527        # The expinfo ssh command
528        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
529
530        # Get status
531        self.log.debug("[start_segment]: %s"% " ".join(cmd))
532        dev_null = None
533        try:
534            dev_null = open("/dev/null", "a")
535        except IOError, e:
536            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
537
538        if self.debug:
539            state = 'swapped'
540            rv = 0
541        else:
542            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
543            for line in status.stdout:
544                m = state_re.match(line)
545                if m: state = m.group(1)
546                else:
547                    m = no_exp_re.match(line)
548                    if m: state = "none"
549            rv = status.wait()
550
551        # If the experiment is not present the subcommand returns a non-zero
552        # return value.  If we successfully parsed a "none" outcome, ignore the
553        # return code.
554        if rv != 0 and state != "none":
555            raise service_error(service_error.internal,
556                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
557
558        self.log.debug("[start_segment]: %s: %s" % (tb, state))
559        self.log.info("[start_segment]:transferring experiment to %s" % tb)
560
561        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
562            return False
563        # Clear the federation files
564        if not self.ssh_cmd(user, host, 
565                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
566            return False
567        if not self.ssh_cmd(user, host, 
568                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
569            return False
570        # Clear and create the tarfiles and rpm directories
571        for d in (tarfiles_dir, rpms_dir):
572            if not self.ssh_cmd(user, host, 
573                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
574                return False
575            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
576                    "create tarfiles"):
577                return False
578       
579        if state == 'active':
580            # Remote experiment is active.  Modify it.
581            for f in base_confs:
582                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
583                        "%s/%s" % (proj_dir, f)):
584                    return False
585            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
586                    proj_dir):
587                return False
588            if os.path.isdir("%s/tarfiles" % tmpdir):
589                if not self.ship_configs(host, user,
590                        "%s/tarfiles" % tmpdir, tarfiles_dir):
591                    return False
592            if os.path.isdir("%s/rpms" % tmpdir):
593                if not self.ship_configs(host, user,
594                        "%s/rpms" % tmpdir, tarfiles_dir):
595                    return False
596            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
597            if not self.ssh_cmd(user, host,
598                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
599                            (pid, eid, tclfile), "modexp"):
600                return False
601            return True
602        elif state == "swapped":
603            # Remote experiment swapped out.  Modify it and swap it in.
604            for f in base_confs:
605                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
606                        "%s/%s" % (proj_dir, f)):
607                    return False
608            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
609                    proj_dir):
610                return False
611            if os.path.isdir("%s/tarfiles" % tmpdir):
612                if not self.ship_configs(host, user,
613                        "%s/tarfiles" % tmpdir, tarfiles_dir):
614                    return False
615            if os.path.isdir("%s/rpms" % tmpdir):
616                if not self.ship_configs(host, user,
617                        "%s/rpms" % tmpdir, tarfiles_dir):
618                    return False
619            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
620            if not self.ssh_cmd(user, host,
621                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
622                    "modexp"):
623                return False
624            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
625            if not self.ssh_cmd(user, host,
626                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
627                    "swapexp"):
628                return False
629            return True
630        elif state == "none":
631            # No remote experiment.  Create one.  We do this in 2 steps so we
632            # can put the configuration files and scripts into the new
633            # experiment directories.
634
635            # Tarfiles must be present for creation to work
636            if os.path.isdir("%s/tarfiles" % tmpdir):
637                if not self.ship_configs(host, user,
638                        "%s/tarfiles" % tmpdir, tarfiles_dir):
639                    return False
640            if os.path.isdir("%s/rpms" % tmpdir):
641                if not self.ship_configs(host, user,
642                        "%s/rpms" % tmpdir, tarfiles_dir):
643                    return False
644            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
645            if not self.ssh_cmd(user, host,
646                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
647                            (pid, eid, tclfile), "startexp"):
648                return False
649            # After startexp the per-experiment directories exist
650            for f in base_confs:
651                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
652                        "%s/%s" % (proj_dir, f)):
653                    return False
654            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
655                    proj_dir):
656                return False
657            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
658            if not self.ssh_cmd(user, host,
659                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
660                    "swapexp"):
661                return False
662            return True
663        else:
664            self.log.debug("[start_segment]:unknown state %s" % state)
665            return False
666
667    def stop_segment(self, tb, eid, tbparams):
668        """
669        Stop a sub experiment by calling swapexp on the federant
670        """
671        user = tbparams[tb]['user']
672        host = tbparams[tb]['host']
673        pid = tbparams[tb]['project']
674
675        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
676        return self.ssh_cmd(user, host,
677                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
678
679       
680    def generate_ssh_keys(self, dest, type="rsa" ):
681        """
682        Generate a set of keys for the gateways to use to talk.
683
684        Keys are of type type and are stored in the required dest file.
685        """
686        valid_types = ("rsa", "dsa")
687        t = type.lower();
688        if t not in valid_types: raise ValueError
689        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
690
691        try:
692            trace = open("/dev/null", "w")
693        except IOError:
694            raise service_error(service_error.internal,
695                    "Cannot open /dev/null??");
696
697        # May raise CalledProcessError
698        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
699        rv = call(cmd, stdout=trace, stderr=trace)
700        if rv != 0:
701            raise service_error(service_error.internal, 
702                    "Cannot generate nonce ssh keys.  %s return code %d" \
703                            % (self.ssh_keygen, rv))
704
705    def gentopo(self, str):
706        """
707        Generate the topology dtat structure from the splitter's XML
708        representation of it.
709
710        The topology XML looks like:
711            <experiment>
712                <nodes>
713                    <node><vname></vname><ips>ip1:ip2</ips></node>
714                </nodes>
715                <lans>
716                    <lan>
717                        <vname></vname><vnode></vnode><ip></ip>
718                        <bandwidth></bandwidth><member>node:port</member>
719                    </lan>
720                </lans>
721        """
722        class topo_parse:
723            """
724            Parse the topology XML and create the dats structure.
725            """
726            def __init__(self):
727                # Typing of the subelements for data conversion
728                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
729                self.int_subelements = ( 'bandwidth',)
730                self.float_subelements = ( 'delay',)
731                # The final data structure
732                self.nodes = [ ]
733                self.lans =  [ ]
734                self.topo = { \
735                        'node': self.nodes,\
736                        'lan' : self.lans,\
737                    }
738                self.element = { }  # Current element being created
739                self.chars = ""     # Last text seen
740
741            def end_element(self, name):
742                # After each sub element the contents is added to the current
743                # element or to the appropriate list.
744                if name == 'node':
745                    self.nodes.append(self.element)
746                    self.element = { }
747                elif name == 'lan':
748                    self.lans.append(self.element)
749                    self.element = { }
750                elif name in self.str_subelements:
751                    self.element[name] = self.chars
752                    self.chars = ""
753                elif name in self.int_subelements:
754                    self.element[name] = int(self.chars)
755                    self.chars = ""
756                elif name in self.float_subelements:
757                    self.element[name] = float(self.chars)
758                    self.chars = ""
759
760            def found_chars(self, data):
761                self.chars += data.rstrip()
762
763
764        tp = topo_parse();
765        parser = xml.parsers.expat.ParserCreate()
766        parser.EndElementHandler = tp.end_element
767        parser.CharacterDataHandler = tp.found_chars
768
769        parser.Parse(str)
770
771        return tp.topo
772       
773
774    def genviz(self, topo):
775        """
776        Generate the visualization the virtual topology
777        """
778
779        neato = "/usr/local/bin/neato"
780        # These are used to parse neato output and to create the visualization
781        # file.
782        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
783        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
784                "%s</type></node>"
785
786        try:
787            # Node names
788            nodes = [ n['vname'] for n in topo['node'] ]
789            topo_lans = topo['lan']
790        except KeyError:
791            raise service_error(service_error.internal, "Bad topology")
792
793        lans = { }
794        links = { }
795
796        # Walk through the virtual topology, organizing the connections into
797        # 2-node connections (links) and more-than-2-node connections (lans).
798        # When a lan is created, it's added to the list of nodes (there's a
799        # node in the visualization for the lan).
800        for l in topo_lans:
801            if links.has_key(l['vname']):
802                if len(links[l['vname']]) < 2:
803                    links[l['vname']].append(l['vnode'])
804                else:
805                    nodes.append(l['vname'])
806                    lans[l['vname']] = links[l['vname']]
807                    del links[l['vname']]
808                    lans[l['vname']].append(l['vnode'])
809            elif lans.has_key(l['vname']):
810                lans[l['vname']].append(l['vnode'])
811            else:
812                links[l['vname']] = [ l['vnode'] ]
813
814
815        # Open up a temporary file for dot to turn into a visualization
816        try:
817            df, dotname = tempfile.mkstemp()
818            dotfile = os.fdopen(df, 'w')
819        except IOError:
820            raise service_error(service_error.internal,
821                    "Failed to open file in genviz")
822
823        # Generate a dot/neato input file from the links, nodes and lans
824        try:
825            print >>dotfile, "graph G {"
826            for n in nodes:
827                print >>dotfile, '\t"%s"' % n
828            for l in links.keys():
829                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
830            for l in lans.keys():
831                for n in lans[l]:
832                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
833            print >>dotfile, "}"
834            dotfile.close()
835        except TypeError:
836            raise service_error(service_error.internal,
837                    "Single endpoint link in vtopo")
838        except IOError:
839            raise service_error(service_error.internal, "Cannot write dot file")
840
841        # Use dot to create a visualization
842        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
843                '-Gpack=true', dotname], stdout=PIPE)
844
845        # Translate dot to vis format
846        vis_nodes = [ ]
847        vis = { 'node': vis_nodes }
848        for line in dot.stdout:
849            m = vis_re.match(line)
850            if m:
851                vn = m.group(1)
852                vis_node = {'name': vn, \
853                        'x': float(m.group(2)),\
854                        'y' : float(m.group(3)),\
855                    }
856                if vn in links.keys() or vn in lans.keys():
857                    vis_node['type'] = 'lan'
858                else:
859                    vis_node['type'] = 'node'
860                vis_nodes.append(vis_node)
861        rv = dot.wait()
862
863        os.remove(dotname)
864        if rv == 0 : return vis
865        else: return None
866
867    def get_access(self, tb, nodes, user, tbparam, master, export_project,
868            access_user):
869        """
870        Get access to testbed through fedd and set the parameters for that tb
871        """
872
873        translate_attr = {
874            'slavenodestartcmd': 'expstart',
875            'slaveconnectorstartcmd': 'gwstart',
876            'masternodestartcmd': 'mexpstart',
877            'masterconnectorstartcmd': 'mgwstart',
878            'connectorimage': 'gwimage',
879            'connectortype': 'gwtype',
880            'tunnelcfg': 'tun',
881            'smbshare': 'smbshare',
882        }
883
884        uri = self.tbmap.get(tb, None)
885        if not uri:
886            raise service_error(serice_error.server_config, 
887                    "Unknown testbed: %s" % tb)
888
889        # currently this lumps all users into one service access group
890        service_keys = [ a for u in user \
891                for a in u.get('access', []) \
892                    if a.has_key('sshPubkey')]
893
894        if len(service_keys) == 0:
895            raise service_error(service_error.req, 
896                    "Must have at least one SSH pubkey for services")
897
898
899        for p, u in access_user:
900            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
901                    "to %s") %  ((p or "None"), u, uri))
902
903            if p:
904                # Request with user and project specified
905                req = {\
906                        'destinationTestbed' : { 'uri' : uri },
907                        'project': { 
908                            'name': {'localname': p},
909                            'user': [ {'userID': { 'localname': u } } ],
910                            },
911                        'user':  user,
912                        'allocID' : { 'localname': 'test' },
913                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
914                        'serviceAccess' : service_keys
915                    }
916            else:
917                # Request with only user specified
918                req = {\
919                        'destinationTestbed' : { 'uri' : uri },
920                        'user':  [ {'userID': { 'localname': u } } ],
921                        'allocID' : { 'localname': 'test' },
922                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
923                        'serviceAccess' : service_keys
924                    }
925
926            if tb == master:
927                # NB, the export_project parameter is a dict that includes
928                # the type
929                req['exportProject'] = export_project
930
931            # node resources if any
932            if nodes != None and len(nodes) > 0:
933                rnodes = [ ]
934                for n in nodes:
935                    rn = { }
936                    image, hw, count = n.split(":")
937                    if image: rn['image'] = [ image ]
938                    if hw: rn['hardware'] = [ hw ]
939                    if count: rn['count'] = int(count)
940                    rnodes.append(rn)
941                req['resources']= { }
942                req['resources']['node'] = rnodes
943
944            try:
945                r = self.call_RequestAccess(uri, req, 
946                        self.cert_file, self.cert_pwd, self.trusted_certs)
947            except service_error, e:
948                if e.code == service_error.access:
949                    self.log.debug("[get_access] Access denied")
950                    r = None
951                    continue
952                else:
953                    raise e
954
955            if r.has_key('RequestAccessResponseBody'):
956                # Through to here we have a valid response, not a fault.
957                # Access denied is a fault, so something better or worse than
958                # access denied has happened.
959                r = r['RequestAccessResponseBody']
960                self.log.debug("[get_access] Access granted")
961                break
962            else:
963                raise service_error(service_error.protocol,
964                        "Bad proxy response")
965       
966        if not r:
967            raise service_error(service_error.access, 
968                    "Access denied by %s (%s)" % (tb, uri))
969
970        e = r['emulab']
971        p = e['project']
972        tbparam[tb] = { 
973                "boss": e['boss'],
974                "host": e['ops'],
975                "domain": e['domain'],
976                "fs": e['fileServer'],
977                "eventserver": e['eventServer'],
978                "project": unpack_id(p['name']),
979                "emulab" : e,
980                "allocID" : r['allocID'],
981                }
982        # Make the testbed name be the label the user applied
983        p['testbed'] = {'localname': tb }
984
985        for u in p['user']:
986            tbparam[tb]['user'] = unpack_id(u['userID'])
987
988        for a in e['fedAttr']:
989            if a['attribute']:
990                key = translate_attr.get(a['attribute'].lower(), None)
991                if key:
992                    tbparam[tb][key]= a['value']
993       
994    def release_access(self, tb, aid):
995        """
996        Release access to testbed through fedd
997        """
998
999        uri = self.tbmap.get(tb, None)
1000        if not uri:
1001            raise service_error(serice_error.server_config, 
1002                    "Unknown testbed: %s" % tb)
1003
1004        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1005                self.cert_file, self.cert_pwd, self.trusted_certs)
1006
1007        # better error coding
1008
1009    def remote_splitter(self, uri, desc, master):
1010
1011        req = {
1012                'description' : { 'ns2description': desc },
1013                'master': master,
1014                'include_fedkit': bool(self.fedkit)
1015            }
1016
1017        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1018                self.trusted_certs)
1019
1020        if r.has_key('Ns2SplitResponseBody'):
1021            r = r['Ns2SplitResponseBody']
1022            if r.has_key('output'):
1023                return r['output'].splitlines()
1024            else:
1025                raise service_error(service_error.protocol, 
1026                        "Bad splitter response (no output)")
1027        else:
1028            raise service_error(service_error.protocol, "Bad splitter response")
1029       
1030    class current_testbed:
1031        """
1032        Object for collecting the current testbed description.  The testbed
1033        description is saved to a file with the local testbed variables
1034        subsittuted line by line.
1035        """
1036        def __init__(self, eid, tmpdir, fedkit):
1037            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1038            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1039            self.current_testbed = None
1040            self.testbed_file = None
1041
1042            self.def_expstart = \
1043                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1044            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1045            self.def_gwstart = \
1046                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1047            self.def_mgwstart = \
1048                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1049            self.def_gwimage = "FBSD61-TUNNEL2";
1050            self.def_gwtype = "pc";
1051
1052            self.eid = eid
1053            self.tmpdir = tmpdir
1054            self.fedkit = fedkit
1055
1056        def __call__(self, line, master, allocated, tbparams):
1057            # Capture testbed topology descriptions
1058            if self.current_testbed == None:
1059                m = self.begin_testbed.match(line)
1060                if m != None:
1061                    self.current_testbed = m.group(1)
1062                    if self.current_testbed == None:
1063                        raise service_error(service_error.req,
1064                                "Bad request format (unnamed testbed)")
1065                    allocated[self.current_testbed] = \
1066                            allocated.get(self.current_testbed,0) + 1
1067                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1068                    if not os.path.exists(tb_dir):
1069                        try:
1070                            os.mkdir(tb_dir)
1071                        except IOError:
1072                            raise service_error(service_error.internal,
1073                                    "Cannot create %s" % tb_dir)
1074                    try:
1075                        self.testbed_file = open("%s/%s.%s.tcl" %
1076                                (tb_dir, self.eid, self.current_testbed), 'w')
1077                    except IOError:
1078                        self.testbed_file = None
1079                    return True
1080                else: return False
1081            else:
1082                m = self.end_testbed.match(line)
1083                if m != None:
1084                    if m.group(1) != self.current_testbed:
1085                        raise service_error(service_error.internal, 
1086                                "Mismatched testbed markers!?")
1087                    if self.testbed_file != None: 
1088                        self.testbed_file.close()
1089                        self.testbed_file = None
1090                    self.current_testbed = None
1091                elif self.testbed_file:
1092                    # Substitute variables and put the line into the local
1093                    # testbed file.
1094                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1095                            self.def_gwtype)
1096                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1097                            self.def_gwimage)
1098                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1099                            self.def_mgwstart)
1100                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1101                            self.def_mexpstart)
1102                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1103                            self.def_gwstart)
1104                    expstart = tbparams[self.current_testbed].get('expstart', 
1105                            self.def_expstart)
1106                    project = tbparams[self.current_testbed].get('project')
1107                    line = re.sub("GWTYPE", gwtype, line)
1108                    line = re.sub("GWIMAGE", gwimage, line)
1109                    if self.current_testbed == master:
1110                        line = re.sub("GWSTART", mgwstart, line)
1111                        line = re.sub("EXPSTART", mexpstart, line)
1112                    else:
1113                        line = re.sub("GWSTART", gwstart, line)
1114                        line = re.sub("EXPSTART", expstart, line)
1115                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1116                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1117                    line = re.sub("EID", self.eid, line)
1118                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1119                            (project, self.eid), line)
1120                    if self.fedkit:
1121                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1122                                line)
1123                    print >>self.testbed_file, line
1124                return True
1125
1126    class allbeds:
1127        """
1128        Process the Allbeds section.  Get access to each federant and save the
1129        parameters in tbparams
1130        """
1131        def __init__(self, get_access):
1132            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1133            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1134            self.in_allbeds = False
1135            self.get_access = get_access
1136
1137        def __call__(self, line, user, tbparams, master, export_project,
1138                access_user):
1139            # Testbed access parameters
1140            if not self.in_allbeds:
1141                if self.begin_allbeds.match(line):
1142                    self.in_allbeds = True
1143                    return True
1144                else:
1145                    return False
1146            else:
1147                if self.end_allbeds.match(line):
1148                    self.in_allbeds = False
1149                else:
1150                    nodes = line.split('|')
1151                    tb = nodes.pop(0)
1152                    self.get_access(tb, nodes, user, tbparams, master,
1153                            export_project, access_user)
1154                return True
1155
1156    class gateways:
1157        def __init__(self, eid, master, tmpdir, gw_pubkey,
1158                gw_secretkey, copy_file, fedkit):
1159            self.begin_gateways = \
1160                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1161            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1162            self.current_gateways = None
1163            self.control_gateway = None
1164            self.active_end = { }
1165
1166            self.eid = eid
1167            self.master = master
1168            self.tmpdir = tmpdir
1169            self.gw_pubkey_base = gw_pubkey
1170            self.gw_secretkey_base = gw_secretkey
1171
1172            self.copy_file = copy_file
1173            self.fedkit = fedkit
1174
1175
1176        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1177                active_end, tbparams, dtb, myname, desthost, type):
1178            """
1179            Produce a gateway configuration file from a gateways line.
1180            """
1181
1182            sproject = tbparams[gw].get('project', 'project')
1183            dproject = tbparams[dtb].get('project', 'project')
1184            sdomain = ".%s.%s%s" % (eid, sproject,
1185                    tbparams[gw].get('domain', ".example.com"))
1186            ddomain = ".%s.%s%s" % (eid, dproject,
1187                    tbparams[dtb].get('domain', ".example.com"))
1188            boss = tbparams[master].get('boss', "boss")
1189            fs = tbparams[master].get('fs', "fs")
1190            event_server = "%s%s" % \
1191                    (tbparams[gw].get('eventserver', "event_server"),
1192                            tbparams[gw].get('domain', "example.com"))
1193            remote_event_server = "%s%s" % \
1194                    (tbparams[dtb].get('eventserver', "event_server"),
1195                            tbparams[dtb].get('domain', "example.com"))
1196            seer_control = "%s%s" % \
1197                    (tbparams[gw].get('control', "control"), sdomain)
1198
1199            if self.fedkit:
1200                remote_script_dir = "/usr/local/federation/bin"
1201                local_script_dir = "/usr/local/federation/bin"
1202            else:
1203                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1204                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1205
1206            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1207            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1208            tunnel_cfg = tbparams[gw].get("tun", "false")
1209
1210            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1211            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1212
1213            # translate to lower case so the `hostname` hack for specifying
1214            # configuration files works.
1215            conf_file = conf_file.lower();
1216            remote_conf_file = remote_conf_file.lower();
1217
1218            if dtb == master:
1219                active = "false"
1220            elif gw == master:
1221                active = "true"
1222            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1223                active = "false"
1224            else:
1225                active_end['%s-%s' % (gw, dtb)] = 1
1226                active = "true"
1227
1228            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1229            print >>gwconfig, "Active: %s" % active
1230            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1231            print >>gwconfig, "BossName: %s" % boss
1232            print >>gwconfig, "FsName: %s" % fs
1233            print >>gwconfig, "EventServerName: %s" % event_server
1234            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1235            print >>gwconfig, "SeerControl: %s" % seer_control
1236            print >>gwconfig, "Type: %s" % type
1237            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1238            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1239                    local_script_dir
1240            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1241            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1242            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1243                    (remote_conf_dir, remote_conf_file)
1244            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1245            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1246            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1247            gwconfig.close()
1248
1249            return active == "true"
1250
1251        def __call__(self, line, allocated, tbparams):
1252            # Process gateways
1253            if not self.current_gateways:
1254                m = self.begin_gateways.match(line)
1255                if m:
1256                    self.current_gateways = m.group(1)
1257                    if allocated.has_key(self.current_gateways):
1258                        # This test should always succeed
1259                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1260                        if not os.path.exists(tb_dir):
1261                            try:
1262                                os.mkdir(tb_dir)
1263                            except IOError:
1264                                raise service_error(service_error.internal,
1265                                        "Cannot create %s" % tb_dir)
1266                    else:
1267                        # XXX
1268                        self.log.error("[gateways]: Ignoring gateways for " + \
1269                                "unknown testbed %s" % self.current_gateways)
1270                        self.current_gateways = None
1271                    return True
1272                else:
1273                    return False
1274            else:
1275                m = self.end_gateways.match(line)
1276                if m :
1277                    if m.group(1) != self.current_gateways:
1278                        raise service_error(service_error.internal,
1279                                "Mismatched gateway markers!?")
1280                    if self.control_gateway:
1281                        try:
1282                            cc = open("%s/%s/client.conf" %
1283                                    (self.tmpdir, self.current_gateways), 'w')
1284                            print >>cc, "ControlGateway: %s" % \
1285                                    self.control_gateway
1286                            if tbparams[self.master].has_key('smbshare'):
1287                                print >>cc, "SMBSHare: %s" % \
1288                                        tbparams[self.master]['smbshare']
1289                            print >>cc, "ProjectUser: %s" % \
1290                                    tbparams[self.master]['user']
1291                            print >>cc, "ProjectName: %s" % \
1292                                    tbparams[self.master]['project']
1293                            cc.close()
1294                        except IOError:
1295                            raise service_error(service_error.internal,
1296                                    "Error creating client config")
1297                        try:
1298                            cc = open("%s/%s/seer.conf" %
1299                                    (self.tmpdir, self.current_gateways),
1300                                    'w')
1301                            if self.current_gateways != self.master:
1302                                print >>cc, "ControlNode: %s" % \
1303                                        self.control_gateway
1304                            print >>cc, "ExperimentID: %s/%s" % \
1305                                    ( tbparams[self.master]['project'], \
1306                                    self.eid )
1307                            cc.close()
1308                        except IOError:
1309                            raise service_error(service_error.internal,
1310                                    "Error creating seer config")
1311                    else:
1312                        debug.error("[gateways]: No control gateway for %s" %\
1313                                    self.current_gateways)
1314                    self.current_gateways = None
1315                else:
1316                    dtb, myname, desthost, type = line.split(" ")
1317
1318                    if type == "control" or type == "both":
1319                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1320                                self.eid, 
1321                                tbparams[self.current_gateways]['project'],
1322                                tbparams[self.current_gateways]['domain'])
1323                    try:
1324                        active = self.gateway_conf_file(self.current_gateways,
1325                                self.master, self.eid, self.gw_pubkey_base,
1326                                self.gw_secretkey_base,
1327                                self.active_end, tbparams, dtb, myname,
1328                                desthost, type)
1329                    except IOError, e:
1330                        raise service_error(service_error.internal,
1331                                "Failed to write config file for %s" % \
1332                                        self.current_gateway)
1333           
1334                    gw_pubkey = "%s/keys/%s" % \
1335                            (self.tmpdir, self.gw_pubkey_base)
1336                    gw_secretkey = "%s/keys/%s" % \
1337                            (self.tmpdir, self.gw_secretkey_base)
1338
1339                    pkfile = "%s/%s/%s" % \
1340                            ( self.tmpdir, self.current_gateways, 
1341                                    self.gw_pubkey_base)
1342                    skfile = "%s/%s/%s" % \
1343                            ( self.tmpdir, self.current_gateways, 
1344                                    self.gw_secretkey_base)
1345
1346                    if not os.path.exists(pkfile):
1347                        try:
1348                            self.copy_file(gw_pubkey, pkfile)
1349                        except IOError:
1350                            service_error(service_error.internal,
1351                                    "Failed to copy pubkey file")
1352
1353                    if active and not os.path.exists(skfile):
1354                        try:
1355                            self.copy_file(gw_secretkey, skfile)
1356                        except IOError:
1357                            service_error(service_error.internal,
1358                                    "Failed to copy secretkey file")
1359                return True
1360
1361    class shunt_to_file:
1362        """
1363        Simple class to write data between two regexps to a file.
1364        """
1365        def __init__(self, begin, end, filename):
1366            """
1367            Begin shunting on a match of begin, stop on end, send data to
1368            filename.
1369            """
1370            self.begin = re.compile(begin)
1371            self.end = re.compile(end)
1372            self.in_shunt = False
1373            self.file = None
1374            self.filename = filename
1375
1376        def __call__(self, line):
1377            """
1378            Call this on each line in the input that may be shunted.
1379            """
1380            if not self.in_shunt:
1381                if self.begin.match(line):
1382                    self.in_shunt = True
1383                    try:
1384                        self.file = open(self.filename, "w")
1385                    except:
1386                        self.file = None
1387                        raise
1388                    return True
1389                else:
1390                    return False
1391            else:
1392                if self.end.match(line):
1393                    if self.file: 
1394                        self.file.close()
1395                        self.file = None
1396                    self.in_shunt = False
1397                else:
1398                    if self.file:
1399                        print >>self.file, line
1400                return True
1401
1402    class shunt_to_list:
1403        """
1404        Same interface as shunt_to_file.  Data collected in self.list, one list
1405        element per line.
1406        """
1407        def __init__(self, begin, end):
1408            self.begin = re.compile(begin)
1409            self.end = re.compile(end)
1410            self.in_shunt = False
1411            self.list = [ ]
1412       
1413        def __call__(self, line):
1414            if not self.in_shunt:
1415                if self.begin.match(line):
1416                    self.in_shunt = True
1417                    return True
1418                else:
1419                    return False
1420            else:
1421                if self.end.match(line):
1422                    self.in_shunt = False
1423                else:
1424                    self.list.append(line)
1425                return True
1426
1427    class shunt_to_string:
1428        """
1429        Same interface as shunt_to_file.  Data collected in self.str, all in
1430        one string.
1431        """
1432        def __init__(self, begin, end):
1433            self.begin = re.compile(begin)
1434            self.end = re.compile(end)
1435            self.in_shunt = False
1436            self.str = ""
1437       
1438        def __call__(self, line):
1439            if not self.in_shunt:
1440                if self.begin.match(line):
1441                    self.in_shunt = True
1442                    return True
1443                else:
1444                    return False
1445            else:
1446                if self.end.match(line):
1447                    self.in_shunt = False
1448                else:
1449                    self.str += line
1450                return True
1451
1452    def create_experiment(self, req, fid):
1453        """
1454        The external interface to experiment creation called from the
1455        dispatcher.
1456
1457        Creates a working directory, splits the incoming description using the
1458        splitter script and parses out the avrious subsections using the
1459        lcasses above.  Once each sub-experiment is created, use pooled threads
1460        to instantiate them and start it all up.
1461        """
1462
1463        if not self.auth.check_attribute(fid, 'create'):
1464            raise service_error(service_error.access, "Create access denied")
1465
1466        try:
1467            tmpdir = tempfile.mkdtemp(prefix="split-")
1468        except IOError:
1469            raise service_error(service_error.internal, "Cannot create tmp dir")
1470
1471        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1472        gw_secretkey_base = "fed.%s" % self.ssh_type
1473        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1474        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1475        tclfile = tmpdir + "/experiment.tcl"
1476        tbparams = { }
1477        try:
1478            access_user = self.accessdb[fid]
1479        except KeyError:
1480            raise service_error(service_error.internal,
1481                    "Access map and authorizer out of sync in " + \
1482                            "create_experiment for fedid %s"  % fid)
1483
1484        pid = "dummy"
1485        gid = "dummy"
1486        # XXX
1487        fail_soft = False
1488
1489        try:
1490            os.mkdir(tmpdir+"/keys")
1491        except OSError:
1492            raise service_error(service_error.internal,
1493                    "Can't make temporary dir")
1494
1495        req = req.get('CreateRequestBody', None)
1496        if not req:
1497            raise service_error(service_error.req,
1498                    "Bad request format (no CreateRequestBody)")
1499        # The tcl parser needs to read a file so put the content into that file
1500        descr=req.get('experimentdescription', None)
1501        if descr:
1502            file_content=descr.get('ns2description', None)
1503            if file_content:
1504                try:
1505                    f = open(tclfile, 'w')
1506                    f.write(file_content)
1507                    f.close()
1508                except IOError:
1509                    raise service_error(service_error.internal,
1510                            "Cannot write temp experiment description")
1511            else:
1512                raise service_error(service_error.req, 
1513                        "Only ns2descriptions supported")
1514        else:
1515            raise service_error(service_error.req, "No experiment description")
1516
1517        if req.has_key('experimentID') and \
1518                req['experimentID'].has_key('localname'):
1519            eid = req['experimentID']['localname']
1520            self.state_lock.acquire()
1521            while (self.state.has_key(eid)):
1522                eid += random.choice(string.ascii_letters)
1523            # To avoid another thread picking this localname
1524            self.state[eid] = "placeholder"
1525            self.state_lock.release()
1526        else:
1527            eid = self.exp_stem
1528            for i in range(0,5):
1529                eid += random.choice(string.ascii_letters)
1530            self.state_lock.acquire()
1531            while (self.state.has_key(eid)):
1532                eid = self.exp_stem
1533                for i in range(0,5):
1534                    eid += random.choice(string.ascii_letters)
1535            # To avoid another thread picking this localname
1536            self.state[eid] = "placeholder"
1537            self.state_lock.release()
1538
1539        try: 
1540            # This catches exceptions to clear the placeholder if necessary
1541            try:
1542                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1543            except ValueError:
1544                raise service_error(service_error.server_config, 
1545                        "Bad key type (%s)" % self.ssh_type)
1546
1547            user = req.get('user', None)
1548            if user == None:
1549                raise service_error(service_error.req, "No user")
1550
1551            master = req.get('master', None)
1552            if not master:
1553                raise service_error(service_error.req,
1554                        "No master testbed label")
1555            export_project = req.get('exportProject', None)
1556            if not export_project:
1557                raise service_error(service_error.req, "No export project")
1558           
1559            if self.splitter_url:
1560                self.log.debug("Calling remote splitter at %s" % \
1561                        self.splitter_url)
1562                split_data = self.remote_splitter(self.splitter_url,
1563                        file_content, master)
1564            else:
1565                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1566                    str(self.muxmax), '-m', master]
1567
1568                if self.fedkit:
1569                    tclcmd.append('-k')
1570
1571                tclcmd.extend([pid, gid, eid, tclfile])
1572
1573                self.log.debug("running local splitter %s", " ".join(tclcmd))
1574                tclparser = Popen(tclcmd, stdout=PIPE)
1575                split_data = tclparser.stdout
1576
1577            allocated = { }         # Testbeds we can access
1578            started = { }           # Testbeds where a sub-experiment started
1579                                # successfully
1580
1581            # Objects to parse the splitter output (defined above)
1582            parse_current_testbed = self.current_testbed(eid, tmpdir,
1583                    self.fedkit)
1584            parse_allbeds = self.allbeds(self.get_access)
1585            parse_gateways = self.gateways(eid, master, tmpdir,
1586                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1587                    self.fedkit)
1588            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1589                        "^#\s+End\s+Vtopo")
1590            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1591                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1592            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1593                    "^#\s+End\s+tarfiles")
1594            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1595                    "^#\s+End\s+rpms")
1596
1597            # Working on the split data
1598            for line in split_data:
1599                line = line.rstrip()
1600                if parse_current_testbed(line, master, allocated, tbparams):
1601                    continue
1602                elif parse_allbeds(line, user, tbparams, master, export_project,
1603                        access_user):
1604                    continue
1605                elif parse_gateways(line, allocated, tbparams):
1606                    continue
1607                elif parse_vtopo(line):
1608                    continue
1609                elif parse_hostnames(line):
1610                    continue
1611                elif parse_tarfiles(line):
1612                    continue
1613                elif parse_rpms(line):
1614                    continue
1615                else:
1616                    raise service_error(service_error.internal, 
1617                            "Bad tcl parse? %s" % line)
1618            # Virtual topology and visualization
1619            vtopo = self.gentopo(parse_vtopo.str)
1620            if not vtopo:
1621                raise service_error(service_error.internal, 
1622                        "Failed to generate virtual topology")
1623
1624            vis = self.genviz(vtopo)
1625            if not vis:
1626                raise service_error(service_error.internal, 
1627                        "Failed to generate visualization")
1628           
1629            # save federant information
1630            for k in allocated.keys():
1631                tbparams[k]['federant'] = {\
1632                        'name': [ { 'localname' : eid} ],\
1633                        'emulab': tbparams[k]['emulab'],\
1634                        'allocID' : tbparams[k]['allocID'],\
1635                        'master' : k == master,\
1636                    }
1637
1638
1639            # Copy tarfiles and rpms needed at remote sites into a staging area
1640            try:
1641                if self.fedkit:
1642                    parse_tarfiles.list.append(self.fedkit)
1643                for t in parse_tarfiles.list:
1644                    if not os.path.exists("%s/tarfiles" % tmpdir):
1645                        os.mkdir("%s/tarfiles" % tmpdir)
1646                    self.copy_file(t, "%s/tarfiles/%s" % \
1647                            (tmpdir, os.path.basename(t)))
1648                for r in parse_rpms.list:
1649                    if not os.path.exists("%s/rpms" % tmpdir):
1650                        os.mkdir("%s/rpms" % tmpdir)
1651                    self.copy_file(r, "%s/rpms/%s" % \
1652                            (tmpdir, os.path.basename(r)))
1653            except IOError, e:
1654                raise service_error(service_error.internal, 
1655                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1656
1657        except service_error, e:
1658            # If something goes wrong in the parse (usually an access error)
1659            # clear the placeholder state.  From here on out the code delays
1660            # exceptions.
1661            self.state_lock.acquire()
1662            del self.state[eid]
1663            self.state_lock.release()
1664            raise e
1665
1666        thread_pool_info = self.thread_pool()
1667        threads = [ ]
1668
1669        for tb in [ k for k in allocated.keys() if k != master]:
1670            # Wait until we have a free slot to start the next testbed load
1671            thread_pool_info.acquire()
1672            while thread_pool_info.started - \
1673                    thread_pool_info.terminated >= self.nthreads:
1674                thread_pool_info.wait()
1675            thread_pool_info.release()
1676
1677            # Create and start a thread to start the segment, and save it to
1678            # get the return value later
1679            t  = self.pooled_thread(target=self.start_segment, 
1680                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1681                    pdata=thread_pool_info, trace_file=self.trace_file)
1682            threads.append(t)
1683            t.start()
1684
1685        # Wait until all finish (the first clause of the while is to make sure
1686        # one starts)
1687        thread_pool_info.acquire()
1688        while thread_pool_info.started == 0 or \
1689                thread_pool_info.started > thread_pool_info.terminated:
1690            thread_pool_info.wait()
1691        thread_pool_info.release()
1692
1693        # If none failed, start the master
1694        failed = [ t.getName() for t in threads if not t.rv ]
1695
1696        if len(failed) == 0:
1697            if not self.start_segment(master, eid, tbparams, tmpdir):
1698                failed.append(master)
1699
1700        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1701        # If one failed clean up, unless fail_soft is set
1702        if failed:
1703            if not fail_soft:
1704                for tb in succeeded:
1705                    self.stop_segment(tb, eid, tbparams)
1706                # Remove the placeholder
1707                self.state_lock.acquire()
1708                del self.state[eid]
1709                self.state_lock.release()
1710
1711                raise service_error(service_error.federant,
1712                    "Swap in failed on %s" % ",".join(failed))
1713        else:
1714            self.log.info("[start_segment]: Experiment %s started" % eid)
1715
1716        # Generate an ID for the experiment (slice) and a certificate that the
1717        # allocator can use to prove they own it.  We'll ship it back through
1718        # the encrypted connection.
1719        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1720
1721        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1722
1723        # Walk up tmpdir, deleting as we go
1724        for path, dirs, files in os.walk(tmpdir, topdown=False):
1725            for f in files:
1726                os.remove(os.path.join(path, f))
1727            for d in dirs:
1728                os.rmdir(os.path.join(path, d))
1729        os.rmdir(tmpdir)
1730
1731        # The deepcopy prevents the allocation ID and other binaries from being
1732        # translated into other formats
1733        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1734                for tb in tbparams.keys() \
1735                    if tbparams[tb].has_key('federant') ],\
1736                    'vtopo': vtopo,\
1737                    'vis' : vis,
1738                    'experimentID' : [\
1739                            { 'fedid': copy.copy(expid) }, \
1740                            { 'localname': eid },\
1741                        ],\
1742                    'experimentAccess': { 'X509' : expcert },\
1743                }
1744
1745        # Insert the experiment into our state and update the disk copy
1746        self.state_lock.acquire()
1747        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1748                for tb in tbparams.keys() \
1749                    if tbparams[tb].has_key('federant') ],\
1750                    'vtopo': vtopo,\
1751                    'vis' : vis,
1752                    'owner': fid,
1753                    'experimentID' : [\
1754                            { 'fedid': expid }, { 'localname': eid },\
1755                        ],\
1756                }
1757        self.state[eid] = self.state[expid]
1758        if self.state_filename: self.write_state()
1759        self.state_lock.release()
1760
1761        self.auth.set_attribute(fid, expid)
1762        self.auth.set_attribute(expid, expid)
1763
1764        if not failed:
1765            return resp
1766        else:
1767            raise service_error(service_error.partial, \
1768                    "Partial swap in on %s" % ",".join(succeeded))
1769
1770    def check_experiment_access(self, fid, key):
1771        """
1772        Confirm that the fid has access to the experiment.  Though a request
1773        may be made in terms of a local name, the access attribute is always
1774        the experiment's fedid.
1775        """
1776        if not isinstance(key, fedid):
1777            self.state_lock.acquire()
1778            if self.state.has_key(key):
1779                try:
1780                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1781                            if f.has_key('fedid') ]
1782                except KeyError:
1783                    self.state_lock.release()
1784                    raise service_error(service_error.internal, 
1785                            "No fedid for experiment %s when checking " +\
1786                                    "access(!?)" % key)
1787                if len(kl) == 1:
1788                    key = kl[0]
1789                else:
1790                    self.state_lock.release()
1791                    raise service_error(service_error.internal, 
1792                            "multiple fedids for experiment %s when " +\
1793                                    "checking access(!?)" % key)
1794            else:
1795                self.state_lock.release()
1796                raise service_error(service_error.access, "Access Denied")
1797            self.state_lock.release()
1798
1799        if self.auth.check_attribute(fid, key):
1800            return True
1801        else:
1802            raise service_error(service_error.access, "Access Denied")
1803
1804
1805
1806    def get_vtopo(self, req, fid):
1807        """
1808        Return the stored virtual topology for this experiment
1809        """
1810        rv = None
1811
1812        req = req.get('VtopoRequestBody', None)
1813        if not req:
1814            raise service_error(service_error.req,
1815                    "Bad request format (no VtopoRequestBody)")
1816        exp = req.get('experiment', None)
1817        if exp:
1818            if exp.has_key('fedid'):
1819                key = exp['fedid']
1820                keytype = "fedid"
1821            elif exp.has_key('localname'):
1822                key = exp['localname']
1823                keytype = "localname"
1824            else:
1825                raise service_error(service_error.req, "Unknown lookup type")
1826        else:
1827            raise service_error(service_error.req, "No request?")
1828
1829        self.check_experiment_access(fid, key)
1830
1831        self.state_lock.acquire()
1832        if self.state.has_key(key):
1833            rv = { 'experiment' : {keytype: key },\
1834                    'vtopo': self.state[key]['vtopo'],\
1835                }
1836        self.state_lock.release()
1837
1838        if rv: return rv
1839        else: raise service_error(service_error.req, "No such experiment")
1840
1841    def get_vis(self, req, fid):
1842        """
1843        Return the stored visualization for this experiment
1844        """
1845        rv = None
1846
1847        req = req.get('VisRequestBody', None)
1848        if not req:
1849            raise service_error(service_error.req,
1850                    "Bad request format (no VisRequestBody)")
1851        exp = req.get('experiment', None)
1852        if exp:
1853            if exp.has_key('fedid'):
1854                key = exp['fedid']
1855                keytype = "fedid"
1856            elif exp.has_key('localname'):
1857                key = exp['localname']
1858                keytype = "localname"
1859            else:
1860                raise service_error(service_error.req, "Unknown lookup type")
1861        else:
1862            raise service_error(service_error.req, "No request?")
1863
1864        self.check_experiment_access(fid, key)
1865
1866        self.state_lock.acquire()
1867        if self.state.has_key(key):
1868            rv =  { 'experiment' : {keytype: key },\
1869                    'vis': self.state[key]['vis'],\
1870                    }
1871        self.state_lock.release()
1872
1873        if rv: return rv
1874        else: raise service_error(service_error.req, "No such experiment")
1875
1876    def get_info(self, req, fid):
1877        """
1878        Return all the stored info about this experiment
1879        """
1880        rv = None
1881
1882        req = req.get('InfoRequestBody', None)
1883        if not req:
1884            raise service_error(service_error.req,
1885                    "Bad request format (no VisRequestBody)")
1886        exp = req.get('experiment', None)
1887        if exp:
1888            if exp.has_key('fedid'):
1889                key = exp['fedid']
1890                keytype = "fedid"
1891            elif exp.has_key('localname'):
1892                key = exp['localname']
1893                keytype = "localname"
1894            else:
1895                raise service_error(service_error.req, "Unknown lookup type")
1896        else:
1897            raise service_error(service_error.req, "No request?")
1898
1899        self.check_experiment_access(fid, key)
1900
1901        # The state may be massaged by the service function that called
1902        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1903        # state.
1904        self.state_lock.acquire()
1905        if self.state.has_key(key):
1906            rv = copy.deepcopy(self.state[key])
1907        self.state_lock.release()
1908
1909        if rv: return rv
1910        else: raise service_error(service_error.req, "No such experiment")
1911
1912
1913    def terminate_experiment(self, req, fid):
1914        """
1915        Swap this experiment out on the federants and delete the shared
1916        information
1917        """
1918        tbparams = { }
1919        req = req.get('TerminateRequestBody', None)
1920        if not req:
1921            raise service_error(service_error.req,
1922                    "Bad request format (no TerminateRequestBody)")
1923        exp = req.get('experiment', None)
1924        if exp:
1925            if exp.has_key('fedid'):
1926                key = exp['fedid']
1927                keytype = "fedid"
1928            elif exp.has_key('localname'):
1929                key = exp['localname']
1930                keytype = "localname"
1931            else:
1932                raise service_error(service_error.req, "Unknown lookup type")
1933        else:
1934            raise service_error(service_error.req, "No request?")
1935
1936        self.check_experiment_access(fid, key)
1937
1938        self.state_lock.acquire()
1939        fed_exp = self.state.get(key, None)
1940
1941        if fed_exp:
1942            # This branch of the conditional holds the lock to generate a
1943            # consistent temporary tbparams variable to deallocate experiments.
1944            # It releases the lock to do the deallocations and reacquires it to
1945            # remove the experiment state when the termination is complete.
1946            ids = []
1947            #  experimentID is a list of dicts that are self-describing
1948            #  identifiers.  This finds all the fedids and localnames - the
1949            #  keys of self.state - and puts them into ids.
1950            for id in fed_exp.get('experimentID', []):
1951                if id.has_key('fedid'): ids.append(id['fedid'])
1952                if id.has_key('localname'): ids.append(id['localname'])
1953
1954            # Construct enough of the tbparams to make the stop_segment calls
1955            # work
1956            for fed in fed_exp['federant']:
1957                try:
1958                    for e in fed['name']:
1959                        eid = e.get('localname', None)
1960                        if eid: break
1961                    else:
1962                        continue
1963
1964                    p = fed['emulab']['project']
1965
1966                    project = p['name']['localname']
1967                    tb = p['testbed']['localname']
1968                    user = p['user'][0]['userID']['localname']
1969
1970                    domain = fed['emulab']['domain']
1971                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1972                    aid = fed['allocID']
1973                except KeyError, e:
1974                    continue
1975                tbparams[tb] = {\
1976                        'user': user,\
1977                        'domain': domain,\
1978                        'project': project,\
1979                        'host': host,\
1980                        'eid': eid,\
1981                        'aid': aid,\
1982                    }
1983            self.state_lock.release()
1984
1985            # Stop everyone.
1986            for tb in tbparams.keys():
1987                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1988
1989            # release the allocations
1990            for tb in tbparams.keys():
1991                self.release_access(tb, tbparams[tb]['aid'])
1992
1993            # Remove the terminated experiment
1994            self.state_lock.acquire()
1995            for id in ids:
1996                if self.state.has_key(id): del self.state[id]
1997
1998            if self.state_filename: self.write_state()
1999            self.state_lock.release()
2000
2001            return { 'experiment': exp }
2002        else:
2003            # Don't forget to release the lock
2004            self.state_lock.release()
2005            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.