source: fedd/federation/experiment_control.py @ 77a7634

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since 77a7634 was 77a7634, checked in by Ted Faber <faber@…>, 15 years ago

local tcl splitter setting

  • Property mode set to 100644
File size: 59.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from util import *
22from fedid import fedid, generate_fedid
23from remote_service import xmlrpc_handler, soap_handler, service_caller
24from service_error import service_error
25
26
27class nullHandler(logging.Handler):
28    def emit(self, record): pass
29
30fl = logging.getLogger("fedd.experiment_control")
31fl.addHandler(nullHandler())
32
33class experiment_control_local:
34    """
35    Control of experiments that this system can directly access.
36
37    Includes experiment creation, termination and information dissemination.
38    Thred safe.
39    """
40   
41    class thread_pool:
42        """
43        A class to keep track of a set of threads all invoked for the same
44        task.  Manages the mutual exclusion of the states.
45        """
46        def __init__(self):
47            """
48            Start a pool.
49            """
50            self.changed = Condition()
51            self.started = 0
52            self.terminated = 0
53
54        def acquire(self):
55            """
56            Get the pool's lock.
57            """
58            self.changed.acquire()
59
60        def release(self):
61            """
62            Release the pool's lock.
63            """
64            self.changed.release()
65
66        def wait(self, timeout = None):
67            """
68            Wait for a pool thread to start or stop.
69            """
70            self.changed.wait(timeout)
71
72        def start(self):
73            """
74            Called by a pool thread to report starting.
75            """
76            self.changed.acquire()
77            self.started += 1
78            self.changed.notifyAll()
79            self.changed.release()
80
81        def terminate(self):
82            """
83            Called by a pool thread to report finishing.
84            """
85            self.changed.acquire()
86            self.terminated += 1
87            self.changed.notifyAll()
88            self.changed.release()
89
90        def clear(self):
91            """
92            Clear all pool data.
93            """
94            self.changed.acquire()
95            self.started = 0
96            self.terminated =0
97            self.changed.notifyAll()
98            self.changed.release()
99
100    class pooled_thread(Thread):
101        """
102        One of a set of threads dedicated to a specific task.  Uses the
103        thread_pool class above for coordination.
104        """
105        def __init__(self, group=None, target=None, name=None, args=(), 
106                kwargs={}, pdata=None, trace_file=None):
107            Thread.__init__(self, group, target, name, args, kwargs)
108            self.rv = None          # Return value of the ops in this thread
109            self.exception = None   # Exception that terminated this thread
110            self.target=target      # Target function to run on start()
111            self.args = args        # Args to pass to target
112            self.kwargs = kwargs    # Additional kw args
113            self.pdata = pdata      # thread_pool for this class
114            # Logger for this thread
115            self.log = logging.getLogger("fedd.experiment_control")
116       
117        def run(self):
118            """
119            Emulate Thread.run, except add pool data manipulation and error
120            logging.
121            """
122            if self.pdata:
123                self.pdata.start()
124
125            if self.target:
126                try:
127                    self.rv = self.target(*self.args, **self.kwargs)
128                except service_error, s:
129                    self.exception = s
130                    self.log.error("Thread exception: %s %s" % \
131                            (s.code_string(), s.desc))
132                except:
133                    self.exception = sys.exc_info()[1]
134                    self.log.error(("Unexpected thread exception: %s" +\
135                            "Trace %s") % (self.exception,\
136                                traceback.format_exc()))
137            if self.pdata:
138                self.pdata.terminate()
139
140    call_RequestAccess = service_caller('RequestAccess')
141    call_ReleaseAccess = service_caller('ReleaseAccess')
142    call_Ns2Split = service_caller('Ns2Split')
143
144    def __init__(self, config=None, auth=None):
145        """
146        Intialize the various attributes, most from the config object
147        """
148        self.thread_with_rv = experiment_control_local.pooled_thread
149        self.thread_pool = experiment_control_local.thread_pool
150
151        self.cert_file = None
152        self.cert_pwd = None
153        self.trusted_certs = None
154
155        # Walk through the various relevant certificat specifying config
156        # attributes until the local certificate attributes can be resolved.
157        # The walk is from most specific to most general specification.
158        for s in ("experiment_control", "globals"):
159            if config.has_section(s): 
160                if config.has_option(s, "cert_file"):
161                    if not self.cert_file:
162                        self.cert_file = config.get(s, "cert_file")
163                        self.cert_pwd = config.get(s, "cert_pwd")
164
165                if config.has_option(s, "trusted_certs"):
166                    if not self.trusted_certs:
167                        self.trusted_certs = config.get(s, "trusted_certs")
168
169
170        self.exp_stem = "fed-stem"
171        self.log = logging.getLogger("fedd.experiment_control")
172        set_log_level(config, "experiment_control", self.log)
173        self.muxmax = 2
174        self.nthreads = 2
175        self.randomize_experiments = False
176
177        self.scp_exec = "/usr/bin/scp"
178        self.splitter = None
179        self.ssh_exec="/usr/bin/ssh"
180        self.ssh_keygen = "/usr/bin/ssh-keygen"
181        self.ssh_identity_file = None
182
183
184        self.debug = config.getboolean("experiment_control", "create_debug")
185        self.state_filename = config.get("experiment_control", 
186                "experiment_state_file")
187        self.splitter_url = config.get("experiment_control", "splitter_url")
188        self.fedkit = config.get("experiment_control", "fedkit")
189        accessdb_file = config.get("experiment_control", "accessdb")
190
191        self.ssh_pubkey_file = config.get("experiment_control", 
192                "ssh_pubkey_file")
193        self.ssh_privkey_file = config.get("experiment_control",
194                "ssh_privkey_file")
195        # NB for internal master/slave ops, not experiment setup
196        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
197        self.state = { }
198        self.state_lock = Lock()
199        self.tclsh = "/usr/local/bin/otclsh"
200        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
201                config.get("experiment_control", "tcl_splitter",
202                        "/usr/testbed/lib/ns2ir/parse.tcl")
203        mapdb_file = config.get("experiment_control", "mapdb")
204        self.trace_file = sys.stderr
205
206        self.def_expstart = \
207                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
208                "/tmp/federate";
209        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
210                "FEDDIR/hosts";
211        self.def_gwstart = \
212                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
213                "/tmp/bridge.log";
214        self.def_mgwstart = \
215                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
216                "/tmp/bridge.log";
217        self.def_gwimage = "FBSD61-TUNNEL2";
218        self.def_gwtype = "pc";
219
220        if auth:
221            self.auth = auth
222        else:
223            self.log.error(\
224                    "[access]: No authorizer initialized, creating local one.")
225            auth = authorizer()
226
227
228        if self.ssh_pubkey_file:
229            try:
230                f = open(self.ssh_pubkey_file, 'r')
231                self.ssh_pubkey = f.read()
232                f.close()
233            except IOError:
234                raise service_error(service_error.internal,
235                        "Cannot read sshpubkey")
236        else:
237            raise service_error(service_error.internal, 
238                    "No SSH public key file?")
239
240        if not self.ssh_privkey_file:
241            raise service_error(service_error.internal, 
242                    "No SSH public key file?")
243
244
245        if mapdb_file:
246            self.read_mapdb(mapdb_file)
247        else:
248            self.log.warn("[experiment_control] No testbed map, using defaults")
249            self.tbmap = { 
250                    'deter':'https://users.isi.deterlab.net:23235',
251                    'emulab':'https://users.isi.deterlab.net:23236',
252                    'ucb':'https://users.isi.deterlab.net:23237',
253                    }
254
255        if accessdb_file:
256                self.read_accessdb(accessdb_file)
257        else:
258            raise service_error(service_error.internal,
259                    "No accessdb specified in config")
260
261        # Grab saved state.  OK to do this w/o locking because it's read only
262        # and only one thread should be in existence that can see self.state at
263        # this point.
264        if self.state_filename:
265            self.read_state()
266
267        # Dispatch tables
268        self.soap_services = {\
269                'Create': soap_handler('Create', self.create_experiment),
270                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
271                'Vis': soap_handler('Vis', self.get_vis),
272                'Info': soap_handler('Info', self.get_info),
273                'Terminate': soap_handler('Terminate', 
274                    self.terminate_experiment),
275        }
276
277        self.xmlrpc_services = {\
278                'Create': xmlrpc_handler('Create', self.create_experiment),
279                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
280                'Vis': xmlrpc_handler('Vis', self.get_vis),
281                'Info': xmlrpc_handler('Info', self.get_info),
282                'Terminate': xmlrpc_handler('Terminate',
283                    self.terminate_experiment),
284        }
285
286    def copy_file(self, src, dest, size=1024):
287        """
288        Exceedingly simple file copy.
289        """
290        s = open(src,'r')
291        d = open(dest, 'w')
292
293        buf = "x"
294        while buf != "":
295            buf = s.read(size)
296            d.write(buf)
297        s.close()
298        d.close()
299
300    # Call while holding self.state_lock
301    def write_state(self):
302        """
303        Write a new copy of experiment state after copying the existing state
304        to a backup.
305
306        State format is a simple pickling of the state dictionary.
307        """
308        if os.access(self.state_filename, os.W_OK):
309            self.copy_file(self.state_filename, \
310                    "%s.bak" % self.state_filename)
311        try:
312            f = open(self.state_filename, 'w')
313            pickle.dump(self.state, f)
314        except IOError, e:
315            self.log.error("Can't write file %s: %s" % \
316                    (self.state_filename, e))
317        except pickle.PicklingError, e:
318            self.log.error("Pickling problem: %s" % e)
319        except TypeError, e:
320            self.log.error("Pickling problem (TypeError): %s" % e)
321
322    # Call while holding self.state_lock
323    def read_state(self):
324        """
325        Read a new copy of experiment state.  Old state is overwritten.
326
327        State format is a simple pickling of the state dictionary.
328        """
329        try:
330            f = open(self.state_filename, "r")
331            self.state = pickle.load(f)
332            self.log.debug("[read_state]: Read state from %s" % \
333                    self.state_filename)
334        except IOError, e:
335            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
336                    % (self.state_filename, e))
337        except pickle.UnpicklingError, e:
338            self.log.warning(("[read_state]: No saved state: " + \
339                    "Unpickling failed: %s") % e)
340       
341        for k in self.state.keys():
342            try:
343                # This list should only have one element in it, but phrasing it
344                # as a for loop doesn't cost much, really.  We have to find the
345                # fedid elements anyway.
346                for eid in [ f['fedid'] \
347                        for f in self.state[k]['experimentID']\
348                            if f.has_key('fedid') ]:
349                    self.auth.set_attribute(self.state[k]['owner'], eid)
350            except KeyError, e:
351                self.log.warning("[read_state]: State ownership or identity " +\
352                        "misformatted in %s: %s" % (self.state_filename, e))
353
354
355    def read_accessdb(self, accessdb_file):
356        """
357        Read the mapping from fedids that can create experiments to their name
358        in the 3-level access namespace.  All will be asserted from this
359        testbed and can include the local username and porject that will be
360        asserted on their behalf by this fedd.  Each fedid is also added to the
361        authorization system with the "create" attribute.
362        """
363        self.accessdb = {}
364        # These are the regexps for parsing the db
365        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
366        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
367                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
368        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
369                "\s*->\s*(" + name_expr + ")\s*$")
370        lineno = 0
371
372        # Parse the mappings and store in self.authdb, a dict of
373        # fedid -> (proj, user)
374        try:
375            f = open(accessdb_file, "r")
376            for line in f:
377                lineno += 1
378                line = line.strip()
379                if len(line) == 0 or line.startswith('#'):
380                    continue
381                m = project_line.match(line)
382                if m:
383                    fid = fedid(hexstr=m.group(1))
384                    project, user = m.group(2,3)
385                    if not self.accessdb.has_key(fid):
386                        self.accessdb[fid] = []
387                    self.accessdb[fid].append((project, user))
388                    continue
389
390                m = user_line.match(line)
391                if m:
392                    fid = fedid(hexstr=m.group(1))
393                    project = None
394                    user = m.group(2)
395                    if not self.accessdb.has_key(fid):
396                        self.accessdb[fid] = []
397                    self.accessdb[fid].append((project, user))
398                    continue
399                self.log.warn("[experiment_control] Error parsing access " +\
400                        "db %s at line %d" %  (accessdb_file, lineno))
401        except IOError:
402            raise service_error(service_error.internal,
403                    "Error opening/reading %s as experiment " +\
404                            "control accessdb" %  accessdb_file)
405        f.close()
406
407        # Initialize the authorization attributes
408        for fid in self.accessdb.keys():
409            self.auth.set_attribute(fid, 'create')
410
411    def read_mapdb(self, file):
412        """
413        Read a simple colon separated list of mappings for the
414        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
415        """
416
417        self.tbmap = { }
418        lineno =0
419        try:
420            f = open(file, "r")
421            for line in f:
422                lineno += 1
423                line = line.strip()
424                if line.startswith('#') or len(line) == 0:
425                    continue
426                try:
427                    label, url = line.split(':', 1)
428                    self.tbmap[label] = url
429                except ValueError, e:
430                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
431                            "map db: %s %s" % (lineno, line, e))
432        except IOError, e:
433            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
434                    "open %s: %s" % (file, e))
435        f.close()
436
437    def scp_file(self, file, user, host, dest=""):
438        """
439        scp a file to the remote host.  If debug is set the action is only
440        logged.
441        """
442
443        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
444                "%s@%s:%s" % (user, host, dest)]
445        rv = 0
446
447        try:
448            dnull = open("/dev/null", "r")
449        except IOError:
450            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
451            dnull = Null
452
453        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
454        if not self.debug:
455            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
456            else: rv = call(scp_cmd)
457
458        return rv == 0
459
460    def ssh_cmd(self, user, host, cmd, wname=None):
461        """
462        Run a remote command on host as user.  If debug is set, the action is
463        only logged.
464        """
465        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
466                user, host, cmd)
467
468        try:
469            dnull = open("/dev/null", "r")
470        except IOError:
471            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
472            dnull = Null
473
474        self.log.debug("[ssh_cmd]: %s" % sh_str)
475        if not self.debug:
476            if dnull:
477                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
478            else:
479                sub = Popen(sh_str, shell=True)
480            return sub.wait() == 0
481        else:
482            return True
483
484    def ship_configs(self, host, user, src_dir, dest_dir):
485        """
486        Copy federant-specific configuration files to the federant.
487        """
488        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
489            return False
490        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
491            return False
492
493        for f in os.listdir(src_dir):
494            if os.path.isdir(f):
495                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
496                        "%s/%s" % (dest_dir, f)):
497                    return False
498            else:
499                if not self.scp_file("%s/%s" % (src_dir, f), 
500                        user, host, dest_dir):
501                    return False
502        return True
503
504    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
505        """
506        Start a sub-experiment on a federant.
507
508        Get the current state, modify or create as appropriate, ship data and
509        configs and start the experiment.  There are small ordering differences
510        based on the initial state of the sub-experiment.
511        """
512        # ops node in the federant
513        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
514        user = tbparams[tb]['user']     # federant user
515        pid = tbparams[tb]['project']   # federant project
516        # XXX
517        base_confs = ( "hosts",)
518        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
519        # command to test experiment state
520        expinfo_exec = "/usr/testbed/bin/expinfo" 
521        # Configuration directories on the remote machine
522        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
523        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
524        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
525        # Regular expressions to parse the expinfo response
526        state_re = re.compile("State:\s+(\w+)")
527        no_exp_re = re.compile("^No\s+such\s+experiment")
528        state = None    # Experiment state parsed from expinfo
529        # The expinfo ssh command
530        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
531
532        # Get status
533        self.log.debug("[start_segment]: %s"% " ".join(cmd))
534        dev_null = None
535        try:
536            dev_null = open("/dev/null", "a")
537        except IOError, e:
538            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
539
540        if self.debug:
541            state = 'swapped'
542            rv = 0
543        else:
544            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
545            for line in status.stdout:
546                m = state_re.match(line)
547                if m: state = m.group(1)
548                else:
549                    m = no_exp_re.match(line)
550                    if m: state = "none"
551            rv = status.wait()
552
553        # If the experiment is not present the subcommand returns a non-zero
554        # return value.  If we successfully parsed a "none" outcome, ignore the
555        # return code.
556        if rv != 0 and state != "none":
557            raise service_error(service_error.internal,
558                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
559
560        self.log.debug("[start_segment]: %s: %s" % (tb, state))
561        self.log.info("[start_segment]:transferring experiment to %s" % tb)
562
563        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
564            return False
565        # Clear the federation files
566        if not self.ssh_cmd(user, host, 
567                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
568            return False
569        if not self.ssh_cmd(user, host, 
570                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
571            return False
572        # Clear and create the tarfiles and rpm directories
573        for d in (tarfiles_dir, rpms_dir):
574            if not self.ssh_cmd(user, host, 
575                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
576                return False
577            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
578                    "create tarfiles"):
579                return False
580       
581        if state == 'active':
582            # Remote experiment is active.  Modify it.
583            for f in base_confs:
584                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
585                        "%s/%s" % (proj_dir, f)):
586                    return False
587            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
588                    proj_dir):
589                return False
590            if os.path.isdir("%s/tarfiles" % tmpdir):
591                if not self.ship_configs(host, user,
592                        "%s/tarfiles" % tmpdir, tarfiles_dir):
593                    return False
594            if os.path.isdir("%s/rpms" % tmpdir):
595                if not self.ship_configs(host, user,
596                        "%s/rpms" % tmpdir, tarfiles_dir):
597                    return False
598            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
599            if not self.ssh_cmd(user, host,
600                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
601                            (pid, eid, tclfile), "modexp"):
602                return False
603            return True
604        elif state == "swapped":
605            # Remote experiment swapped out.  Modify it and swap it in.
606            for f in base_confs:
607                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
608                        "%s/%s" % (proj_dir, f)):
609                    return False
610            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
611                    proj_dir):
612                return False
613            if os.path.isdir("%s/tarfiles" % tmpdir):
614                if not self.ship_configs(host, user,
615                        "%s/tarfiles" % tmpdir, tarfiles_dir):
616                    return False
617            if os.path.isdir("%s/rpms" % tmpdir):
618                if not self.ship_configs(host, user,
619                        "%s/rpms" % tmpdir, tarfiles_dir):
620                    return False
621            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
622            if not self.ssh_cmd(user, host,
623                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
624                    "modexp"):
625                return False
626            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
627            if not self.ssh_cmd(user, host,
628                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
629                    "swapexp"):
630                return False
631            return True
632        elif state == "none":
633            # No remote experiment.  Create one.  We do this in 2 steps so we
634            # can put the configuration files and scripts into the new
635            # experiment directories.
636
637            # Tarfiles must be present for creation to work
638            if os.path.isdir("%s/tarfiles" % tmpdir):
639                if not self.ship_configs(host, user,
640                        "%s/tarfiles" % tmpdir, tarfiles_dir):
641                    return False
642            if os.path.isdir("%s/rpms" % tmpdir):
643                if not self.ship_configs(host, user,
644                        "%s/rpms" % tmpdir, tarfiles_dir):
645                    return False
646            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
647            if not self.ssh_cmd(user, host,
648                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
649                            (pid, eid, tclfile), "startexp"):
650                return False
651            # After startexp the per-experiment directories exist
652            for f in base_confs:
653                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
654                        "%s/%s" % (proj_dir, f)):
655                    return False
656            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
657                    proj_dir):
658                return False
659            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
660            if not self.ssh_cmd(user, host,
661                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
662                    "swapexp"):
663                return False
664            return True
665        else:
666            self.log.debug("[start_segment]:unknown state %s" % state)
667            return False
668
669    def stop_segment(self, tb, eid, tbparams):
670        """
671        Stop a sub experiment by calling swapexp on the federant
672        """
673        user = tbparams[tb]['user']
674        host = tbparams[tb]['host']
675        pid = tbparams[tb]['project']
676
677        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
678        return self.ssh_cmd(user, host,
679                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
680
681       
682    def generate_ssh_keys(self, dest, type="rsa" ):
683        """
684        Generate a set of keys for the gateways to use to talk.
685
686        Keys are of type type and are stored in the required dest file.
687        """
688        valid_types = ("rsa", "dsa")
689        t = type.lower();
690        if t not in valid_types: raise ValueError
691        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
692
693        try:
694            trace = open("/dev/null", "w")
695        except IOError:
696            raise service_error(service_error.internal,
697                    "Cannot open /dev/null??");
698
699        # May raise CalledProcessError
700        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
701        rv = call(cmd, stdout=trace, stderr=trace)
702        if rv != 0:
703            raise service_error(service_error.internal, 
704                    "Cannot generate nonce ssh keys.  %s return code %d" \
705                            % (self.ssh_keygen, rv))
706
707    def gentopo(self, str):
708        """
709        Generate the topology dtat structure from the splitter's XML
710        representation of it.
711
712        The topology XML looks like:
713            <experiment>
714                <nodes>
715                    <node><vname></vname><ips>ip1:ip2</ips></node>
716                </nodes>
717                <lans>
718                    <lan>
719                        <vname></vname><vnode></vnode><ip></ip>
720                        <bandwidth></bandwidth><member>node:port</member>
721                    </lan>
722                </lans>
723        """
724        class topo_parse:
725            """
726            Parse the topology XML and create the dats structure.
727            """
728            def __init__(self):
729                # Typing of the subelements for data conversion
730                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
731                self.int_subelements = ( 'bandwidth',)
732                self.float_subelements = ( 'delay',)
733                # The final data structure
734                self.nodes = [ ]
735                self.lans =  [ ]
736                self.topo = { \
737                        'node': self.nodes,\
738                        'lan' : self.lans,\
739                    }
740                self.element = { }  # Current element being created
741                self.chars = ""     # Last text seen
742
743            def end_element(self, name):
744                # After each sub element the contents is added to the current
745                # element or to the appropriate list.
746                if name == 'node':
747                    self.nodes.append(self.element)
748                    self.element = { }
749                elif name == 'lan':
750                    self.lans.append(self.element)
751                    self.element = { }
752                elif name in self.str_subelements:
753                    self.element[name] = self.chars
754                    self.chars = ""
755                elif name in self.int_subelements:
756                    self.element[name] = int(self.chars)
757                    self.chars = ""
758                elif name in self.float_subelements:
759                    self.element[name] = float(self.chars)
760                    self.chars = ""
761
762            def found_chars(self, data):
763                self.chars += data.rstrip()
764
765
766        tp = topo_parse();
767        parser = xml.parsers.expat.ParserCreate()
768        parser.EndElementHandler = tp.end_element
769        parser.CharacterDataHandler = tp.found_chars
770
771        parser.Parse(str)
772
773        return tp.topo
774       
775
776    def genviz(self, topo):
777        """
778        Generate the visualization the virtual topology
779        """
780
781        neato = "/usr/local/bin/neato"
782        # These are used to parse neato output and to create the visualization
783        # file.
784        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
785        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
786                "%s</type></node>"
787
788        try:
789            # Node names
790            nodes = [ n['vname'] for n in topo['node'] ]
791            topo_lans = topo['lan']
792        except KeyError:
793            raise service_error(service_error.internal, "Bad topology")
794
795        lans = { }
796        links = { }
797
798        # Walk through the virtual topology, organizing the connections into
799        # 2-node connections (links) and more-than-2-node connections (lans).
800        # When a lan is created, it's added to the list of nodes (there's a
801        # node in the visualization for the lan).
802        for l in topo_lans:
803            if links.has_key(l['vname']):
804                if len(links[l['vname']]) < 2:
805                    links[l['vname']].append(l['vnode'])
806                else:
807                    nodes.append(l['vname'])
808                    lans[l['vname']] = links[l['vname']]
809                    del links[l['vname']]
810                    lans[l['vname']].append(l['vnode'])
811            elif lans.has_key(l['vname']):
812                lans[l['vname']].append(l['vnode'])
813            else:
814                links[l['vname']] = [ l['vnode'] ]
815
816
817        # Open up a temporary file for dot to turn into a visualization
818        try:
819            df, dotname = tempfile.mkstemp()
820            dotfile = os.fdopen(df, 'w')
821        except IOError:
822            raise service_error(service_error.internal,
823                    "Failed to open file in genviz")
824
825        # Generate a dot/neato input file from the links, nodes and lans
826        try:
827            print >>dotfile, "graph G {"
828            for n in nodes:
829                print >>dotfile, '\t"%s"' % n
830            for l in links.keys():
831                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
832            for l in lans.keys():
833                for n in lans[l]:
834                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
835            print >>dotfile, "}"
836            dotfile.close()
837        except TypeError:
838            raise service_error(service_error.internal,
839                    "Single endpoint link in vtopo")
840        except IOError:
841            raise service_error(service_error.internal, "Cannot write dot file")
842
843        # Use dot to create a visualization
844        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
845                '-Gpack=true', dotname], stdout=PIPE)
846
847        # Translate dot to vis format
848        vis_nodes = [ ]
849        vis = { 'node': vis_nodes }
850        for line in dot.stdout:
851            m = vis_re.match(line)
852            if m:
853                vn = m.group(1)
854                vis_node = {'name': vn, \
855                        'x': float(m.group(2)),\
856                        'y' : float(m.group(3)),\
857                    }
858                if vn in links.keys() or vn in lans.keys():
859                    vis_node['type'] = 'lan'
860                else:
861                    vis_node['type'] = 'node'
862                vis_nodes.append(vis_node)
863        rv = dot.wait()
864
865        os.remove(dotname)
866        if rv == 0 : return vis
867        else: return None
868
869    def get_access(self, tb, nodes, user, tbparam, master, export_project,
870            access_user):
871        """
872        Get access to testbed through fedd and set the parameters for that tb
873        """
874
875        translate_attr = {
876            'slavenodestartcmd': 'expstart',
877            'slaveconnectorstartcmd': 'gwstart',
878            'masternodestartcmd': 'mexpstart',
879            'masterconnectorstartcmd': 'mgwstart',
880            'connectorimage': 'gwimage',
881            'connectortype': 'gwtype',
882            'tunnelcfg': 'tun',
883            'smbshare': 'smbshare',
884        }
885
886        uri = self.tbmap.get(tb, None)
887        if not uri:
888            raise service_error(serice_error.server_config, 
889                    "Unknown testbed: %s" % tb)
890
891        # currently this lumps all users into one service access group
892        service_keys = [ a for u in user \
893                for a in u.get('access', []) \
894                    if a.has_key('sshPubkey')]
895
896        if len(service_keys) == 0:
897            raise service_error(service_error.req, 
898                    "Must have at least one SSH pubkey for services")
899
900
901        for p, u in access_user:
902            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
903                    "to %s") %  ((p or "None"), u, uri))
904
905            if p:
906                # Request with user and project specified
907                req = {\
908                        'destinationTestbed' : { 'uri' : uri },
909                        'project': { 
910                            'name': {'localname': p},
911                            'user': [ {'userID': { 'localname': u } } ],
912                            },
913                        'user':  user,
914                        'allocID' : { 'localname': 'test' },
915                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
916                        'serviceAccess' : service_keys
917                    }
918            else:
919                # Request with only user specified
920                req = {\
921                        'destinationTestbed' : { 'uri' : uri },
922                        'user':  [ {'userID': { 'localname': u } } ],
923                        'allocID' : { 'localname': 'test' },
924                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
925                        'serviceAccess' : service_keys
926                    }
927
928            if tb == master:
929                # NB, the export_project parameter is a dict that includes
930                # the type
931                req['exportProject'] = export_project
932
933            # node resources if any
934            if nodes != None and len(nodes) > 0:
935                rnodes = [ ]
936                for n in nodes:
937                    rn = { }
938                    image, hw, count = n.split(":")
939                    if image: rn['image'] = [ image ]
940                    if hw: rn['hardware'] = [ hw ]
941                    if count: rn['count'] = int(count)
942                    rnodes.append(rn)
943                req['resources']= { }
944                req['resources']['node'] = rnodes
945
946            try:
947                r = self.call_RequestAccess(uri, req, 
948                        self.cert_file, self.cert_pwd, self.trusted_certs)
949            except service_error, e:
950                if e.code == service_error.access:
951                    self.log.debug("[get_access] Access denied")
952                    r = None
953                    continue
954                else:
955                    raise e
956
957            if r.has_key('RequestAccessResponseBody'):
958                # Through to here we have a valid response, not a fault.
959                # Access denied is a fault, so something better or worse than
960                # access denied has happened.
961                r = r['RequestAccessResponseBody']
962                self.log.debug("[get_access] Access granted")
963                break
964            else:
965                raise service_error(service_error.protocol,
966                        "Bad proxy response")
967       
968        if not r:
969            raise service_error(service_error.access, 
970                    "Access denied by %s (%s)" % (tb, uri))
971
972        e = r['emulab']
973        p = e['project']
974        tbparam[tb] = { 
975                "boss": e['boss'],
976                "host": e['ops'],
977                "domain": e['domain'],
978                "fs": e['fileServer'],
979                "eventserver": e['eventServer'],
980                "project": unpack_id(p['name']),
981                "emulab" : e,
982                "allocID" : r['allocID'],
983                }
984        # Make the testbed name be the label the user applied
985        p['testbed'] = {'localname': tb }
986
987        for u in p['user']:
988            tbparam[tb]['user'] = unpack_id(u['userID'])
989
990        for a in e['fedAttr']:
991            if a['attribute']:
992                key = translate_attr.get(a['attribute'].lower(), None)
993                if key:
994                    tbparam[tb][key]= a['value']
995       
996    def release_access(self, tb, aid):
997        """
998        Release access to testbed through fedd
999        """
1000
1001        uri = self.tbmap.get(tb, None)
1002        if not uri:
1003            raise service_error(serice_error.server_config, 
1004                    "Unknown testbed: %s" % tb)
1005
1006        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
1007                self.cert_file, self.cert_pwd, self.trusted_certs)
1008
1009        # better error coding
1010
1011    def remote_splitter(self, uri, desc, master):
1012
1013        req = {
1014                'description' : { 'ns2description': desc },
1015                'master': master,
1016                'include_fedkit': bool(self.fedkit)
1017            }
1018
1019        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1020                self.trusted_certs)
1021
1022        if r.has_key('Ns2SplitResponseBody'):
1023            r = r['Ns2SplitResponseBody']
1024            if r.has_key('output'):
1025                return r['output'].splitlines()
1026            else:
1027                raise service_error(service_error.protocol, 
1028                        "Bad splitter response (no output)")
1029        else:
1030            raise service_error(service_error.protocol, "Bad splitter response")
1031       
1032    class current_testbed:
1033        """
1034        Object for collecting the current testbed description.  The testbed
1035        description is saved to a file with the local testbed variables
1036        subsittuted line by line.
1037        """
1038        def __init__(self, eid, tmpdir, fedkit):
1039            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1040            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1041            self.current_testbed = None
1042            self.testbed_file = None
1043
1044            self.def_expstart = \
1045                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1046            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1047            self.def_gwstart = \
1048                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1049            self.def_mgwstart = \
1050                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1051            self.def_gwimage = "FBSD61-TUNNEL2";
1052            self.def_gwtype = "pc";
1053
1054            self.eid = eid
1055            self.tmpdir = tmpdir
1056            self.fedkit = fedkit
1057
1058        def __call__(self, line, master, allocated, tbparams):
1059            # Capture testbed topology descriptions
1060            if self.current_testbed == None:
1061                m = self.begin_testbed.match(line)
1062                if m != None:
1063                    self.current_testbed = m.group(1)
1064                    if self.current_testbed == None:
1065                        raise service_error(service_error.req,
1066                                "Bad request format (unnamed testbed)")
1067                    allocated[self.current_testbed] = \
1068                            allocated.get(self.current_testbed,0) + 1
1069                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1070                    if not os.path.exists(tb_dir):
1071                        try:
1072                            os.mkdir(tb_dir)
1073                        except IOError:
1074                            raise service_error(service_error.internal,
1075                                    "Cannot create %s" % tb_dir)
1076                    try:
1077                        self.testbed_file = open("%s/%s.%s.tcl" %
1078                                (tb_dir, self.eid, self.current_testbed), 'w')
1079                    except IOError:
1080                        self.testbed_file = None
1081                    return True
1082                else: return False
1083            else:
1084                m = self.end_testbed.match(line)
1085                if m != None:
1086                    if m.group(1) != self.current_testbed:
1087                        raise service_error(service_error.internal, 
1088                                "Mismatched testbed markers!?")
1089                    if self.testbed_file != None: 
1090                        self.testbed_file.close()
1091                        self.testbed_file = None
1092                    self.current_testbed = None
1093                elif self.testbed_file:
1094                    # Substitute variables and put the line into the local
1095                    # testbed file.
1096                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1097                            self.def_gwtype)
1098                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1099                            self.def_gwimage)
1100                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1101                            self.def_mgwstart)
1102                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1103                            self.def_mexpstart)
1104                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1105                            self.def_gwstart)
1106                    expstart = tbparams[self.current_testbed].get('expstart', 
1107                            self.def_expstart)
1108                    project = tbparams[self.current_testbed].get('project')
1109                    line = re.sub("GWTYPE", gwtype, line)
1110                    line = re.sub("GWIMAGE", gwimage, line)
1111                    if self.current_testbed == master:
1112                        line = re.sub("GWSTART", mgwstart, line)
1113                        line = re.sub("EXPSTART", mexpstart, line)
1114                    else:
1115                        line = re.sub("GWSTART", gwstart, line)
1116                        line = re.sub("EXPSTART", expstart, line)
1117                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1118                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1119                    line = re.sub("EID", self.eid, line)
1120                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1121                            (project, self.eid), line)
1122                    if self.fedkit:
1123                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1124                                line)
1125                    print >>self.testbed_file, line
1126                return True
1127
1128    class allbeds:
1129        """
1130        Process the Allbeds section.  Get access to each federant and save the
1131        parameters in tbparams
1132        """
1133        def __init__(self, get_access):
1134            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1135            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1136            self.in_allbeds = False
1137            self.get_access = get_access
1138
1139        def __call__(self, line, user, tbparams, master, export_project,
1140                access_user):
1141            # Testbed access parameters
1142            if not self.in_allbeds:
1143                if self.begin_allbeds.match(line):
1144                    self.in_allbeds = True
1145                    return True
1146                else:
1147                    return False
1148            else:
1149                if self.end_allbeds.match(line):
1150                    self.in_allbeds = False
1151                else:
1152                    nodes = line.split('|')
1153                    tb = nodes.pop(0)
1154                    self.get_access(tb, nodes, user, tbparams, master,
1155                            export_project, access_user)
1156                return True
1157
1158    class gateways:
1159        def __init__(self, eid, master, tmpdir, gw_pubkey,
1160                gw_secretkey, copy_file, fedkit):
1161            self.begin_gateways = \
1162                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1163            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1164            self.current_gateways = None
1165            self.control_gateway = None
1166            self.active_end = { }
1167
1168            self.eid = eid
1169            self.master = master
1170            self.tmpdir = tmpdir
1171            self.gw_pubkey_base = gw_pubkey
1172            self.gw_secretkey_base = gw_secretkey
1173
1174            self.copy_file = copy_file
1175            self.fedkit = fedkit
1176
1177
1178        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1179                active_end, tbparams, dtb, myname, desthost, type):
1180            """
1181            Produce a gateway configuration file from a gateways line.
1182            """
1183
1184            sproject = tbparams[gw].get('project', 'project')
1185            dproject = tbparams[dtb].get('project', 'project')
1186            sdomain = ".%s.%s%s" % (eid, sproject,
1187                    tbparams[gw].get('domain', ".example.com"))
1188            ddomain = ".%s.%s%s" % (eid, dproject,
1189                    tbparams[dtb].get('domain', ".example.com"))
1190            boss = tbparams[master].get('boss', "boss")
1191            fs = tbparams[master].get('fs', "fs")
1192            event_server = "%s%s" % \
1193                    (tbparams[gw].get('eventserver', "event_server"),
1194                            tbparams[gw].get('domain', "example.com"))
1195            remote_event_server = "%s%s" % \
1196                    (tbparams[dtb].get('eventserver', "event_server"),
1197                            tbparams[dtb].get('domain', "example.com"))
1198            seer_control = "%s%s" % \
1199                    (tbparams[gw].get('control', "control"), sdomain)
1200
1201            if self.fedkit:
1202                remote_script_dir = "/usr/local/federation/bin"
1203                local_script_dir = "/usr/local/federation/bin"
1204            else:
1205                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1206                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1207
1208            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1209            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1210            tunnel_cfg = tbparams[gw].get("tun", "false")
1211
1212            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1213            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1214
1215            # translate to lower case so the `hostname` hack for specifying
1216            # configuration files works.
1217            conf_file = conf_file.lower();
1218            remote_conf_file = remote_conf_file.lower();
1219
1220            if dtb == master:
1221                active = "false"
1222            elif gw == master:
1223                active = "true"
1224            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1225                active = "false"
1226            else:
1227                active_end['%s-%s' % (gw, dtb)] = 1
1228                active = "true"
1229
1230            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1231            print >>gwconfig, "Active: %s" % active
1232            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1233            print >>gwconfig, "BossName: %s" % boss
1234            print >>gwconfig, "FsName: %s" % fs
1235            print >>gwconfig, "EventServerName: %s" % event_server
1236            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1237            print >>gwconfig, "SeerControl: %s" % seer_control
1238            print >>gwconfig, "Type: %s" % type
1239            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1240            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1241                    local_script_dir
1242            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1243            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1244            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1245                    (remote_conf_dir, remote_conf_file)
1246            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1247            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1248            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1249            gwconfig.close()
1250
1251            return active == "true"
1252
1253        def __call__(self, line, allocated, tbparams):
1254            # Process gateways
1255            if not self.current_gateways:
1256                m = self.begin_gateways.match(line)
1257                if m:
1258                    self.current_gateways = m.group(1)
1259                    if allocated.has_key(self.current_gateways):
1260                        # This test should always succeed
1261                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1262                        if not os.path.exists(tb_dir):
1263                            try:
1264                                os.mkdir(tb_dir)
1265                            except IOError:
1266                                raise service_error(service_error.internal,
1267                                        "Cannot create %s" % tb_dir)
1268                    else:
1269                        # XXX
1270                        self.log.error("[gateways]: Ignoring gateways for " + \
1271                                "unknown testbed %s" % self.current_gateways)
1272                        self.current_gateways = None
1273                    return True
1274                else:
1275                    return False
1276            else:
1277                m = self.end_gateways.match(line)
1278                if m :
1279                    if m.group(1) != self.current_gateways:
1280                        raise service_error(service_error.internal,
1281                                "Mismatched gateway markers!?")
1282                    if self.control_gateway:
1283                        try:
1284                            cc = open("%s/%s/client.conf" %
1285                                    (self.tmpdir, self.current_gateways), 'w')
1286                            print >>cc, "ControlGateway: %s" % \
1287                                    self.control_gateway
1288                            if tbparams[self.master].has_key('smbshare'):
1289                                print >>cc, "SMBSHare: %s" % \
1290                                        tbparams[self.master]['smbshare']
1291                            print >>cc, "ProjectUser: %s" % \
1292                                    tbparams[self.master]['user']
1293                            print >>cc, "ProjectName: %s" % \
1294                                    tbparams[self.master]['project']
1295                            cc.close()
1296                        except IOError:
1297                            raise service_error(service_error.internal,
1298                                    "Error creating client config")
1299                        try:
1300                            cc = open("%s/%s/seer.conf" %
1301                                    (self.tmpdir, self.current_gateways),
1302                                    'w')
1303                            if self.current_gateways != self.master:
1304                                print >>cc, "ControlNode: %s" % \
1305                                        self.control_gateway
1306                            print >>cc, "ExperimentID: %s/%s" % \
1307                                    ( tbparams[self.master]['project'], \
1308                                    self.eid )
1309                            cc.close()
1310                        except IOError:
1311                            raise service_error(service_error.internal,
1312                                    "Error creating seer config")
1313                    else:
1314                        debug.error("[gateways]: No control gateway for %s" %\
1315                                    self.current_gateways)
1316                    self.current_gateways = None
1317                else:
1318                    dtb, myname, desthost, type = line.split(" ")
1319
1320                    if type == "control" or type == "both":
1321                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1322                                self.eid, 
1323                                tbparams[self.current_gateways]['project'],
1324                                tbparams[self.current_gateways]['domain'])
1325                    try:
1326                        active = self.gateway_conf_file(self.current_gateways,
1327                                self.master, self.eid, self.gw_pubkey_base,
1328                                self.gw_secretkey_base,
1329                                self.active_end, tbparams, dtb, myname,
1330                                desthost, type)
1331                    except IOError, e:
1332                        raise service_error(service_error.internal,
1333                                "Failed to write config file for %s" % \
1334                                        self.current_gateway)
1335           
1336                    gw_pubkey = "%s/keys/%s" % \
1337                            (self.tmpdir, self.gw_pubkey_base)
1338                    gw_secretkey = "%s/keys/%s" % \
1339                            (self.tmpdir, self.gw_secretkey_base)
1340
1341                    pkfile = "%s/%s/%s" % \
1342                            ( self.tmpdir, self.current_gateways, 
1343                                    self.gw_pubkey_base)
1344                    skfile = "%s/%s/%s" % \
1345                            ( self.tmpdir, self.current_gateways, 
1346                                    self.gw_secretkey_base)
1347
1348                    if not os.path.exists(pkfile):
1349                        try:
1350                            self.copy_file(gw_pubkey, pkfile)
1351                        except IOError:
1352                            service_error(service_error.internal,
1353                                    "Failed to copy pubkey file")
1354
1355                    if active and not os.path.exists(skfile):
1356                        try:
1357                            self.copy_file(gw_secretkey, skfile)
1358                        except IOError:
1359                            service_error(service_error.internal,
1360                                    "Failed to copy secretkey file")
1361                return True
1362
1363    class shunt_to_file:
1364        """
1365        Simple class to write data between two regexps to a file.
1366        """
1367        def __init__(self, begin, end, filename):
1368            """
1369            Begin shunting on a match of begin, stop on end, send data to
1370            filename.
1371            """
1372            self.begin = re.compile(begin)
1373            self.end = re.compile(end)
1374            self.in_shunt = False
1375            self.file = None
1376            self.filename = filename
1377
1378        def __call__(self, line):
1379            """
1380            Call this on each line in the input that may be shunted.
1381            """
1382            if not self.in_shunt:
1383                if self.begin.match(line):
1384                    self.in_shunt = True
1385                    try:
1386                        self.file = open(self.filename, "w")
1387                    except:
1388                        self.file = None
1389                        raise
1390                    return True
1391                else:
1392                    return False
1393            else:
1394                if self.end.match(line):
1395                    if self.file: 
1396                        self.file.close()
1397                        self.file = None
1398                    self.in_shunt = False
1399                else:
1400                    if self.file:
1401                        print >>self.file, line
1402                return True
1403
1404    class shunt_to_list:
1405        """
1406        Same interface as shunt_to_file.  Data collected in self.list, one list
1407        element per line.
1408        """
1409        def __init__(self, begin, end):
1410            self.begin = re.compile(begin)
1411            self.end = re.compile(end)
1412            self.in_shunt = False
1413            self.list = [ ]
1414       
1415        def __call__(self, line):
1416            if not self.in_shunt:
1417                if self.begin.match(line):
1418                    self.in_shunt = True
1419                    return True
1420                else:
1421                    return False
1422            else:
1423                if self.end.match(line):
1424                    self.in_shunt = False
1425                else:
1426                    self.list.append(line)
1427                return True
1428
1429    class shunt_to_string:
1430        """
1431        Same interface as shunt_to_file.  Data collected in self.str, all in
1432        one string.
1433        """
1434        def __init__(self, begin, end):
1435            self.begin = re.compile(begin)
1436            self.end = re.compile(end)
1437            self.in_shunt = False
1438            self.str = ""
1439       
1440        def __call__(self, line):
1441            if not self.in_shunt:
1442                if self.begin.match(line):
1443                    self.in_shunt = True
1444                    return True
1445                else:
1446                    return False
1447            else:
1448                if self.end.match(line):
1449                    self.in_shunt = False
1450                else:
1451                    self.str += line
1452                return True
1453
1454    def create_experiment(self, req, fid):
1455        """
1456        The external interface to experiment creation called from the
1457        dispatcher.
1458
1459        Creates a working directory, splits the incoming description using the
1460        splitter script and parses out the avrious subsections using the
1461        lcasses above.  Once each sub-experiment is created, use pooled threads
1462        to instantiate them and start it all up.
1463        """
1464
1465        if not self.auth.check_attribute(fid, 'create'):
1466            raise service_error(service_error.access, "Create access denied")
1467
1468        try:
1469            tmpdir = tempfile.mkdtemp(prefix="split-")
1470        except IOError:
1471            raise service_error(service_error.internal, "Cannot create tmp dir")
1472
1473        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1474        gw_secretkey_base = "fed.%s" % self.ssh_type
1475        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1476        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1477        tclfile = tmpdir + "/experiment.tcl"
1478        tbparams = { }
1479        try:
1480            access_user = self.accessdb[fid]
1481        except KeyError:
1482            raise service_error(service_error.internal,
1483                    "Access map and authorizer out of sync in " + \
1484                            "create_experiment for fedid %s"  % fid)
1485
1486        pid = "dummy"
1487        gid = "dummy"
1488        # XXX
1489        fail_soft = False
1490
1491        try:
1492            os.mkdir(tmpdir+"/keys")
1493        except OSError:
1494            raise service_error(service_error.internal,
1495                    "Can't make temporary dir")
1496
1497        req = req.get('CreateRequestBody', None)
1498        if not req:
1499            raise service_error(service_error.req,
1500                    "Bad request format (no CreateRequestBody)")
1501        # The tcl parser needs to read a file so put the content into that file
1502        descr=req.get('experimentdescription', None)
1503        if descr:
1504            file_content=descr.get('ns2description', None)
1505            if file_content:
1506                try:
1507                    f = open(tclfile, 'w')
1508                    f.write(file_content)
1509                    f.close()
1510                except IOError:
1511                    raise service_error(service_error.internal,
1512                            "Cannot write temp experiment description")
1513            else:
1514                raise service_error(service_error.req, 
1515                        "Only ns2descriptions supported")
1516        else:
1517            raise service_error(service_error.req, "No experiment description")
1518
1519        if req.has_key('experimentID') and \
1520                req['experimentID'].has_key('localname'):
1521            eid = req['experimentID']['localname']
1522            self.state_lock.acquire()
1523            while (self.state.has_key(eid)):
1524                eid += random.choice(string.ascii_letters)
1525            # To avoid another thread picking this localname
1526            self.state[eid] = "placeholder"
1527            self.state_lock.release()
1528        else:
1529            eid = self.exp_stem
1530            for i in range(0,5):
1531                eid += random.choice(string.ascii_letters)
1532            self.state_lock.acquire()
1533            while (self.state.has_key(eid)):
1534                eid = self.exp_stem
1535                for i in range(0,5):
1536                    eid += random.choice(string.ascii_letters)
1537            # To avoid another thread picking this localname
1538            self.state[eid] = "placeholder"
1539            self.state_lock.release()
1540
1541        try: 
1542            # This catches exceptions to clear the placeholder if necessary
1543            try:
1544                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1545            except ValueError:
1546                raise service_error(service_error.server_config, 
1547                        "Bad key type (%s)" % self.ssh_type)
1548
1549            user = req.get('user', None)
1550            if user == None:
1551                raise service_error(service_error.req, "No user")
1552
1553            master = req.get('master', None)
1554            if not master:
1555                raise service_error(service_error.req,
1556                        "No master testbed label")
1557            export_project = req.get('exportProject', None)
1558            if not export_project:
1559                raise service_error(service_error.req, "No export project")
1560           
1561            if self.splitter_url:
1562                self.log.debug("Calling remote splitter at %s" % \
1563                        self.splitter_url)
1564                split_data = self.remote_splitter(self.splitter_url,
1565                        file_content, master)
1566            else:
1567                tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1568                    str(self.muxmax), '-m', master]
1569
1570                if self.fedkit:
1571                    tclcmd.append('-k')
1572
1573                tclcmd.extend([pid, gid, eid, tclfile])
1574
1575                self.log.debug("running local splitter %s", " ".join(tclcmd))
1576                tclparser = Popen(tclcmd, stdout=PIPE)
1577                split_data = tclparser.stdout
1578
1579            allocated = { }         # Testbeds we can access
1580            started = { }           # Testbeds where a sub-experiment started
1581                                # successfully
1582
1583            # Objects to parse the splitter output (defined above)
1584            parse_current_testbed = self.current_testbed(eid, tmpdir,
1585                    self.fedkit)
1586            parse_allbeds = self.allbeds(self.get_access)
1587            parse_gateways = self.gateways(eid, master, tmpdir,
1588                    gw_pubkey_base, gw_secretkey_base, self.copy_file,
1589                    self.fedkit)
1590            parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1591                        "^#\s+End\s+Vtopo")
1592            parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1593                        "^#\s+End\s+hostnames", tmpdir + "/hosts")
1594            parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1595                    "^#\s+End\s+tarfiles")
1596            parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1597                    "^#\s+End\s+rpms")
1598
1599            # Working on the split data
1600            for line in split_data:
1601                line = line.rstrip()
1602                if parse_current_testbed(line, master, allocated, tbparams):
1603                    continue
1604                elif parse_allbeds(line, user, tbparams, master, export_project,
1605                        access_user):
1606                    continue
1607                elif parse_gateways(line, allocated, tbparams):
1608                    continue
1609                elif parse_vtopo(line):
1610                    continue
1611                elif parse_hostnames(line):
1612                    continue
1613                elif parse_tarfiles(line):
1614                    continue
1615                elif parse_rpms(line):
1616                    continue
1617                else:
1618                    raise service_error(service_error.internal, 
1619                            "Bad tcl parse? %s" % line)
1620            # Virtual topology and visualization
1621            vtopo = self.gentopo(parse_vtopo.str)
1622            if not vtopo:
1623                raise service_error(service_error.internal, 
1624                        "Failed to generate virtual topology")
1625
1626            vis = self.genviz(vtopo)
1627            if not vis:
1628                raise service_error(service_error.internal, 
1629                        "Failed to generate visualization")
1630           
1631            # save federant information
1632            for k in allocated.keys():
1633                tbparams[k]['federant'] = {\
1634                        'name': [ { 'localname' : eid} ],\
1635                        'emulab': tbparams[k]['emulab'],\
1636                        'allocID' : tbparams[k]['allocID'],\
1637                        'master' : k == master,\
1638                    }
1639
1640
1641            # Copy tarfiles and rpms needed at remote sites into a staging area
1642            try:
1643                if self.fedkit:
1644                    parse_tarfiles.list.append(self.fedkit)
1645                for t in parse_tarfiles.list:
1646                    if not os.path.exists("%s/tarfiles" % tmpdir):
1647                        os.mkdir("%s/tarfiles" % tmpdir)
1648                    self.copy_file(t, "%s/tarfiles/%s" % \
1649                            (tmpdir, os.path.basename(t)))
1650                for r in parse_rpms.list:
1651                    if not os.path.exists("%s/rpms" % tmpdir):
1652                        os.mkdir("%s/rpms" % tmpdir)
1653                    self.copy_file(r, "%s/rpms/%s" % \
1654                            (tmpdir, os.path.basename(r)))
1655            except IOError, e:
1656                raise service_error(service_error.internal, 
1657                        "Cannot stage tarfile/rpm: %s" % e.strerror)
1658
1659        except service_error, e:
1660            # If something goes wrong in the parse (usually an access error)
1661            # clear the placeholder state.  From here on out the code delays
1662            # exceptions.
1663            self.state_lock.acquire()
1664            del self.state[eid]
1665            self.state_lock.release()
1666            raise e
1667
1668        thread_pool_info = self.thread_pool()
1669        threads = [ ]
1670
1671        for tb in [ k for k in allocated.keys() if k != master]:
1672            # Wait until we have a free slot to start the next testbed load
1673            thread_pool_info.acquire()
1674            while thread_pool_info.started - \
1675                    thread_pool_info.terminated >= self.nthreads:
1676                thread_pool_info.wait()
1677            thread_pool_info.release()
1678
1679            # Create and start a thread to start the segment, and save it to
1680            # get the return value later
1681            t  = self.pooled_thread(target=self.start_segment, 
1682                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1683                    pdata=thread_pool_info, trace_file=self.trace_file)
1684            threads.append(t)
1685            t.start()
1686
1687        # Wait until all finish (the first clause of the while is to make sure
1688        # one starts)
1689        thread_pool_info.acquire()
1690        while thread_pool_info.started == 0 or \
1691                thread_pool_info.started > thread_pool_info.terminated:
1692            thread_pool_info.wait()
1693        thread_pool_info.release()
1694
1695        # If none failed, start the master
1696        failed = [ t.getName() for t in threads if not t.rv ]
1697
1698        if len(failed) == 0:
1699            if not self.start_segment(master, eid, tbparams, tmpdir):
1700                failed.append(master)
1701
1702        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1703        # If one failed clean up, unless fail_soft is set
1704        if failed:
1705            if not fail_soft:
1706                for tb in succeeded:
1707                    self.stop_segment(tb, eid, tbparams)
1708                # Remove the placeholder
1709                self.state_lock.acquire()
1710                del self.state[eid]
1711                self.state_lock.release()
1712
1713                raise service_error(service_error.federant,
1714                    "Swap in failed on %s" % ",".join(failed))
1715        else:
1716            self.log.info("[start_segment]: Experiment %s started" % eid)
1717
1718        # Generate an ID for the experiment (slice) and a certificate that the
1719        # allocator can use to prove they own it.  We'll ship it back through
1720        # the encrypted connection.
1721        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1722
1723        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1724
1725        # Walk up tmpdir, deleting as we go
1726        for path, dirs, files in os.walk(tmpdir, topdown=False):
1727            for f in files:
1728                os.remove(os.path.join(path, f))
1729            for d in dirs:
1730                os.rmdir(os.path.join(path, d))
1731        os.rmdir(tmpdir)
1732
1733        # The deepcopy prevents the allocation ID and other binaries from being
1734        # translated into other formats
1735        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1736                for tb in tbparams.keys() \
1737                    if tbparams[tb].has_key('federant') ],\
1738                    'vtopo': vtopo,\
1739                    'vis' : vis,
1740                    'experimentID' : [\
1741                            { 'fedid': copy.copy(expid) }, \
1742                            { 'localname': eid },\
1743                        ],\
1744                    'experimentAccess': { 'X509' : expcert },\
1745                }
1746
1747        # Insert the experiment into our state and update the disk copy
1748        self.state_lock.acquire()
1749        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1750                for tb in tbparams.keys() \
1751                    if tbparams[tb].has_key('federant') ],\
1752                    'vtopo': vtopo,\
1753                    'vis' : vis,
1754                    'owner': fid,
1755                    'experimentID' : [\
1756                            { 'fedid': expid }, { 'localname': eid },\
1757                        ],\
1758                }
1759        self.state[eid] = self.state[expid]
1760        if self.state_filename: self.write_state()
1761        self.state_lock.release()
1762
1763        self.auth.set_attribute(fid, expid)
1764        self.auth.set_attribute(expid, expid)
1765
1766        if not failed:
1767            return resp
1768        else:
1769            raise service_error(service_error.partial, \
1770                    "Partial swap in on %s" % ",".join(succeeded))
1771
1772    def check_experiment_access(self, fid, key):
1773        """
1774        Confirm that the fid has access to the experiment.  Though a request
1775        may be made in terms of a local name, the access attribute is always
1776        the experiment's fedid.
1777        """
1778        if not isinstance(key, fedid):
1779            self.state_lock.acquire()
1780            if self.state.has_key(key):
1781                try:
1782                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1783                            if f.has_key('fedid') ]
1784                except KeyError:
1785                    self.state_lock.release()
1786                    raise service_error(service_error.internal, 
1787                            "No fedid for experiment %s when checking " +\
1788                                    "access(!?)" % key)
1789                if len(kl) == 1:
1790                    key = kl[0]
1791                else:
1792                    self.state_lock.release()
1793                    raise service_error(service_error.internal, 
1794                            "multiple fedids for experiment %s when " +\
1795                                    "checking access(!?)" % key)
1796            else:
1797                self.state_lock.release()
1798                raise service_error(service_error.access, "Access Denied")
1799            self.state_lock.release()
1800
1801        if self.auth.check_attribute(fid, key):
1802            return True
1803        else:
1804            raise service_error(service_error.access, "Access Denied")
1805
1806
1807
1808    def get_vtopo(self, req, fid):
1809        """
1810        Return the stored virtual topology for this experiment
1811        """
1812        rv = None
1813
1814        req = req.get('VtopoRequestBody', None)
1815        if not req:
1816            raise service_error(service_error.req,
1817                    "Bad request format (no VtopoRequestBody)")
1818        exp = req.get('experiment', None)
1819        if exp:
1820            if exp.has_key('fedid'):
1821                key = exp['fedid']
1822                keytype = "fedid"
1823            elif exp.has_key('localname'):
1824                key = exp['localname']
1825                keytype = "localname"
1826            else:
1827                raise service_error(service_error.req, "Unknown lookup type")
1828        else:
1829            raise service_error(service_error.req, "No request?")
1830
1831        self.check_experiment_access(fid, key)
1832
1833        self.state_lock.acquire()
1834        if self.state.has_key(key):
1835            rv = { 'experiment' : {keytype: key },\
1836                    'vtopo': self.state[key]['vtopo'],\
1837                }
1838        self.state_lock.release()
1839
1840        if rv: return rv
1841        else: raise service_error(service_error.req, "No such experiment")
1842
1843    def get_vis(self, req, fid):
1844        """
1845        Return the stored visualization for this experiment
1846        """
1847        rv = None
1848
1849        req = req.get('VisRequestBody', None)
1850        if not req:
1851            raise service_error(service_error.req,
1852                    "Bad request format (no VisRequestBody)")
1853        exp = req.get('experiment', None)
1854        if exp:
1855            if exp.has_key('fedid'):
1856                key = exp['fedid']
1857                keytype = "fedid"
1858            elif exp.has_key('localname'):
1859                key = exp['localname']
1860                keytype = "localname"
1861            else:
1862                raise service_error(service_error.req, "Unknown lookup type")
1863        else:
1864            raise service_error(service_error.req, "No request?")
1865
1866        self.check_experiment_access(fid, key)
1867
1868        self.state_lock.acquire()
1869        if self.state.has_key(key):
1870            rv =  { 'experiment' : {keytype: key },\
1871                    'vis': self.state[key]['vis'],\
1872                    }
1873        self.state_lock.release()
1874
1875        if rv: return rv
1876        else: raise service_error(service_error.req, "No such experiment")
1877
1878    def get_info(self, req, fid):
1879        """
1880        Return all the stored info about this experiment
1881        """
1882        rv = None
1883
1884        req = req.get('InfoRequestBody', None)
1885        if not req:
1886            raise service_error(service_error.req,
1887                    "Bad request format (no VisRequestBody)")
1888        exp = req.get('experiment', None)
1889        if exp:
1890            if exp.has_key('fedid'):
1891                key = exp['fedid']
1892                keytype = "fedid"
1893            elif exp.has_key('localname'):
1894                key = exp['localname']
1895                keytype = "localname"
1896            else:
1897                raise service_error(service_error.req, "Unknown lookup type")
1898        else:
1899            raise service_error(service_error.req, "No request?")
1900
1901        self.check_experiment_access(fid, key)
1902
1903        # The state may be massaged by the service function that called
1904        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1905        # state.
1906        self.state_lock.acquire()
1907        if self.state.has_key(key):
1908            rv = copy.deepcopy(self.state[key])
1909        self.state_lock.release()
1910
1911        if rv: return rv
1912        else: raise service_error(service_error.req, "No such experiment")
1913
1914
1915    def terminate_experiment(self, req, fid):
1916        """
1917        Swap this experiment out on the federants and delete the shared
1918        information
1919        """
1920        tbparams = { }
1921        req = req.get('TerminateRequestBody', None)
1922        if not req:
1923            raise service_error(service_error.req,
1924                    "Bad request format (no TerminateRequestBody)")
1925        exp = req.get('experiment', None)
1926        if exp:
1927            if exp.has_key('fedid'):
1928                key = exp['fedid']
1929                keytype = "fedid"
1930            elif exp.has_key('localname'):
1931                key = exp['localname']
1932                keytype = "localname"
1933            else:
1934                raise service_error(service_error.req, "Unknown lookup type")
1935        else:
1936            raise service_error(service_error.req, "No request?")
1937
1938        self.check_experiment_access(fid, key)
1939
1940        self.state_lock.acquire()
1941        fed_exp = self.state.get(key, None)
1942
1943        if fed_exp:
1944            # This branch of the conditional holds the lock to generate a
1945            # consistent temporary tbparams variable to deallocate experiments.
1946            # It releases the lock to do the deallocations and reacquires it to
1947            # remove the experiment state when the termination is complete.
1948            ids = []
1949            #  experimentID is a list of dicts that are self-describing
1950            #  identifiers.  This finds all the fedids and localnames - the
1951            #  keys of self.state - and puts them into ids.
1952            for id in fed_exp.get('experimentID', []):
1953                if id.has_key('fedid'): ids.append(id['fedid'])
1954                if id.has_key('localname'): ids.append(id['localname'])
1955
1956            # Construct enough of the tbparams to make the stop_segment calls
1957            # work
1958            for fed in fed_exp['federant']:
1959                try:
1960                    for e in fed['name']:
1961                        eid = e.get('localname', None)
1962                        if eid: break
1963                    else:
1964                        continue
1965
1966                    p = fed['emulab']['project']
1967
1968                    project = p['name']['localname']
1969                    tb = p['testbed']['localname']
1970                    user = p['user'][0]['userID']['localname']
1971
1972                    domain = fed['emulab']['domain']
1973                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1974                    aid = fed['allocID']
1975                except KeyError, e:
1976                    continue
1977                tbparams[tb] = {\
1978                        'user': user,\
1979                        'domain': domain,\
1980                        'project': project,\
1981                        'host': host,\
1982                        'eid': eid,\
1983                        'aid': aid,\
1984                    }
1985            self.state_lock.release()
1986
1987            # Stop everyone.
1988            for tb in tbparams.keys():
1989                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1990
1991            # release the allocations
1992            for tb in tbparams.keys():
1993                self.release_access(tb, tbparams[tb]['aid'])
1994
1995            # Remove the terminated experiment
1996            self.state_lock.acquire()
1997            for id in ids:
1998                if self.state.has_key(id): del self.state[id]
1999
2000            if self.state_filename: self.write_state()
2001            self.state_lock.release()
2002
2003            return { 'experiment': exp }
2004        else:
2005            # Don't forget to release the lock
2006            self.state_lock.release()
2007            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.