source: fedd/fedd/experiment_control.py @ f069052

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since f069052 was f069052, checked in by Ted Faber <faber@…>, 15 years ago

Two changes. Get allow_any_CA checking to work (i.e., self signed certs or
certs signed by an unknown entity) and put more of the ZSI-dependent stuff into
the hidden parts or remote_services. Now those routines will find all the
relevant classes and part names from the naming conventions.

  • Property mode set to 100644
File size: 59.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = None
152        self.cert_pwd = None
153        self.trusted_certs = None
154
155        # Walk through the various relevant certificat specifying config
156        # attributes until the local certificate attributes can be resolved.
157        # The walk is from most specific to most general specification.
158        for s in ("experiment_control", "globals"):
159            if config.has_section(s): 
160                if config.has_option(s, "cert_file"):
161                    if not self.cert_file:
162                        self.cert_file = config.get(s, "cert_file")
163                        self.cert_pwd = config.get(s, "cert_pwd")
164
165                if config.has_option(s, "trusted_certs"):
166                    if not self.trusted_certs:
167                        self.trusted_certs = config.get(s, "trusted_certs")
168
169
170        self.exp_stem = "fed-stem"
171        self.log = logging.getLogger("fedd.experiment_control")
172        set_log_level(config, "experiment_control", self.log)
173        self.muxmax = 2
174        self.nthreads = 2
175        self.randomize_experiments = False
176
177        self.scp_exec = "/usr/bin/scp"
178        self.splitter = None
179        self.ssh_exec="/usr/bin/ssh"
180        self.ssh_keygen = "/usr/bin/ssh-keygen"
181        self.ssh_identity_file = None
182
183
184        self.debug = config.getboolean("experiment_control", "create_debug")
185        self.state_filename = config.get("experiment_control", 
186                "experiment_state_file")
187        self.splitter_url = config.get("experiment_control", "splitter_url")
188        self.fedkit = config.get("experiment_control", "fedkit")
189        accessdb_file = config.get("experiment_control", "accessdb")
190
191        self.ssh_pubkey_file = config.get("experiment_control", 
192                "ssh_pubkey_file")
193        self.ssh_privkey_file = config.get("experiment_control",
194                "ssh_privkey_file")
195        # NB for internal master/slave ops, not experiment setup
196        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
197        self.state = { }
198        self.state_lock = Lock()
199        self.tclsh = "/usr/local/bin/otclsh"
200        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
201        mapdb_file = config.get("experiment_control", "mapdb")
202        self.trace_file = sys.stderr
203
204        self.def_expstart = \
205                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
206                "/tmp/federate";
207        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
208                "FEDDIR/hosts";
209        self.def_gwstart = \
210                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
211                "/tmp/bridge.log";
212        self.def_mgwstart = \
213                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
214                "/tmp/bridge.log";
215        self.def_gwimage = "FBSD61-TUNNEL2";
216        self.def_gwtype = "pc";
217
218        if auth:
219            self.auth = auth
220        else:
221            self.log.error(\
222                    "[access]: No authorizer initialized, creating local one.")
223            auth = authorizer()
224
225
226        if self.ssh_pubkey_file:
227            try:
228                f = open(self.ssh_pubkey_file, 'r')
229                self.ssh_pubkey = f.read()
230                f.close()
231            except IOError:
232                raise service_error(service_error.internal,
233                        "Cannot read sshpubkey")
234        else:
235            raise service_error(service_error.internal, 
236                    "No SSH public key file?")
237
238        if not self.ssh_privkey_file:
239            raise service_error(service_error.internal, 
240                    "No SSH public key file?")
241
242
243        if mapdb_file:
244            self.read_mapdb(mapdb_file)
245        else:
246            self.log.warn("[experiment_control] No testbed map, using defaults")
247            self.tbmap = { 
248                    'deter':'https://users.isi.deterlab.net:23235',
249                    'emulab':'https://users.isi.deterlab.net:23236',
250                    'ucb':'https://users.isi.deterlab.net:23237',
251                    }
252
253        if accessdb_file:
254                self.read_accessdb(accessdb_file)
255        else:
256            raise service_error(service_error.internal,
257                    "No accessdb specified in config")
258
259        # Grab saved state.  OK to do this w/o locking because it's read only
260        # and only one thread should be in existence that can see self.state at
261        # this point.
262        if self.state_filename:
263            self.read_state()
264
265        # Dispatch tables
266        self.soap_services = {\
267                'Create': soap_handler('Create', self.create_experiment),
268                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
269                'Vis': soap_handler('Vis', self.get_vis),
270                'Info': soap_handler('Info', self.get_info),
271                'Terminate': soap_handler('Terminate', 
272                    self.terminate_experiment),
273        }
274
275        self.xmlrpc_services = {\
276                'Create': xmlrpc_handler('Create', self.create_experiment),
277                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
278                'Vis': xmlrpc_handler('Vis', self.get_vis),
279                'Info': xmlrpc_handler('Info', self.get_info),
280                'Terminate': xmlrpc_handler('Terminate',
281                    self.terminate_experiment),
282        }
283
284    def copy_file(self, src, dest, size=1024):
285        """
286        Exceedingly simple file copy.
287        """
288        s = open(src,'r')
289        d = open(dest, 'w')
290
291        buf = "x"
292        while buf != "":
293            buf = s.read(size)
294            d.write(buf)
295        s.close()
296        d.close()
297
298    # Call while holding self.state_lock
299    def write_state(self):
300        """
301        Write a new copy of experiment state after copying the existing state
302        to a backup.
303
304        State format is a simple pickling of the state dictionary.
305        """
306        if os.access(self.state_filename, os.W_OK):
307            self.copy_file(self.state_filename, \
308                    "%s.bak" % self.state_filename)
309        try:
310            f = open(self.state_filename, 'w')
311            pickle.dump(self.state, f)
312        except IOError, e:
313            self.log.error("Can't write file %s: %s" % \
314                    (self.state_filename, e))
315        except pickle.PicklingError, e:
316            self.log.error("Pickling problem: %s" % e)
317        except TypeError, e:
318            self.log.error("Pickling problem (TypeError): %s" % e)
319
320    # Call while holding self.state_lock
321    def read_state(self):
322        """
323        Read a new copy of experiment state.  Old state is overwritten.
324
325        State format is a simple pickling of the state dictionary.
326        """
327        try:
328            f = open(self.state_filename, "r")
329            self.state = pickle.load(f)
330            self.log.debug("[read_state]: Read state from %s" % \
331                    self.state_filename)
332        except IOError, e:
333            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
334                    % (self.state_filename, e))
335        except pickle.UnpicklingError, e:
336            self.log.warning(("[read_state]: No saved state: " + \
337                    "Unpickling failed: %s") % e)
338       
339        for k in self.state.keys():
340            try:
341                # This list should only have one element in it, but phrasing it
342                # as a for loop doesn't cost much, really.  We have to find the
343                # fedid elements anyway.
344                for eid in [ f['fedid'] \
345                        for f in self.state[k]['experimentID']\
346                            if f.has_key('fedid') ]:
347                    self.auth.set_attribute(self.state[k]['owner'], eid)
348            except KeyError, e:
349                self.log.warning("[read_state]: State ownership or identity " +\
350                        "misformatted in %s: %s" % (self.state_filename, e))
351
352
353    def read_accessdb(self, accessdb_file):
354        """
355        Read the mapping from fedids that can create experiments to their name
356        in the 3-level access namespace.  All will be asserted from this
357        testbed and can include the local username and porject that will be
358        asserted on their behalf by this fedd.  Each fedid is also added to the
359        authorization system with the "create" attribute.
360        """
361        self.accessdb = {}
362        # These are the regexps for parsing the db
363        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
364        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
365                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
366        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
367                "\s*->\s*(" + name_expr + ")\s*$")
368        lineno = 0
369
370        # Parse the mappings and store in self.authdb, a dict of
371        # fedid -> (proj, user)
372        try:
373            f = open(accessdb_file, "r")
374            for line in f:
375                lineno += 1
376                line = line.strip()
377                if len(line) == 0 or line.startswith('#'):
378                    continue
379                m = project_line.match(line)
380                if m:
381                    fid = fedid(hexstr=m.group(1))
382                    project, user = m.group(2,3)
383                    if not self.accessdb.has_key(fid):
384                        self.accessdb[fid] = []
385                    self.accessdb[fid].append((project, user))
386                    continue
387
388                m = user_line.match(line)
389                if m:
390                    fid = fedid(hexstr=m.group(1))
391                    project = None
392                    user = m.group(2)
393                    if not self.accessdb.has_key(fid):
394                        self.accessdb[fid] = []
395                    self.accessdb[fid].append((project, user))
396                    continue
397                self.log.warn("[experiment_control] Error parsing access " +\
398                        "db %s at line %d" %  (accessdb_file, lineno))
399        except IOError:
400            raise service_error(service_error.internal,
401                    "Error opening/reading %s as experiment " +\
402                            "control accessdb" %  accessdb_file)
403        f.close()
404
405        # Initialize the authorization attributes
406        for fid in self.accessdb.keys():
407            self.auth.set_attribute(fid, 'create')
408
409    def read_mapdb(self, file):
410        """
411        Read a simple colon separated list of mappings for the
412        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
413        """
414
415        self.tbmap = { }
416        lineno =0
417        try:
418            f = open(file, "r")
419            for line in f:
420                lineno += 1
421                line = line.strip()
422                if line.startswith('#') or len(line) == 0:
423                    continue
424                try:
425                    label, url = line.split(':', 1)
426                    self.tbmap[label] = url
427                except ValueError, e:
428                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
429                            "map db: %s %s" % (lineno, line, e))
430        except IOError, e:
431            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
432                    "open %s: %s" % (file, e))
433        f.close()
434
435    def scp_file(self, file, user, host, dest=""):
436        """
437        scp a file to the remote host.  If debug is set the action is only
438        logged.
439        """
440
441        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
442                "%s@%s:%s" % (user, host, dest)]
443        rv = 0
444
445        try:
446            dnull = open("/dev/null", "r")
447        except IOError:
448            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
449            dnull = Null
450
451        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
452        if not self.debug:
453            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
454            else: rv = call(scp_cmd)
455
456        return rv == 0
457
458    def ssh_cmd(self, user, host, cmd, wname=None):
459        """
460        Run a remote command on host as user.  If debug is set, the action is
461        only logged.
462        """
463        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
464                user, host, cmd)
465
466        try:
467            dnull = open("/dev/null", "r")
468        except IOError:
469            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
470            dnull = Null
471
472        self.log.debug("[ssh_cmd]: %s" % sh_str)
473        if not self.debug:
474            if dnull:
475                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
476            else:
477                sub = Popen(sh_str, shell=True)
478            return sub.wait() == 0
479        else:
480            return True
481
482    def ship_configs(self, host, user, src_dir, dest_dir):
483        """
484        Copy federant-specific configuration files to the federant.
485        """
486        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
487            return False
488        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
489            return False
490
491        for f in os.listdir(src_dir):
492            if os.path.isdir(f):
493                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
494                        "%s/%s" % (dest_dir, f)):
495                    return False
496            else:
497                if not self.scp_file("%s/%s" % (src_dir, f), 
498                        user, host, dest_dir):
499                    return False
500        return True
501
502    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
503        """
504        Start a sub-experiment on a federant.
505
506        Get the current state, modify or create as appropriate, ship data and
507        configs and start the experiment.  There are small ordering differences
508        based on the initial state of the sub-experiment.
509        """
510        # ops node in the federant
511        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
512        user = tbparams[tb]['user']     # federant user
513        pid = tbparams[tb]['project']   # federant project
514        # XXX
515        base_confs = ( "hosts",)
516        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
517        # command to test experiment state
518        expinfo_exec = "/usr/testbed/bin/expinfo" 
519        # Configuration directories on the remote machine
520        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
521        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
522        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
523        # Regular expressions to parse the expinfo response
524        state_re = re.compile("State:\s+(\w+)")
525        no_exp_re = re.compile("^No\s+such\s+experiment")
526        state = None    # Experiment state parsed from expinfo
527        # The expinfo ssh command
528        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
529
530        # Get status
531        self.log.debug("[start_segment]: %s"% " ".join(cmd))
532        dev_null = None
533        try:
534            dev_null = open("/dev/null", "a")
535        except IOError, e:
536            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
537
538        if self.debug:
539            state = 'swapped'
540            rv = 0
541        else:
542            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
543            for line in status.stdout:
544                m = state_re.match(line)
545                if m: state = m.group(1)
546                else:
547                    m = no_exp_re.match(line)
548                    if m: state = "none"
549            rv = status.wait()
550
551        # If the experiment is not present the subcommand returns a non-zero
552        # return value.  If we successfully parsed a "none" outcome, ignore the
553        # return code.
554        if rv != 0 and state != "none":
555            raise service_error(service_error.internal,
556                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
557
558        self.log.debug("[start_segment]: %s: %s" % (tb, state))
559        self.log.info("[start_segment]:transferring experiment to %s" % tb)
560
561        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
562            return False
563        # Clear the federation files
564        if not self.ssh_cmd(user, host, 
565                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
566            return False
567        if not self.ssh_cmd(user, host, 
568                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
569            return False
570        # Clear and create the tarfiles and rpm directories
571        for d in (tarfiles_dir, rpms_dir):
572            if not self.ssh_cmd(user, host, 
573                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
574                return False
575            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
576                    "create tarfiles"):
577                return False
578       
579        if state == 'active':
580            # Remote experiment is active.  Modify it.
581            for f in base_confs:
582                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
583                        "%s/%s" % (proj_dir, f)):
584                    return False
585            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
586                    proj_dir):
587                return False
588            if os.path.isdir("%s/tarfiles" % tmpdir):
589                if not self.ship_configs(host, user,
590                        "%s/tarfiles" % tmpdir, tarfiles_dir):
591                    return False
592            if os.path.isdir("%s/rpms" % tmpdir):
593                if not self.ship_configs(host, user,
594                        "%s/rpms" % tmpdir, tarfiles_dir):
595                    return False
596            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
597            if not self.ssh_cmd(user, host,
598                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
599                            (pid, eid, tclfile), "modexp"):
600                return False
601            return True
602        elif state == "swapped":
603            # Remote experiment swapped out.  Modify it and swap it in.
604            for f in base_confs:
605                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
606                        "%s/%s" % (proj_dir, f)):
607                    return False
608            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
609                    proj_dir):
610                return False
611            if os.path.isdir("%s/tarfiles" % tmpdir):
612                if not self.ship_configs(host, user,
613                        "%s/tarfiles" % tmpdir, tarfiles_dir):
614                    return False
615            if os.path.isdir("%s/rpms" % tmpdir):
616                if not self.ship_configs(host, user,
617                        "%s/rpms" % tmpdir, tarfiles_dir):
618                    return False
619            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
620            if not self.ssh_cmd(user, host,
621                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
622                    "modexp"):
623                return False
624            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
625            if not self.ssh_cmd(user, host,
626                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
627                    "swapexp"):
628                return False
629            return True
630        elif state == "none":
631            # No remote experiment.  Create one.  We do this in 2 steps so we
632            # can put the configuration files and scripts into the new
633            # experiment directories.
634
635            # Tarfiles must be present for creation to work
636            if os.path.isdir("%s/tarfiles" % tmpdir):
637                if not self.ship_configs(host, user,
638                        "%s/tarfiles" % tmpdir, tarfiles_dir):
639                    return False
640            if os.path.isdir("%s/rpms" % tmpdir):
641                if not self.ship_configs(host, user,
642                        "%s/rpms" % tmpdir, tarfiles_dir):
643                    return False
644            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
645            if not self.ssh_cmd(user, host,
646                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
647                            (pid, eid, tclfile), "startexp"):
648                return False
649            # After startexp the per-experiment directories exist
650            for f in base_confs:
651                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
652                        "%s/%s" % (proj_dir, f)):
653                    return False
654            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
655                    proj_dir):
656                return False
657            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
658            if not self.ssh_cmd(user, host,
659                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
660                    "swapexp"):
661                return False
662            return True
663        else:
664            self.log.debug("[start_segment]:unknown state %s" % state)
665            return False
666
667    def stop_segment(self, tb, eid, tbparams):
668        """
669        Stop a sub experiment by calling swapexp on the federant
670        """
671        user = tbparams[tb]['user']
672        host = tbparams[tb]['host']
673        pid = tbparams[tb]['project']
674
675        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
676        return self.ssh_cmd(user, host,
677                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
678
679       
680    def generate_ssh_keys(self, dest, type="rsa" ):
681        """
682        Generate a set of keys for the gateways to use to talk.
683
684        Keys are of type type and are stored in the required dest file.
685        """
686        valid_types = ("rsa", "dsa")
687        t = type.lower();
688        if t not in valid_types: raise ValueError
689        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
690
691        try:
692            trace = open("/dev/null", "w")
693        except IOError:
694            raise service_error(service_error.internal,
695                    "Cannot open /dev/null??");
696
697        # May raise CalledProcessError
698        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
699        rv = call(cmd, stdout=trace, stderr=trace)
700        if rv != 0:
701            raise service_error(service_error.internal, 
702                    "Cannot generate nonce ssh keys.  %s return code %d" \
703                            % (self.ssh_keygen, rv))
704
705    def gentopo(self, str):
706        """
707        Generate the topology dtat structure from the splitter's XML
708        representation of it.
709
710        The topology XML looks like:
711            <experiment>
712                <nodes>
713                    <node><vname></vname><ips>ip1:ip2</ips></node>
714                </nodes>
715                <lans>
716                    <lan>
717                        <vname></vname><vnode></vnode><ip></ip>
718                        <bandwidth></bandwidth><member>node:port</member>
719                    </lan>
720                </lans>
721        """
722        class topo_parse:
723            """
724            Parse the topology XML and create the dats structure.
725            """
726            def __init__(self):
727                # Typing of the subelements for data conversion
728                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
729                self.int_subelements = ( 'bandwidth',)
730                self.float_subelements = ( 'delay',)
731                # The final data structure
732                self.nodes = [ ]
733                self.lans =  [ ]
734                self.topo = { \
735                        'node': self.nodes,\
736                        'lan' : self.lans,\
737                    }
738                self.element = { }  # Current element being created
739                self.chars = ""     # Last text seen
740
741            def end_element(self, name):
742                # After each sub element the contents is added to the current
743                # element or to the appropriate list.
744                if name == 'node':
745                    self.nodes.append(self.element)
746                    self.element = { }
747                elif name == 'lan':
748                    self.lans.append(self.element)
749                    self.element = { }
750                elif name in self.str_subelements:
751                    self.element[name] = self.chars
752                    self.chars = ""
753                elif name in self.int_subelements:
754                    self.element[name] = int(self.chars)
755                    self.chars = ""
756                elif name in self.float_subelements:
757                    self.element[name] = float(self.chars)
758                    self.chars = ""
759
760            def found_chars(self, data):
761                self.chars += data.rstrip()
762
763
764        tp = topo_parse();
765        parser = xml.parsers.expat.ParserCreate()
766        parser.EndElementHandler = tp.end_element
767        parser.CharacterDataHandler = tp.found_chars
768
769        parser.Parse(str)
770
771        return tp.topo
772       
773
774    def genviz(self, topo):
775        """
776        Generate the visualization the virtual topology
777        """
778
779        neato = "/usr/local/bin/neato"
780        # These are used to parse neato output and to create the visualization
781        # file.
782        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
783        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
784                "%s</type></node>"
785
786        try:
787            # Node names
788            nodes = [ n['vname'] for n in topo['node'] ]
789            topo_lans = topo['lan']
790        except KeyError:
791            raise service_error(service_error.internal, "Bad topology")
792
793        lans = { }
794        links = { }
795
796        # Walk through the virtual topology, organizing the connections into
797        # 2-node connections (links) and more-than-2-node connections (lans).
798        # When a lan is created, it's added to the list of nodes (there's a
799        # node in the visualization for the lan).
800        for l in topo_lans:
801            if links.has_key(l['vname']):
802                if len(links[l['vname']]) < 2:
803                    links[l['vname']].append(l['vnode'])
804                else:
805                    nodes.append(l['vname'])
806                    lans[l['vname']] = links[l['vname']]
807                    del links[l['vname']]
808                    lans[l['vname']].append(l['vnode'])
809            elif lans.has_key(l['vname']):
810                lans[l['vname']].append(l['vnode'])
811            else:
812                links[l['vname']] = [ l['vnode'] ]
813
814
815        # Open up a temporary file for dot to turn into a visualization
816        try:
817            df, dotname = tempfile.mkstemp()
818            dotfile = os.fdopen(df, 'w')
819        except IOError:
820            raise service_error(service_error.internal,
821                    "Failed to open file in genviz")
822
823        # Generate a dot/neato input file from the links, nodes and lans
824        try:
825            print >>dotfile, "graph G {"
826            for n in nodes:
827                print >>dotfile, '\t"%s"' % n
828            for l in links.keys():
829                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
830            for l in lans.keys():
831                for n in lans[l]:
832                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
833            print >>dotfile, "}"
834            dotfile.close()
835        except TypeError:
836            raise service_error(service_error.internal,
837                    "Single endpoint link in vtopo")
838        except IOError:
839            raise service_error(service_error.internal, "Cannot write dot file")
840
841        # Use dot to create a visualization
842        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
843                '-Gpack=true', dotname], stdout=PIPE)
844
845        # Translate dot to vis format
846        vis_nodes = [ ]
847        vis = { 'node': vis_nodes }
848        for line in dot.stdout:
849            m = vis_re.match(line)
850            if m:
851                vn = m.group(1)
852                vis_node = {'name': vn, \
853                        'x': float(m.group(2)),\
854                        'y' : float(m.group(3)),\
855                    }
856                if vn in links.keys() or vn in lans.keys():
857                    vis_node['type'] = 'lan'
858                else:
859                    vis_node['type'] = 'node'
860                vis_nodes.append(vis_node)
861        rv = dot.wait()
862
863        os.remove(dotname)
864        if rv == 0 : return vis
865        else: return None
866
867    def get_access(self, tb, nodes, user, tbparam, master, export_project,
868            access_user):
869        """
870        Get access to testbed through fedd and set the parameters for that tb
871        """
872
873        translate_attr = {
874            'slavenodestartcmd': 'expstart',
875            'slaveconnectorstartcmd': 'gwstart',
876            'masternodestartcmd': 'mexpstart',
877            'masterconnectorstartcmd': 'mgwstart',
878            'connectorimage': 'gwimage',
879            'connectortype': 'gwtype',
880            'tunnelcfg': 'tun',
881            'smbshare': 'smbshare',
882        }
883
884        uri = self.tbmap.get(tb, None)
885        if not uri:
886            raise service_error(serice_error.server_config, 
887                    "Unknown testbed: %s" % tb)
888
889        # currently this lumps all users into one service access group
890        service_keys = [ a for u in user \
891                for a in u.get('access', []) \
892                    if a.has_key('sshPubkey')]
893
894        if len(service_keys) == 0:
895            raise service_error(service_error.req, 
896                    "Must have at least one SSH pubkey for services")
897
898
899        for p, u in access_user:
900            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
901                    "to %s") %  ((p or "None"), u, uri))
902
903            if p:
904                # Request with user and project specified
905                req = {\
906                        'destinationTestbed' : { 'uri' : uri },
907                        'project': { 
908                            'name': {'localname': p},
909                            'user': [ {'userID': { 'localname': u } } ],
910                            },
911                        'user':  user,
912                        'allocID' : { 'localname': 'test' },
913                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
914                        'serviceAccess' : service_keys
915                    }
916            else:
917                # Request with only user specified
918                req = {\
919                        'destinationTestbed' : { 'uri' : uri },
920                        'user':  [ {'userID': { 'localname': u } } ],
921                        'allocID' : { 'localname': 'test' },
922                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
923                        'serviceAccess' : service_keys
924                    }
925
926            if tb == master:
927                # NB, the export_project parameter is a dict that includes
928                # the type
929                req['exportProject'] = export_project
930
931            # node resources if any
932            if nodes != None and len(nodes) > 0:
933                rnodes = [ ]
934                for n in nodes:
935                    rn = { }
936                    image, hw, count = n.split(":")
937                    if image: rn['image'] = [ image ]
938                    if hw: rn['hardware'] = [ hw ]
939                    if count: rn['count'] = int(count)
940                    rnodes.append(rn)
941                req['resources']= { }
942                req['resources']['node'] = rnodes
943
944            try:
945                r = self.call_RequestAccess(uri, req, 
946                        self.cert_file, self.cert_pwd, self.trusted_certs)
947            except service_error, e:
948                if e.code == service_error.access:
949                    self.log.debug("[get_access] Access denied")
950                    r = None
951                    continue
952                else:
953                    raise e
954
955            if r.has_key('RequestAccessResponseBody'):
956                # Through to here we have a valid response, not a fault.
957                # Access denied is a fault, so something better or worse than
958                # access denied has happened.
959                r = r['RequestAccessResponseBody']
960                self.log.debug("[get_access] Access granted")
961                break
962            else:
963                raise service_error(service_error.protocol,
964                        "Bad proxy response")
965       
966        if not r:
967            raise service_error(service_error.access, 
968                    "Access denied by %s (%s)" % (tb, uri))
969
970        e = r['emulab']
971        p = e['project']
972        tbparam[tb] = { 
973                "boss": e['boss'],
974                "host": e['ops'],
975                "domain": e['domain'],
976                "fs": e['fileServer'],
977                "eventserver": e['eventServer'],
978                "project": unpack_id(p['name']),
979                "emulab" : e,
980                "allocID" : r['allocID'],
981                }
982        # Make the testbed name be the label the user applied
983        p['testbed'] = {'localname': tb }
984
985        for u in p['user']:
986            tbparam[tb]['user'] = unpack_id(u['userID'])
987
988        for a in e['fedAttr']:
989            if a['attribute']:
990                key = translate_attr.get(a['attribute'].lower(), None)
991                if key:
992                    tbparam[tb][key]= a['value']
993       
994    def release_access(self, tb, aid):
995        """
996        Release access to testbed through fedd
997        """
998
999        uri = self.tbmap.get(tb, None)
1000        if not uri:
1001            raise service_error(serice_error.server_config, 
1002                    "Unknown testbed: %s" % tb)
1003
1004        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1005                self.cert_file, self.cert_pwd, self.trusted_certs)
1006
1007        # better error coding
1008
1009    def remote_splitter(self, uri, desc, master):
1010
1011        req = {
1012                'description' : { 'ns2description': desc },
1013                'master': master,
1014                'include_fedkit': bool(self.fedkit)
1015            }
1016
1017        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1018                self.trusted_certs)
1019
1020        if r.has_key('Ns2SplitResponseBody'):
1021            r = r['Ns2SplitResponseBody']
1022            if r.has_key('output'):
1023                return r['output'].splitlines()
1024            else:
1025                raise service_error(service_error.protocol, 
1026                        "Bad splitter response (no output)")
1027        else:
1028            raise service_error(service_error.protocol, "Bad splitter response")
1029       
1030    class current_testbed:
1031        """
1032        Object for collecting the current testbed description.  The testbed
1033        description is saved to a file with the local testbed variables
1034        subsittuted line by line.
1035        """
1036        def __init__(self, eid, tmpdir, fedkit):
1037            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1038            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1039            self.current_testbed = None
1040            self.testbed_file = None
1041
1042            self.def_expstart = \
1043                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1044            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1045            self.def_gwstart = \
1046                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1047            self.def_mgwstart = \
1048                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1049            self.def_gwimage = "FBSD61-TUNNEL2";
1050            self.def_gwtype = "pc";
1051
1052            self.eid = eid
1053            self.tmpdir = tmpdir
1054            self.fedkit = fedkit
1055
1056        def __call__(self, line, master, allocated, tbparams):
1057            # Capture testbed topology descriptions
1058            if self.current_testbed == None:
1059                m = self.begin_testbed.match(line)
1060                if m != None:
1061                    self.current_testbed = m.group(1)
1062                    if self.current_testbed == None:
1063                        raise service_error(service_error.req,
1064                                "Bad request format (unnamed testbed)")
1065                    allocated[self.current_testbed] = \
1066                            allocated.get(self.current_testbed,0) + 1
1067                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1068                    if not os.path.exists(tb_dir):
1069                        try:
1070                            os.mkdir(tb_dir)
1071                        except IOError:
1072                            raise service_error(service_error.internal,
1073                                    "Cannot create %s" % tb_dir)
1074                    try:
1075                        self.testbed_file = open("%s/%s.%s.tcl" %
1076                                (tb_dir, self.eid, self.current_testbed), 'w')
1077                    except IOError:
1078                        self.testbed_file = None
1079                    return True
1080                else: return False
1081            else:
1082                m = self.end_testbed.match(line)
1083                if m != None:
1084                    if m.group(1) != self.current_testbed:
1085                        raise service_error(service_error.internal, 
1086                                "Mismatched testbed markers!?")
1087                    if self.testbed_file != None: 
1088                        self.testbed_file.close()
1089                        self.testbed_file = None
1090                    self.current_testbed = None
1091                elif self.testbed_file:
1092                    # Substitute variables and put the line into the local
1093                    # testbed file.
1094                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1095                            self.def_gwtype)
1096                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1097                            self.def_gwimage)
1098                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1099                            self.def_mgwstart)
1100                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1101                            self.def_mexpstart)
1102                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1103                            self.def_gwstart)
1104                    expstart = tbparams[self.current_testbed].get('expstart', 
1105                            self.def_expstart)
1106                    project = tbparams[self.current_testbed].get('project')
1107                    line = re.sub("GWTYPE", gwtype, line)
1108                    line = re.sub("GWIMAGE", gwimage, line)
1109                    if self.current_testbed == master:
1110                        line = re.sub("GWSTART", mgwstart, line)
1111                        line = re.sub("EXPSTART", mexpstart, line)
1112                    else:
1113                        line = re.sub("GWSTART", gwstart, line)
1114                        line = re.sub("EXPSTART", expstart, line)
1115                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1116                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1117                    line = re.sub("EID", self.eid, line)
1118                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1119                            (project, self.eid), line)
1120                    if self.fedkit:
1121                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1122                                line)
1123                    print >>self.testbed_file, line
1124                return True
1125
1126    class allbeds:
1127        """
1128        Process the Allbeds section.  Get access to each federant and save the
1129        parameters in tbparams
1130        """
1131        def __init__(self, get_access):
1132            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1133            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1134            self.in_allbeds = False
1135            self.get_access = get_access
1136
1137        def __call__(self, line, user, tbparams, master, export_project,
1138                access_user):
1139            # Testbed access parameters
1140            if not self.in_allbeds:
1141                if self.begin_allbeds.match(line):
1142                    self.in_allbeds = True
1143                    return True
1144                else:
1145                    return False
1146            else:
1147                if self.end_allbeds.match(line):
1148                    self.in_allbeds = False
1149                else:
1150                    nodes = line.split('|')
1151                    tb = nodes.pop(0)
1152                    self.get_access(tb, nodes, user, tbparams, master,
1153                            export_project, access_user)
1154                return True
1155
1156    class gateways:
1157        def __init__(self, eid, master, tmpdir, gw_pubkey,
1158                gw_secretkey, copy_file, fedkit):
1159            self.begin_gateways = \
1160                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1161            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1162            self.current_gateways = None
1163            self.control_gateway = None
1164            self.active_end = { }
1165
1166            self.eid = eid
1167            self.master = master
1168            self.tmpdir = tmpdir
1169            self.gw_pubkey_base = gw_pubkey
1170            self.gw_secretkey_base = gw_secretkey
1171
1172            self.copy_file = copy_file
1173            self.fedkit = fedkit
1174
1175
1176        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1177                active_end, tbparams, dtb, myname, desthost, type):
1178            """
1179            Produce a gateway configuration file from a gateways line.
1180            """
1181
1182            sproject = tbparams[gw].get('project', 'project')
1183            dproject = tbparams[dtb].get('project', 'project')
1184            sdomain = ".%s.%s%s" % (eid, sproject,
1185                    tbparams[gw].get('domain', ".example.com"))
1186            ddomain = ".%s.%s%s" % (eid, dproject,
1187                    tbparams[dtb].get('domain', ".example.com"))
1188            boss = tbparams[master].get('boss', "boss")
1189            fs = tbparams[master].get('fs', "fs")
1190            event_server = "%s%s" % \
1191                    (tbparams[gw].get('eventserver', "event_server"),
1192                            tbparams[gw].get('domain', "example.com"))
1193            remote_event_server = "%s%s" % \
1194                    (tbparams[dtb].get('eventserver', "event_server"),
1195                            tbparams[dtb].get('domain', "example.com"))
1196            seer_control = "%s%s" % \
1197                    (tbparams[gw].get('control', "control"), sdomain)
1198
1199            if self.fedkit:
1200                remote_script_dir = "/usr/local/federation/bin"
1201                local_script_dir = "/usr/local/federation/bin"
1202            else:
1203                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1204                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1205
1206            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1207            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1208            tunnel_cfg = tbparams[gw].get("tun", "false")
1209
1210            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1211            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1212
1213            # translate to lower case so the `hostname` hack for specifying
1214            # configuration files works.
1215            conf_file = conf_file.lower();
1216            remote_conf_file = remote_conf_file.lower();
1217
1218            if dtb == master:
1219                active = "false"
1220            elif gw == master:
1221                active = "true"
1222            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1223                active = "false"
1224            else:
1225                active_end['%s-%s' % (gw, dtb)] = 1
1226                active = "true"
1227
1228            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1229            print >>gwconfig, "Active: %s" % active
1230            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1231            print >>gwconfig, "BossName: %s" % boss
1232            print >>gwconfig, "FsName: %s" % fs
1233            print >>gwconfig, "EventServerName: %s" % event_server
1234            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1235            print >>gwconfig, "SeerControl: %s" % seer_control
1236            print >>gwconfig, "Type: %s" % type
1237            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1238            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1239                    local_script_dir
1240            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1241            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1242            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1243                    (remote_conf_dir, remote_conf_file)
1244            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1245            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1246            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1247            gwconfig.close()
1248
1249            return active == "true"
1250
1251        def __call__(self, line, allocated, tbparams):
1252            # Process gateways
1253            if not self.current_gateways:
1254                m = self.begin_gateways.match(line)
1255                if m:
1256                    self.current_gateways = m.group(1)
1257                    if allocated.has_key(self.current_gateways):
1258                        # This test should always succeed
1259                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1260                        if not os.path.exists(tb_dir):
1261                            try:
1262                                os.mkdir(tb_dir)
1263                            except IOError:
1264                                raise service_error(service_error.internal,
1265                                        "Cannot create %s" % tb_dir)
1266                    else:
1267                        # XXX
1268                        self.log.error("[gateways]: Ignoring gateways for " + \
1269                                "unknown testbed %s" % self.current_gateways)
1270                        self.current_gateways = None
1271                    return True
1272                else:
1273                    return False
1274            else:
1275                m = self.end_gateways.match(line)
1276                if m :
1277                    if m.group(1) != self.current_gateways:
1278                        raise service_error(service_error.internal,
1279                                "Mismatched gateway markers!?")
1280                    if self.control_gateway:
1281                        try:
1282                            cc = open("%s/%s/client.conf" %
1283                                    (self.tmpdir, self.current_gateways), 'w')
1284                            print >>cc, "ControlGateway: %s" % \
1285                                    self.control_gateway
1286                            if tbparams[self.master].has_key('smbshare'):
1287                                print >>cc, "SMBSHare: %s" % \
1288                                        tbparams[self.master]['smbshare']
1289                            print >>cc, "ProjectUser: %s" % \
1290                                    tbparams[self.master]['user']
1291                            print >>cc, "ProjectName: %s" % \
1292                                    tbparams[self.master]['project']
1293                            cc.close()
1294                        except IOError:
1295                            raise service_error(service_error.internal,
1296                                    "Error creating client config")
1297                        try:
1298                            cc = open("%s/%s/seer.conf" %
1299                                    (self.tmpdir, self.current_gateways),
1300                                    'w')
1301                            if self.current_gateways != self.master:
1302                                print >>cc, "ControlNode: %s" % \
1303                                        self.control_gateway
1304                            print >>cc, "ExperimentID: %s/%s" % \
1305                                    ( tbparams[self.master]['project'], \
1306                                    self.eid )
1307                            cc.close()
1308                        except IOError:
1309                            raise service_error(service_error.internal,
1310                                    "Error creating seer config")
1311                    else:
1312                        debug.error("[gateways]: No control gateway for %s" %\
1313                                    self.current_gateways)
1314                    self.current_gateways = None
1315                else:
1316                    dtb, myname, desthost, type = line.split(" ")
1317
1318                    if type == "control" or type == "both":
1319                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1320                                self.eid, 
1321                                tbparams[self.current_gateways]['project'],
1322                                tbparams[self.current_gateways]['domain'])
1323                    try:
1324                        active = self.gateway_conf_file(self.current_gateways,
1325                                self.master, self.eid, self.gw_pubkey_base,
1326                                self.gw_secretkey_base,
1327                                self.active_end, tbparams, dtb, myname,
1328                                desthost, type)
1329                    except IOError, e:
1330                        raise service_error(service_error.internal,
1331                                "Failed to write config file for %s" % \
1332                                        self.current_gateway)
1333           
1334                    gw_pubkey = "%s/keys/%s" % \
1335                            (self.tmpdir, self.gw_pubkey_base)
1336                    gw_secretkey = "%s/keys/%s" % \
1337                            (self.tmpdir, self.gw_secretkey_base)
1338
1339                    pkfile = "%s/%s/%s" % \
1340                            ( self.tmpdir, self.current_gateways, 
1341                                    self.gw_pubkey_base)
1342                    skfile = "%s/%s/%s" % \
1343                            ( self.tmpdir, self.current_gateways, 
1344                                    self.gw_secretkey_base)
1345
1346                    if not os.path.exists(pkfile):
1347                        try:
1348                            self.copy_file(gw_pubkey, pkfile)
1349                        except IOError:
1350                            service_error(service_error.internal,
1351                                    "Failed to copy pubkey file")
1352
1353                    if active and not os.path.exists(skfile):
1354                        try:
1355                            self.copy_file(gw_secretkey, skfile)
1356                        except IOError:
1357                            service_error(service_error.internal,
1358                                    "Failed to copy secretkey file")
1359                return True
1360
1361    class shunt_to_file:
1362        """
1363        Simple class to write data between two regexps to a file.
1364        """
1365        def __init__(self, begin, end, filename):
1366            """
1367            Begin shunting on a match of begin, stop on end, send data to
1368            filename.
1369            """
1370            self.begin = re.compile(begin)
1371            self.end = re.compile(end)
1372            self.in_shunt = False
1373            self.file = None
1374            self.filename = filename
1375
1376        def __call__(self, line):
1377            """
1378            Call this on each line in the input that may be shunted.
1379            """
1380            if not self.in_shunt:
1381                if self.begin.match(line):
1382                    self.in_shunt = True
1383                    try:
1384                        self.file = open(self.filename, "w")
1385                    except:
1386                        self.file = None
1387                        raise
1388                    return True
1389                else:
1390                    return False
1391            else:
1392                if self.end.match(line):
1393                    if self.file: 
1394                        self.file.close()
1395                        self.file = None
1396                    self.in_shunt = False
1397                else:
1398                    if self.file:
1399                        print >>self.file, line
1400                return True
1401
1402    class shunt_to_list:
1403        """
1404        Same interface as shunt_to_file.  Data collected in self.list, one list
1405        element per line.
1406        """
1407        def __init__(self, begin, end):
1408            self.begin = re.compile(begin)
1409            self.end = re.compile(end)
1410            self.in_shunt = False
1411            self.list = [ ]
1412       
1413        def __call__(self, line):
1414            if not self.in_shunt:
1415                if self.begin.match(line):
1416                    self.in_shunt = True
1417                    return True
1418                else:
1419                    return False
1420            else:
1421                if self.end.match(line):
1422                    self.in_shunt = False
1423                else:
1424                    self.list.append(line)
1425                return True
1426
1427    class shunt_to_string:
1428        """
1429        Same interface as shunt_to_file.  Data collected in self.str, all in
1430        one string.
1431        """
1432        def __init__(self, begin, end):
1433            self.begin = re.compile(begin)
1434            self.end = re.compile(end)
1435            self.in_shunt = False
1436            self.str = ""
1437       
1438        def __call__(self, line):
1439            if not self.in_shunt:
1440                if self.begin.match(line):
1441                    self.in_shunt = True
1442                    return True
1443                else:
1444                    return False
1445            else:
1446                if self.end.match(line):
1447                    self.in_shunt = False
1448                else:
1449                    self.str += line
1450                return True
1451
1452    def create_experiment(self, req, fid):
1453        """
1454        The external interface to experiment creation called from the
1455        dispatcher.
1456
1457        Creates a working directory, splits the incoming description using the
1458        splitter script and parses out the avrious subsections using the
1459        lcasses above.  Once each sub-experiment is created, use pooled threads
1460        to instantiate them and start it all up.
1461        """
1462
1463        if not self.auth.check_attribute(fid, 'create'):
1464            raise service_error(service_error.access, "Create access denied")
1465
1466        try:
1467            tmpdir = tempfile.mkdtemp(prefix="split-")
1468        except IOError:
1469            raise service_error(service_error.internal, "Cannot create tmp dir")
1470
1471        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1472        gw_secretkey_base = "fed.%s" % self.ssh_type
1473        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1474        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1475        tclfile = tmpdir + "/experiment.tcl"
1476        tbparams = { }
1477        try:
1478            access_user = self.accessdb[fid]
1479        except KeyError:
1480            raise service_error(service_error.internal,
1481                    "Access map and authorizer out of sync in " + \
1482                            "create_experiment for fedid %s"  % fid)
1483
1484        pid = "dummy"
1485        gid = "dummy"
1486        # XXX
1487        fail_soft = False
1488
1489        try:
1490            os.mkdir(tmpdir+"/keys")
1491        except OSError:
1492            raise service_error(service_error.internal,
1493                    "Can't make temporary dir")
1494
1495        req = req.get('CreateRequestBody', None)
1496        if not req:
1497            raise service_error(service_error.req,
1498                    "Bad request format (no CreateRequestBody)")
1499        # The tcl parser needs to read a file so put the content into that file
1500        descr=req.get('experimentdescription', None)
1501        if descr:
1502            file_content=descr.get('ns2description', None)
1503            if file_content:
1504                try:
1505                    f = open(tclfile, 'w')
1506                    f.write(file_content)
1507                    f.close()
1508                except IOError:
1509                    raise service_error(service_error.internal,
1510                            "Cannot write temp experiment description")
1511            else:
1512                raise service_error(service_error.req, 
1513                        "Only ns2descriptions supported")
1514        else:
1515            raise service_error(service_error.req, "No experiment description")
1516
1517        if req.has_key('experimentID') and \
1518                req['experimentID'].has_key('localname'):
1519            eid = req['experimentID']['localname']
1520            self.state_lock.acquire()
1521            while (self.state.has_key(eid)):
1522                eid += random.choice(string.ascii_letters)
1523            # To avoid another thread picking this localname
1524            self.state[eid] = "placeholder"
1525            self.state_lock.release()
1526        else:
1527            eid = self.exp_stem
1528            for i in range(0,5):
1529                eid += random.choice(string.ascii_letters)
1530            self.state_lock.acquire()
1531            while (self.state.has_key(eid)):
1532                eid = self.exp_stem
1533                for i in range(0,5):
1534                    eid += random.choice(string.ascii_letters)
1535            # To avoid another thread picking this localname
1536            self.state[eid] = "placeholder"
1537            self.state_lock.release()
1538
1539        try:
1540            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1541        except ValueError:
1542            raise service_error(service_error.server_config, 
1543                    "Bad key type (%s)" % self.ssh_type)
1544
1545        user = req.get('user', None)
1546        if user == None:
1547            raise service_error(service_error.req, "No user")
1548
1549        master = req.get('master', None)
1550        if not master:
1551            raise service_error(service_error.req, "No master testbed label")
1552        export_project = req.get('exportProject', None)
1553        if not export_project:
1554            raise service_error(service_error.req, "No export project")
1555       
1556        if self.splitter_url:
1557            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1558            split_data = self.remote_splitter(self.splitter_url, file_content,
1559                    master)
1560        else:
1561            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1562                str(self.muxmax), '-m', master]
1563
1564            if self.fedkit:
1565                tclcmd.append('-k')
1566
1567            tclcmd.extend([pid, gid, eid, tclfile])
1568
1569            self.log.debug("running local splitter %s", " ".join(tclcmd))
1570            tclparser = Popen(tclcmd, stdout=PIPE)
1571            split_data = tclparser.stdout
1572
1573        allocated = { }     # Testbeds we can access
1574        started = { }       # Testbeds where a sub-experiment started
1575                            # successfully
1576
1577        # Objects to parse the splitter output (defined above)
1578        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1579        parse_allbeds = self.allbeds(self.get_access)
1580        parse_gateways = self.gateways(eid, master, tmpdir,
1581                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1582        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1583                    "^#\s+End\s+Vtopo")
1584        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1585                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1586        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1587                "^#\s+End\s+tarfiles")
1588        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1589                "^#\s+End\s+rpms")
1590
1591        try: 
1592            # Working on the split data
1593            for line in split_data:
1594                line = line.rstrip()
1595                if parse_current_testbed(line, master, allocated, tbparams):
1596                    continue
1597                elif parse_allbeds(line, user, tbparams, master, export_project,
1598                        access_user):
1599                    continue
1600                elif parse_gateways(line, allocated, tbparams):
1601                    continue
1602                elif parse_vtopo(line):
1603                    continue
1604                elif parse_hostnames(line):
1605                    continue
1606                elif parse_tarfiles(line):
1607                    continue
1608                elif parse_rpms(line):
1609                    continue
1610                else:
1611                    raise service_error(service_error.internal, 
1612                            "Bad tcl parse? %s" % line)
1613        except service_error, e:
1614            # If something goes wrong in the parse (usually an access error)
1615            # clear the placeholder state
1616            self.state_lock.acquire()
1617            del self.state[eid]
1618            self.state_lock.release()
1619            raise e
1620       
1621        # Virtual topology and visualization
1622        vtopo = self.gentopo(parse_vtopo.str)
1623        if not vtopo:
1624            raise service_error(service_error.internal, 
1625                    "Failed to generate virtual topology")
1626
1627        vis = self.genviz(vtopo)
1628        if not vis:
1629            raise service_error(service_error.internal, 
1630                    "Failed to generate visualization")
1631
1632        # save federant information
1633        for k in allocated.keys():
1634            tbparams[k]['federant'] = {\
1635                    'name': [ { 'localname' : eid} ],\
1636                    'emulab': tbparams[k]['emulab'],\
1637                    'allocID' : tbparams[k]['allocID'],\
1638                    'master' : k == master,\
1639                }
1640
1641
1642        # Copy tarfiles and rpms needed at remote sites into a staging area
1643        try:
1644            if self.fedkit:
1645                parse_tarfiles.list.append(self.fedkit)
1646            for t in parse_tarfiles.list:
1647                if not os.path.exists("%s/tarfiles" % tmpdir):
1648                    os.mkdir("%s/tarfiles" % tmpdir)
1649                self.copy_file(t, "%s/tarfiles/%s" % \
1650                        (tmpdir, os.path.basename(t)))
1651            for r in parse_rpms.list:
1652                if not os.path.exists("%s/rpms" % tmpdir):
1653                    os.mkdir("%s/rpms" % tmpdir)
1654                self.copy_file(r, "%s/rpms/%s" % \
1655                        (tmpdir, os.path.basename(r)))
1656        except IOError, e:
1657            raise service_error(service_error.internal, 
1658                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1659
1660        thread_pool_info = self.thread_pool()
1661        threads = [ ]
1662
1663        for tb in [ k for k in allocated.keys() if k != master]:
1664            # Wait until we have a free slot to start the next testbed load
1665            thread_pool_info.acquire()
1666            while thread_pool_info.started - \
1667                    thread_pool_info.terminated >= self.nthreads:
1668                thread_pool_info.wait()
1669            thread_pool_info.release()
1670
1671            # Create and start a thread to start the segment, and save it to
1672            # get the return value later
1673            t  = self.pooled_thread(target=self.start_segment, 
1674                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1675                    pdata=thread_pool_info, trace_file=self.trace_file)
1676            threads.append(t)
1677            t.start()
1678
1679        # Wait until all finish (the first clause of the while is to make sure
1680        # one starts)
1681        thread_pool_info.acquire()
1682        while thread_pool_info.started == 0 or \
1683                thread_pool_info.started > thread_pool_info.terminated:
1684            thread_pool_info.wait()
1685        thread_pool_info.release()
1686
1687        # If none failed, start the master
1688        failed = [ t.getName() for t in threads if not t.rv ]
1689
1690        if len(failed) == 0:
1691            if not self.start_segment(master, eid, tbparams, tmpdir):
1692                failed.append(master)
1693
1694        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1695        # If one failed clean up, unless fail_soft is set
1696        if failed:
1697            if not fail_soft:
1698                for tb in succeeded:
1699                    self.stop_segment(tb, eid, tbparams)
1700                # Remove the placeholder
1701                self.state_lock.acquire()
1702                del self.state[eid]
1703                self.state_lock.release()
1704
1705                raise service_error(service_error.federant,
1706                    "Swap in failed on %s" % ",".join(failed))
1707        else:
1708            self.log.info("[start_segment]: Experiment %s started" % eid)
1709
1710        # Generate an ID for the experiment (slice) and a certificate that the
1711        # allocator can use to prove they own it.  We'll ship it back through
1712        # the encrypted connection.
1713        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1714
1715        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1716
1717        # Walk up tmpdir, deleting as we go
1718        for path, dirs, files in os.walk(tmpdir, topdown=False):
1719            for f in files:
1720                os.remove(os.path.join(path, f))
1721            for d in dirs:
1722                os.rmdir(os.path.join(path, d))
1723        os.rmdir(tmpdir)
1724
1725        # The deepcopy prevents the allocation ID and other binaries from being
1726        # translated into other formats
1727        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1728                for tb in tbparams.keys() \
1729                    if tbparams[tb].has_key('federant') ],\
1730                    'vtopo': vtopo,\
1731                    'vis' : vis,
1732                    'experimentID' : [\
1733                            { 'fedid': copy.copy(expid) }, \
1734                            { 'localname': eid },\
1735                        ],\
1736                    'experimentAccess': { 'X509' : expcert },\
1737                }
1738
1739        # Insert the experiment into our state and update the disk copy
1740        self.state_lock.acquire()
1741        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1742                for tb in tbparams.keys() \
1743                    if tbparams[tb].has_key('federant') ],\
1744                    'vtopo': vtopo,\
1745                    'vis' : vis,
1746                    'owner': fid,
1747                    'experimentID' : [\
1748                            { 'fedid': expid }, { 'localname': eid },\
1749                        ],\
1750                }
1751        self.state[eid] = self.state[expid]
1752        if self.state_filename: self.write_state()
1753        self.state_lock.release()
1754
1755        self.auth.set_attribute(fid, expid)
1756        self.auth.set_attribute(expid, expid)
1757
1758        if not failed:
1759            return resp
1760        else:
1761            raise service_error(service_error.partial, \
1762                    "Partial swap in on %s" % ",".join(succeeded))
1763
1764    def check_experiment_access(self, fid, key):
1765        """
1766        Confirm that the fid has access to the experiment.  Though a request
1767        may be made in terms of a local name, the access attribute is always
1768        the experiment's fedid.
1769        """
1770        if not isinstance(key, fedid):
1771            self.state_lock.acquire()
1772            if self.state.has_key(key):
1773                try:
1774                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1775                            if f.has_key('fedid') ]
1776                except KeyError:
1777                    self.state_lock.release()
1778                    raise service_error(service_error.internal, 
1779                            "No fedid for experiment %s when checking " +\
1780                                    "access(!?)" % key)
1781                if len(kl) == 1:
1782                    key = kl[0]
1783                else:
1784                    self.state_lock.release()
1785                    raise service_error(service_error.internal, 
1786                            "multiple fedids for experiment %s when " +\
1787                                    "checking access(!?)" % key)
1788            else:
1789                self.state_lock.release()
1790                raise service_error(service_error.access, "Access Denied")
1791            self.state_lock.release()
1792
1793        if self.auth.check_attribute(fid, key):
1794            return True
1795        else:
1796            raise service_error(service_error.access, "Access Denied")
1797
1798
1799
1800    def get_vtopo(self, req, fid):
1801        """
1802        Return the stored virtual topology for this experiment
1803        """
1804        rv = None
1805
1806        req = req.get('VtopoRequestBody', None)
1807        if not req:
1808            raise service_error(service_error.req,
1809                    "Bad request format (no VtopoRequestBody)")
1810        exp = req.get('experiment', None)
1811        if exp:
1812            if exp.has_key('fedid'):
1813                key = exp['fedid']
1814                keytype = "fedid"
1815            elif exp.has_key('localname'):
1816                key = exp['localname']
1817                keytype = "localname"
1818            else:
1819                raise service_error(service_error.req, "Unknown lookup type")
1820        else:
1821            raise service_error(service_error.req, "No request?")
1822
1823        self.check_experiment_access(fid, key)
1824
1825        self.state_lock.acquire()
1826        if self.state.has_key(key):
1827            rv = { 'experiment' : {keytype: key },\
1828                    'vtopo': self.state[key]['vtopo'],\
1829                }
1830        self.state_lock.release()
1831
1832        if rv: return rv
1833        else: raise service_error(service_error.req, "No such experiment")
1834
1835    def get_vis(self, req, fid):
1836        """
1837        Return the stored visualization for this experiment
1838        """
1839        rv = None
1840
1841        req = req.get('VisRequestBody', None)
1842        if not req:
1843            raise service_error(service_error.req,
1844                    "Bad request format (no VisRequestBody)")
1845        exp = req.get('experiment', None)
1846        if exp:
1847            if exp.has_key('fedid'):
1848                key = exp['fedid']
1849                keytype = "fedid"
1850            elif exp.has_key('localname'):
1851                key = exp['localname']
1852                keytype = "localname"
1853            else:
1854                raise service_error(service_error.req, "Unknown lookup type")
1855        else:
1856            raise service_error(service_error.req, "No request?")
1857
1858        self.check_experiment_access(fid, key)
1859
1860        self.state_lock.acquire()
1861        if self.state.has_key(key):
1862            rv =  { 'experiment' : {keytype: key },\
1863                    'vis': self.state[key]['vis'],\
1864                    }
1865        self.state_lock.release()
1866
1867        if rv: return rv
1868        else: raise service_error(service_error.req, "No such experiment")
1869
1870    def get_info(self, req, fid):
1871        """
1872        Return all the stored info about this experiment
1873        """
1874        rv = None
1875
1876        req = req.get('InfoRequestBody', None)
1877        if not req:
1878            raise service_error(service_error.req,
1879                    "Bad request format (no VisRequestBody)")
1880        exp = req.get('experiment', None)
1881        if exp:
1882            if exp.has_key('fedid'):
1883                key = exp['fedid']
1884                keytype = "fedid"
1885            elif exp.has_key('localname'):
1886                key = exp['localname']
1887                keytype = "localname"
1888            else:
1889                raise service_error(service_error.req, "Unknown lookup type")
1890        else:
1891            raise service_error(service_error.req, "No request?")
1892
1893        self.check_experiment_access(fid, key)
1894
1895        # The state may be massaged by the service function that called
1896        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1897        # state.
1898        self.state_lock.acquire()
1899        if self.state.has_key(key):
1900            rv = copy.deepcopy(self.state[key])
1901        self.state_lock.release()
1902
1903        if rv: return rv
1904        else: raise service_error(service_error.req, "No such experiment")
1905
1906
1907    def terminate_experiment(self, req, fid):
1908        """
1909        Swap this experiment out on the federants and delete the shared
1910        information
1911        """
1912        tbparams = { }
1913        req = req.get('TerminateRequestBody', None)
1914        if not req:
1915            raise service_error(service_error.req,
1916                    "Bad request format (no TerminateRequestBody)")
1917        exp = req.get('experiment', None)
1918        if exp:
1919            if exp.has_key('fedid'):
1920                key = exp['fedid']
1921                keytype = "fedid"
1922            elif exp.has_key('localname'):
1923                key = exp['localname']
1924                keytype = "localname"
1925            else:
1926                raise service_error(service_error.req, "Unknown lookup type")
1927        else:
1928            raise service_error(service_error.req, "No request?")
1929
1930        self.check_experiment_access(fid, key)
1931
1932        self.state_lock.acquire()
1933        fed_exp = self.state.get(key, None)
1934
1935        if fed_exp:
1936            # This branch of the conditional holds the lock to generate a
1937            # consistent temporary tbparams variable to deallocate experiments.
1938            # It releases the lock to do the deallocations and reacquires it to
1939            # remove the experiment state when the termination is complete.
1940            ids = []
1941            #  experimentID is a list of dicts that are self-describing
1942            #  identifiers.  This finds all the fedids and localnames - the
1943            #  keys of self.state - and puts them into ids.
1944            for id in fed_exp.get('experimentID', []):
1945                if id.has_key('fedid'): ids.append(id['fedid'])
1946                if id.has_key('localname'): ids.append(id['localname'])
1947
1948            # Construct enough of the tbparams to make the stop_segment calls
1949            # work
1950            for fed in fed_exp['federant']:
1951                try:
1952                    for e in fed['name']:
1953                        eid = e.get('localname', None)
1954                        if eid: break
1955                    else:
1956                        continue
1957
1958                    p = fed['emulab']['project']
1959
1960                    project = p['name']['localname']
1961                    tb = p['testbed']['localname']
1962                    user = p['user'][0]['userID']['localname']
1963
1964                    domain = fed['emulab']['domain']
1965                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1966                    aid = fed['allocID']
1967                except KeyError, e:
1968                    continue
1969                tbparams[tb] = {\
1970                        'user': user,\
1971                        'domain': domain,\
1972                        'project': project,\
1973                        'host': host,\
1974                        'eid': eid,\
1975                        'aid': aid,\
1976                    }
1977            self.state_lock.release()
1978
1979            # Stop everyone.
1980            for tb in tbparams.keys():
1981                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1982
1983            # release the allocations
1984            for tb in tbparams.keys():
1985                self.release_access(tb, tbparams[tb]['aid'])
1986
1987            # Remove the terminated experiment
1988            self.state_lock.acquire()
1989            for id in ids:
1990                if self.state.has_key(id): del self.state[id]
1991
1992            if self.state_filename: self.write_state()
1993            self.state_lock.release()
1994
1995            return { 'experiment': exp }
1996        else:
1997            # Don't forget to release the lock
1998            self.state_lock.release()
1999            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.