source: fedd/fedd_experiment_control.py @ bf0a80e

axis_examplecompt_changesinfo-opsversion-1.30version-2.00version-3.01version-3.02
Last change on this file since bf0a80e was bf0a80e, checked in by Ted Faber <faber@…>, 15 years ago

remove an extraneous if/else

  • Property mode set to 100644
File size: 58.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13
14import traceback
15# For parsing visualization output and splitter output
16import xml.parsers.expat
17
18from threading import *
19from subprocess import *
20
21from fedd_services import *
22from fedd_internal_services import *
23from fedd_util import *
24from fedid import fedid, generate_fedid
25from remote_service import xmlrpc_handler, soap_handler, service_caller
26import parse_detail
27from service_error import service_error
28
29
30class nullHandler(logging.Handler):
31    def emit(self, record): pass
32
33fl = logging.getLogger("fedd.experiment_control")
34fl.addHandler(nullHandler())
35
36class fedd_experiment_control_local:
37    """
38    Control of experiments that this system can directly access.
39
40    Includes experiment creation, termination and information dissemination.
41    Thred safe.
42    """
43   
44    class thread_pool:
45        """
46        A class to keep track of a set of threads all invoked for the same
47        task.  Manages the mutual exclusion of the states.
48        """
49        def __init__(self):
50            """
51            Start a pool.
52            """
53            self.changed = Condition()
54            self.started = 0
55            self.terminated = 0
56
57        def acquire(self):
58            """
59            Get the pool's lock.
60            """
61            self.changed.acquire()
62
63        def release(self):
64            """
65            Release the pool's lock.
66            """
67            self.changed.release()
68
69        def wait(self, timeout = None):
70            """
71            Wait for a pool thread to start or stop.
72            """
73            self.changed.wait(timeout)
74
75        def start(self):
76            """
77            Called by a pool thread to report starting.
78            """
79            self.changed.acquire()
80            self.started += 1
81            self.changed.notifyAll()
82            self.changed.release()
83
84        def terminate(self):
85            """
86            Called by a pool thread to report finishing.
87            """
88            self.changed.acquire()
89            self.terminated += 1
90            self.changed.notifyAll()
91            self.changed.release()
92
93        def clear(self):
94            """
95            Clear all pool data.
96            """
97            self.changed.acquire()
98            self.started = 0
99            self.terminated =0
100            self.changed.notifyAll()
101            self.changed.release()
102
103    class pooled_thread(Thread):
104        """
105        One of a set of threads dedicated to a specific task.  Uses the
106        thread_pool class above for coordination.
107        """
108        def __init__(self, group=None, target=None, name=None, args=(), 
109                kwargs={}, pdata=None, trace_file=None):
110            Thread.__init__(self, group, target, name, args, kwargs)
111            self.rv = None          # Return value of the ops in this thread
112            self.exception = None   # Exception that terminated this thread
113            self.target=target      # Target function to run on start()
114            self.args = args        # Args to pass to target
115            self.kwargs = kwargs    # Additional kw args
116            self.pdata = pdata      # thread_pool for this class
117            # Logger for this thread
118            self.log = logging.getLogger("fedd.experiment_control")
119       
120        def run(self):
121            """
122            Emulate Thread.run, except add pool data manipulation and error
123            logging.
124            """
125            if self.pdata:
126                self.pdata.start()
127
128            if self.target:
129                try:
130                    self.rv = self.target(*self.args, **self.kwargs)
131                except service_error, s:
132                    self.exception = s
133                    self.log.error("Thread exception: %s %s" % \
134                            (s.code_string(), s.desc))
135                except:
136                    self.exception = sys.exc_info()[1]
137                    self.log.error(("Unexpected thread exception: %s" +\
138                            "Trace %s") % (self.exception,\
139                                traceback.format_exc()))
140            if self.pdata:
141                self.pdata.terminate()
142
143    call_RequestAccess = service_caller('RequestAccess',
144            'getfeddPortType', feddServiceLocator, 
145            RequestAccessRequestMessage, 'RequestAccessRequestBody')
146
147    call_ReleaseAccess = service_caller('ReleaseAccess',
148            'getfeddPortType', feddServiceLocator, 
149            ReleaseAccessRequestMessage, 'ReleaseAccessRequestBody')
150
151    call_Ns2Split = service_caller('Ns2Split',
152            'getfeddInternalPortType', feddInternalServiceLocator, 
153            Ns2SplitRequestMessage, 'Ns2SplitRequestBody')
154
155    def __init__(self, config=None, auth=None):
156        """
157        Intialize the various attributes, most from the config object
158        """
159        self.thread_with_rv = fedd_experiment_control_local.pooled_thread
160        self.thread_pool = fedd_experiment_control_local.thread_pool
161
162        self.cert_file = None
163        self.cert_pwd = None
164        self.trusted_certs = None
165
166        # Walk through the various relevant certificat specifying config
167        # attributes until the local certificate attributes can be resolved.
168        # The walk is from most specific to most general specification.
169        for s in ("experiment_control", "globals"):
170            if config.has_section(s): 
171                if config.has_option(s, "cert_file"):
172                    if not self.cert_file:
173                        self.cert_file = config.get(s, "cert_file")
174                        self.cert_pwd = config.get(s, "cert_pwd")
175
176                if config.has_option(s, "trusted_certs"):
177                    if not self.trusted_certs:
178                        self.trusted_certs = config.get(s, "trusted_certs")
179
180
181        self.exp_stem = "fed-stem"
182        self.log = logging.getLogger("fedd.experiment_control")
183        self.muxmax = 2
184        self.nthreads = 2
185        self.randomize_experiments = False
186
187        self.scp_exec = "/usr/bin/scp"
188        self.splitter = None
189        self.ssh_exec="/usr/bin/ssh"
190        self.ssh_keygen = "/usr/bin/ssh-keygen"
191        self.ssh_identity_file = None
192
193        self.debug = config.getboolean("experiment_control", "create_debug")
194        self.state_filename = config.get("experiment_control", 
195                "experiment_state_file")
196        self.splitter_url = config.get("experiment_control", "splitter_url")
197        self.fedkit = config.get("experiment_control", "fedkit")
198        accessdb_file = config.get("experiment_control", "accessdb")
199
200        self.ssh_pubkey_file = config.get("experiment_control", 
201                "ssh_pubkey_file")
202        self.ssh_privkey_file = config.get("experiment_control",
203                "ssh_privkey_file")
204        # NB for internal master/slave ops, not experiment setup
205        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
206        self.state = { }
207        self.state_lock = Lock()
208        self.tclsh = "/usr/local/bin/otclsh"
209        self.tcl_splitter = "/usr/testbed/lib/ns2ir/parse.tcl"
210        self.tbmap = { 
211                'deter':'https://users.isi.deterlab.net:23235',
212                'emulab':'https://users.isi.deterlab.net:23236',
213                'ucb':'https://users.isi.deterlab.net:23237',
214                }
215        self.trace_file = sys.stderr
216
217        self.def_expstart = \
218                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
219                "/tmp/federate";
220        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
221                "FEDDIR/hosts";
222        self.def_gwstart = \
223                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
224                "/tmp/bridge.log";
225        self.def_mgwstart = \
226                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
227                "/tmp/bridge.log";
228        self.def_gwimage = "FBSD61-TUNNEL2";
229        self.def_gwtype = "pc";
230
231
232        if self.ssh_pubkey_file:
233            try:
234                f = open(self.ssh_pubkey_file, 'r')
235                self.ssh_pubkey = f.read()
236                f.close()
237            except IOError:
238                raise service_error(service_error.internal,
239                        "Cannot read sshpubkey")
240        else:
241            raise service_error(service_error.internal, 
242                    "No SSH public key file?")
243
244        if not self.ssh_privkey_file:
245            raise service_error(service_error.internal, 
246                    "No SSH public key file?")
247
248        set_log_level(config, "experiment_control", self.log)
249
250        if auth:
251            self.auth = auth
252        else:
253            self.log.error(\
254                    "[access]: No authorizer initialized, creating local one.")
255            auth = authorizer()
256
257        if accessdb_file:
258            try:
259                self.accessdb = self.read_accessdb(accessdb_file)
260            except IOError:
261                raise service_error(service_error.internal,
262                        "Error opening %s as experiment control accessdb" % \
263                                accessdb_file)
264            for fid in self.accessdb.keys():
265                self.auth.set_attribute(fid, 'create')
266        else:
267            raise service_error(service_error.internal,
268                    "No accessdb specified in config")
269
270        # Grab saved state.  OK to do this w/o locking because it's read only
271        # and only one thread should be in existence that can see self.state at
272        # this point.
273        if self.state_filename:
274            self.read_state()
275
276        # Dispatch tables
277        self.soap_services = {\
278                'Create': soap_handler(\
279                        CreateRequestMessage.typecode,
280                        getattr(self, "create_experiment"), 
281                        CreateResponseMessage,
282                        "CreateResponseBody"),
283                'Vtopo': soap_handler(\
284                        VtopoRequestMessage.typecode,
285                        getattr(self, "get_vtopo"),
286                        VtopoResponseMessage,
287                        "VtopoResponseBody"),
288                'Vis': soap_handler(\
289                        VisRequestMessage.typecode,
290                        getattr(self, "get_vis"),
291                        VisResponseMessage,
292                        "VisResponseBody"),
293                'Info': soap_handler(\
294                        InfoRequestMessage.typecode,
295                        getattr(self, "get_info"),
296                        InfoResponseMessage,
297                        "InfoResponseBody"),
298                'Terminate': soap_handler(\
299                        TerminateRequestMessage.typecode,
300                        getattr(self, "terminate_experiment"),
301                        TerminateResponseMessage,
302                        "TerminateResponseBody"),
303        }
304
305        self.xmlrpc_services = {\
306                'Create': xmlrpc_handler(\
307                        getattr(self, "create_experiment"), 
308                        "CreateResponseBody"),
309                'Vtopo': xmlrpc_handler(\
310                        getattr(self, "get_vtopo"),
311                        "VtopoResponseBody"),
312                'Vis': xmlrpc_handler(\
313                        getattr(self, "get_vis"),
314                        "VisResponseBody"),
315                'Info': xmlrpc_handler(\
316                        getattr(self, "get_info"),
317                        "InfoResponseBody"),
318                'Terminate': xmlrpc_handler(\
319                        getattr(self, "terminate_experiment"),
320                        "TerminateResponseBody"),
321        }
322
323    def copy_file(self, src, dest, size=1024):
324        """
325        Exceedingly simple file copy.
326        """
327        s = open(src,'r')
328        d = open(dest, 'w')
329
330        buf = "x"
331        while buf != "":
332            buf = s.read(size)
333            d.write(buf)
334        s.close()
335        d.close()
336
337    # Call while holding self.state_lock
338    def write_state(self):
339        """
340        Write a new copy of experiment state after copying the existing state
341        to a backup.
342
343        State format is a simple pickling of the state dictionary.
344        """
345        if os.access(self.state_filename, os.W_OK):
346            self.copy_file(self.state_filename, \
347                    "%s.bak" % self.state_filename)
348        try:
349            f = open(self.state_filename, 'w')
350            pickle.dump(self.state, f)
351        except IOError, e:
352            self.log.error("Can't write file %s: %s" % \
353                    (self.state_filename, e))
354        except pickle.PicklingError, e:
355            self.log.error("Pickling problem: %s" % e)
356        except TypeError, e:
357            self.log.error("Pickling problem (TypeError): %s" % e)
358
359    # Call while holding self.state_lock
360    def read_state(self):
361        """
362        Read a new copy of experiment state.  Old state is overwritten.
363
364        State format is a simple pickling of the state dictionary.
365        """
366        try:
367            f = open(self.state_filename, "r")
368            self.state = pickle.load(f)
369            self.log.debug("[read_state]: Read state from %s" % \
370                    self.state_filename)
371        except IOError, e:
372            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
373                    % (self.state_filename, e))
374        except pickle.UnpicklingError, e:
375            self.log.warning(("[read_state]: No saved state: " + \
376                    "Unpickling failed: %s") % e)
377       
378        for k in self.state.keys():
379            try:
380                # This list should only have one element in it, but phrasing it
381                # as a for loop doesn't cost much, really.  We have to find the
382                # fedid elements anyway.
383                for eid in [ f['fedid'] \
384                        for f in self.state[k]['experimentID']\
385                            if f.has_key('fedid') ]:
386                    self.auth.set_attribute(self.state[k]['owner'], eid)
387            except KeyError, e:
388                self.log.warning("[read_state]: State ownership or identity " +\
389                        "misformatted in %s: %s" % (self.state_filename, e))
390
391
392    def read_accessdb(self, accessdb_file):
393        access = {}
394        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
395        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
396                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
397        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
398                "\s*->\s*(" + name_expr + ")\s*$")
399        lineno = 0
400
401        f = open(accessdb_file, "r")
402        for line in f:
403            lineno += 1
404            line.strip()
405            if len(line) == 0 or line.startswith('#'):
406                continue
407            m = project_line.match(line)
408            if m:
409                fid = fedid(hexstr=m.group(1))
410                project, user = m.group(2,3)
411                if not access.has_key(fid):
412                    access[fid] = []
413                access[fid].append((project, user))
414                continue
415
416            m = user_line.match(line)
417            if m:
418                fid = fedid(hexstr=m.group(1))
419                project = None
420                user = m.group(2)
421                if not access.has_key(fid):
422                    access[fid] = []
423                access[fid].append((project, user))
424                continue
425            raise service_error(service_error.internal,
426                    "Error parsing access db %s at line %d" % \
427                        (accessdb_file, lineno))
428        f.close()
429        return access
430
431
432    def scp_file(self, file, user, host, dest=""):
433        """
434        scp a file to the remote host.  If debug is set the action is only
435        logged.
436        """
437
438        scp_cmd = [self.scp_exec, '-i', self.ssh_privkey_file, file, 
439                "%s@%s:%s" % (user, host, dest)]
440        rv = 0
441
442        try:
443            dnull = open("/dev/null", "r")
444        except IOError:
445            self.log.debug("[ssh_file]: failed to open /dev/null for redirect")
446            dnull = Null
447
448        self.log.debug("[scp_file]: %s" % " ".join(scp_cmd))
449        if not self.debug:
450            if dnull: rv = call(scp_cmd, stdout=dnull, stderr=dnull)
451            else: rv = call(scp_cmd)
452
453        return rv == 0
454
455    def ssh_cmd(self, user, host, cmd, wname=None):
456        """
457        Run a remote command on host as user.  If debug is set, the action is
458        only logged.
459        """
460        sh_str = "%s -i %s %s@%s %s" % (self.ssh_exec, self.ssh_privkey_file, 
461                user, host, cmd)
462
463        try:
464            dnull = open("/dev/null", "r")
465        except IOError:
466            self.log.debug("[ssh_cmd]: failed to open /dev/null for redirect")
467            dnull = Null
468
469        self.log.debug("[ssh_cmd]: %s" % sh_str)
470        if not self.debug:
471            if dnull:
472                sub = Popen(sh_str, shell=True, stdout=dnull, stderr=dnull)
473            else:
474                sub = Popen(sh_str, shell=True)
475            return sub.wait() == 0
476        else:
477            return True
478
479    def ship_configs(self, host, user, src_dir, dest_dir):
480        """
481        Copy federant-specific configuration files to the federant.
482        """
483        if not self.ssh_cmd(user, host, "mkdir -p %s" % dest_dir):
484            return False
485        if not self.ssh_cmd(user, host, "chmod 770 %s" % dest_dir):
486            return False
487
488        for f in os.listdir(src_dir):
489            if os.path.isdir(f):
490                if not self.ship_configs(host, user, "%s/%s" % (src_dir, f), 
491                        "%s/%s" % (dest_dir, f)):
492                    return False
493            else:
494                if not self.scp_file("%s/%s" % (src_dir, f), 
495                        user, host, dest_dir):
496                    return False
497        return True
498
499    def start_segment(self, tb, eid, tbparams, tmpdir, timeout=0):
500        """
501        Start a sub-experiment on a federant.
502
503        Get the current state, modify or create as appropriate, ship data and
504        configs and start the experiment.  There are small ordering differences
505        based on the initial state of the sub-experiment.
506        """
507        # ops node in the federant
508        host = "%s%s" % (tbparams[tb]['host'], tbparams[tb]['domain'])
509        user = tbparams[tb]['user']     # federant user
510        pid = tbparams[tb]['project']   # federant project
511        # XXX
512        base_confs = ( "hosts",)
513        tclfile = "%s.%s.tcl" % (eid, tb)   # sub-experiment description
514        # command to test experiment state
515        expinfo_exec = "/usr/testbed/bin/expinfo" 
516        # Configuration directories on the remote machine
517        proj_dir = "/proj/%s/exp/%s/tmp" % (pid, eid)
518        tarfiles_dir = "/proj/%s/tarfiles/%s" % (pid, eid)
519        rpms_dir = "/proj/%s/rpms/%s" % (pid, eid)
520        # Regular expressions to parse the expinfo response
521        state_re = re.compile("State:\s+(\w+)")
522        no_exp_re = re.compile("^No\s+such\s+experiment")
523        state = None    # Experiment state parsed from expinfo
524        # The expinfo ssh command
525        cmd = [self.ssh_exec, "%s@%s" % (user, host), expinfo_exec, pid, eid]
526
527        # Get status
528        self.log.debug("[start_segment]: %s"% " ".join(cmd))
529        dev_null = None
530        try:
531            dev_null = open("/dev/null", "a")
532        except IOError, e:
533            self.log.error("[start_segment]: can't open /dev/null: %s" %e)
534
535        if self.debug:
536            state = 'swapped'
537            rv = 0
538        else:
539            status = Popen(cmd, stdout=PIPE, stderr=dev_null)
540            for line in status.stdout:
541                m = state_re.match(line)
542                if m: state = m.group(1)
543                else:
544                    m = no_exp_re.match(line)
545                    if m: state = "none"
546            rv = status.wait()
547
548        # If the experiment is not present the subcommand returns a non-zero
549        # return value.  If we successfully parsed a "none" outcome, ignore the
550        # return code.
551        if rv != 0 and state != "none":
552            raise service_error(service_error.internal,
553                    "Cannot get status of segment %s:%s/%s" % (tb, pid, eid))
554
555        self.log.debug("[start_segment]: %s: %s" % (tb, state))
556        self.log.info("[start_segment]:transferring experiment to %s" % tb)
557
558        if not self.scp_file("%s/%s/%s" % (tmpdir, tb, tclfile), user, host):
559            return False
560        # Clear the federation files
561        if not self.ssh_cmd(user, host, 
562                "/bin/sh -c \"'/bin/rm -rf %s'\"" % proj_dir):
563            return False
564        if not self.ssh_cmd(user, host, 
565                "/bin/sh -c \"'mkdir -p %s'\"" % proj_dir):
566            return False
567        # Clear and create the tarfiles and rpm directories
568        for d in (tarfiles_dir, rpms_dir):
569            if not self.ssh_cmd(user, host, 
570                    "/bin/sh -c \"'/bin/rm -rf %s/*'\"" % d):
571                return False
572            if not self.ssh_cmd(user, host, "mkdir -p %s" % d, 
573                    "create tarfiles"):
574                return False
575       
576        if state == 'active':
577            # Remote experiment is active.  Modify it.
578            for f in base_confs:
579                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
580                        "%s/%s" % (proj_dir, f)):
581                    return False
582            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
583                    proj_dir):
584                return False
585            if os.path.isdir("%s/tarfiles" % tmpdir):
586                if not self.ship_configs(host, user,
587                        "%s/tarfiles" % tmpdir, tarfiles_dir):
588                    return False
589            if os.path.isdir("%s/rpms" % tmpdir):
590                if not self.ship_configs(host, user,
591                        "%s/rpms" % tmpdir, tarfiles_dir):
592                    return False
593            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
594            if not self.ssh_cmd(user, host,
595                    "/usr/testbed/bin/modexp -r -s -w %s %s %s" % \
596                            (pid, eid, tclfile), "modexp"):
597                return False
598            return True
599        elif state == "swapped":
600            # Remote experiment swapped out.  Modify it and swap it in.
601            for f in base_confs:
602                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
603                        "%s/%s" % (proj_dir, f)):
604                    return False
605            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
606                    proj_dir):
607                return False
608            if os.path.isdir("%s/tarfiles" % tmpdir):
609                if not self.ship_configs(host, user,
610                        "%s/tarfiles" % tmpdir, tarfiles_dir):
611                    return False
612            if os.path.isdir("%s/rpms" % tmpdir):
613                if not self.ship_configs(host, user,
614                        "%s/rpms" % tmpdir, tarfiles_dir):
615                    return False
616            self.log.info("[start_segment]: Modifying %s on %s" % (eid, tb))
617            if not self.ssh_cmd(user, host,
618                    "/usr/testbed/bin/modexp -w %s %s %s" % (pid, eid, tclfile),
619                    "modexp"):
620                return False
621            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
622            if not self.ssh_cmd(user, host,
623                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
624                    "swapexp"):
625                return False
626            return True
627        elif state == "none":
628            # No remote experiment.  Create one.  We do this in 2 steps so we
629            # can put the configuration files and scripts into the new
630            # experiment directories.
631
632            # Tarfiles must be present for creation to work
633            if os.path.isdir("%s/tarfiles" % tmpdir):
634                if not self.ship_configs(host, user,
635                        "%s/tarfiles" % tmpdir, tarfiles_dir):
636                    return False
637            if os.path.isdir("%s/rpms" % tmpdir):
638                if not self.ship_configs(host, user,
639                        "%s/rpms" % tmpdir, tarfiles_dir):
640                    return False
641            self.log.info("[start_segment]: Creating %s on %s" % (eid, tb))
642            if not self.ssh_cmd(user, host,
643                    "/usr/testbed/bin/startexp -i -f -w -p %s -e %s %s" % \
644                            (pid, eid, tclfile), "startexp"):
645                return False
646            # After startexp the per-experiment directories exist
647            for f in base_confs:
648                if not self.scp_file("%s/%s" % (tmpdir, f), user, host,
649                        "%s/%s" % (proj_dir, f)):
650                    return False
651            if not self.ship_configs(host, user, "%s/%s" % (tmpdir, tb),
652                    proj_dir):
653                return False
654            self.log.info("[start_segment]: Swapping %s in on %s" % (eid, tb))
655            if not self.ssh_cmd(user, host,
656                    "/usr/testbed/bin/swapexp -w %s %s in" % (pid, eid),
657                    "swapexp"):
658                return False
659            return True
660        else:
661            self.log.debug("[start_segment]:unknown state %s" % state)
662            return False
663
664    def stop_segment(self, tb, eid, tbparams):
665        """
666        Stop a sub experiment by calling swapexp on the federant
667        """
668        user = tbparams[tb]['user']
669        host = tbparams[tb]['host']
670        pid = tbparams[tb]['project']
671
672        self.log.info("[stop_segment]: Stopping %s on %s" % (eid, tb))
673        return self.ssh_cmd(user, host,
674                "/usr/testbed/bin/swapexp -w %s %s out" % (pid, eid))
675
676       
677    def generate_ssh_keys(self, dest, type="rsa" ):
678        """
679        Generate a set of keys for the gateways to use to talk.
680
681        Keys are of type type and are stored in the required dest file.
682        """
683        valid_types = ("rsa", "dsa")
684        t = type.lower();
685        if t not in valid_types: raise ValueError
686        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
687
688        try:
689            trace = open("/dev/null", "w")
690        except IOError:
691            raise service_error(service_error.internal,
692                    "Cannot open /dev/null??");
693
694        # May raise CalledProcessError
695        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
696        rv = call(cmd, stdout=trace, stderr=trace)
697        if rv != 0:
698            raise service_error(service_error.internal, 
699                    "Cannot generate nonce ssh keys.  %s return code %d" \
700                            % (self.ssh_keygen, rv))
701
702    def gentopo(self, str):
703        """
704        Generate the topology dtat structure from the splitter's XML
705        representation of it.
706
707        The topology XML looks like:
708            <experiment>
709                <nodes>
710                    <node><vname></vname><ips>ip1:ip2</ips></node>
711                </nodes>
712                <lans>
713                    <lan>
714                        <vname></vname><vnode></vnode><ip></ip>
715                        <bandwidth></bandwidth><member>node:port</member>
716                    </lan>
717                </lans>
718        """
719        class topo_parse:
720            """
721            Parse the topology XML and create the dats structure.
722            """
723            def __init__(self):
724                # Typing of the subelements for data conversion
725                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
726                self.int_subelements = ( 'bandwidth',)
727                self.float_subelements = ( 'delay',)
728                # The final data structure
729                self.nodes = [ ]
730                self.lans =  [ ]
731                self.topo = { \
732                        'node': self.nodes,\
733                        'lan' : self.lans,\
734                    }
735                self.element = { }  # Current element being created
736                self.chars = ""     # Last text seen
737
738            def end_element(self, name):
739                # After each sub element the contents is added to the current
740                # element or to the appropriate list.
741                if name == 'node':
742                    self.nodes.append(self.element)
743                    self.element = { }
744                elif name == 'lan':
745                    self.lans.append(self.element)
746                    self.element = { }
747                elif name in self.str_subelements:
748                    self.element[name] = self.chars
749                    self.chars = ""
750                elif name in self.int_subelements:
751                    self.element[name] = int(self.chars)
752                    self.chars = ""
753                elif name in self.float_subelements:
754                    self.element[name] = float(self.chars)
755                    self.chars = ""
756
757            def found_chars(self, data):
758                self.chars += data.rstrip()
759
760
761        tp = topo_parse();
762        parser = xml.parsers.expat.ParserCreate()
763        parser.EndElementHandler = tp.end_element
764        parser.CharacterDataHandler = tp.found_chars
765
766        parser.Parse(str)
767
768        return tp.topo
769       
770
771    def genviz(self, topo):
772        """
773        Generate the visualization the virtual topology
774        """
775
776        neato = "/usr/local/bin/neato"
777        # These are used to parse neato output and to create the visualization
778        # file.
779        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
780        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
781                "%s</type></node>"
782
783        try:
784            # Node names
785            nodes = [ n['vname'] for n in topo['node'] ]
786            topo_lans = topo['lan']
787        except KeyError:
788            raise service_error(service_error.internal, "Bad topology")
789
790        lans = { }
791        links = { }
792
793        # Walk through the virtual topology, organizing the connections into
794        # 2-node connections (links) and more-than-2-node connections (lans).
795        # When a lan is created, it's added to the list of nodes (there's a
796        # node in the visualization for the lan).
797        for l in topo_lans:
798            if links.has_key(l['vname']):
799                if len(links[l['vname']]) < 2:
800                    links[l['vname']].append(l['vnode'])
801                else:
802                    nodes.append(l['vname'])
803                    lans[l['vname']] = links[l['vname']]
804                    del links[l['vname']]
805                    lans[l['vname']].append(l['vnode'])
806            elif lans.has_key(l['vname']):
807                lans[l['vname']].append(l['vnode'])
808            else:
809                links[l['vname']] = [ l['vnode'] ]
810
811
812        # Open up a temporary file for dot to turn into a visualization
813        try:
814            df, dotname = tempfile.mkstemp()
815            dotfile = os.fdopen(df, 'w')
816        except IOError:
817            raise service_error(service_error.internal,
818                    "Failed to open file in genviz")
819
820        # Generate a dot/neato input file from the links, nodes and lans
821        try:
822            print >>dotfile, "graph G {"
823            for n in nodes:
824                print >>dotfile, '\t"%s"' % n
825            for l in links.keys():
826                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
827            for l in lans.keys():
828                for n in lans[l]:
829                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
830            print >>dotfile, "}"
831            dotfile.close()
832        except TypeError:
833            raise service_error(service_error.internal,
834                    "Single endpoint link in vtopo")
835        except IOError:
836            raise service_error(service_error.internal, "Cannot write dot file")
837
838        # Use dot to create a visualization
839        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
840                '-Gpack=true', dotname], stdout=PIPE)
841
842        # Translate dot to vis format
843        vis_nodes = [ ]
844        vis = { 'node': vis_nodes }
845        for line in dot.stdout:
846            m = vis_re.match(line)
847            if m:
848                vn = m.group(1)
849                vis_node = {'name': vn, \
850                        'x': float(m.group(2)),\
851                        'y' : float(m.group(3)),\
852                    }
853                if vn in links.keys() or vn in lans.keys():
854                    vis_node['type'] = 'lan'
855                else:
856                    vis_node['type'] = 'node'
857                vis_nodes.append(vis_node)
858        rv = dot.wait()
859
860        os.remove(dotname)
861        if rv == 0 : return vis
862        else: return None
863
864    def get_access(self, tb, nodes, user, tbparam, master, export_project,
865            access_user):
866        """
867        Get access to testbed through fedd and set the parameters for that tb
868        """
869
870        translate_attr = {
871            'slavenodestartcmd': 'expstart',
872            'slaveconnectorstartcmd': 'gwstart',
873            'masternodestartcmd': 'mexpstart',
874            'masterconnectorstartcmd': 'mgwstart',
875            'connectorimage': 'gwimage',
876            'connectortype': 'gwtype',
877            'tunnelcfg': 'tun',
878            'smbshare': 'smbshare',
879        }
880
881        uri = self.tbmap.get(tb, None)
882        if not uri:
883            raise service_error(serice_error.server_config, 
884                    "Unknown testbed: %s" % tb)
885
886        # currently this lumps all users into one service access group
887        service_keys = [ a for u in user \
888                for a in u.get('access', []) \
889                    if a.has_key('sshPubkey')]
890
891        if len(service_keys) == 0:
892            raise service_error(service_error.req, 
893                    "Must have at least one SSH pubkey for services")
894
895
896        for p, u in access_user:
897
898            if p:
899                # Request with user and project specified
900                req = {\
901                        'destinationTestbed' : { 'uri' : uri },
902                        'project': { 
903                            'name': {'localname': p},
904                            'user': [ {'userID': { 'localname': u } } ],
905                            },
906                        'user':  user,
907                        'allocID' : { 'localname': 'test' },
908                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
909                        'serviceAccess' : service_keys
910                    }
911            else:
912                # Request with only user specified
913                req = {\
914                        'destinationTestbed' : { 'uri' : uri },
915                        'user':  [ {'userID': { 'localname': u } } ],
916                        'allocID' : { 'localname': 'test' },
917                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
918                        'serviceAccess' : service_keys
919                    }
920
921            if tb == master:
922                # NB, the export_project parameter is a dict that includes
923                # the type
924                req['exportProject'] = export_project
925
926            # node resources if any
927            if nodes != None and len(nodes) > 0:
928                rnodes = [ ]
929                for n in nodes:
930                    rn = { }
931                    image, hw, count = n.split(":")
932                    if image: rn['image'] = [ image ]
933                    if hw: rn['hardware'] = [ hw ]
934                    if count: rn['count'] = int(count)
935                    rnodes.append(rn)
936                req['resources']= { }
937                req['resources']['node'] = rnodes
938
939            try:
940                r = self.call_RequestAccess(uri, req, 
941                        self.cert_file, self.cert_pwd, self.trusted_certs)
942            except service_error, e:
943                if e.code == service_error.access:
944                    r = None
945                    continue
946                else:
947                    raise e
948
949            if r.has_key('RequestAccessResponseBody'):
950                r = r['RequestAccessResponseBody']
951            else:
952                raise service_error(service_error.protocol,
953                        "Bad proxy response")
954       
955        if not r:
956            raise service_error(service_error.access, 
957                    "Access denied by %s (%s)" % (tb, uri))
958
959        e = r['emulab']
960        p = e['project']
961        tbparam[tb] = { 
962                "boss": e['boss'],
963                "host": e['ops'],
964                "domain": e['domain'],
965                "fs": e['fileServer'],
966                "eventserver": e['eventServer'],
967                "project": unpack_id(p['name']),
968                "emulab" : e,
969                "allocID" : r['allocID'],
970                }
971        # Make the testbed name be the label the user applied
972        p['testbed'] = {'localname': tb }
973
974        for u in p['user']:
975            tbparam[tb]['user'] = unpack_id(u['userID'])
976
977        for a in e['fedAttr']:
978            if a['attribute']:
979                key = translate_attr.get(a['attribute'].lower(), None)
980                if key:
981                    tbparam[tb][key]= a['value']
982       
983    def release_access(self, tb, aid):
984        """
985        Release access to testbed through fedd
986        """
987
988        uri = self.tbmap.get(tb, None)
989        if not uri:
990            raise service_error(serice_error.server_config, 
991                    "Unknown testbed: %s" % tb)
992
993        resp = self.call_ReleaseAccess(uri, {'allocID': aid},
994                self.cert_file, self.cert_pwd, self.trusted_certs)
995
996        # better error coding
997
998    def remote_splitter(self, uri, desc, master):
999
1000        req = {
1001                'description' : { 'ns2description': desc },
1002                'master': master,
1003                'include_fedkit': bool(self.fedkit)
1004            }
1005
1006        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
1007                self.trusted_certs)
1008
1009        if r.has_key('Ns2SplitResponseBody'):
1010            r = r['Ns2SplitResponseBody']
1011            if r.has_key('output'):
1012                return r['output'].splitlines()
1013            else:
1014                raise service_error(service_error.protocol, 
1015                        "Bad splitter response (no output)")
1016        else:
1017            raise service_error(service_error.protocol, "Bad splitter response")
1018       
1019    class current_testbed:
1020        """
1021        Object for collecting the current testbed description.  The testbed
1022        description is saved to a file with the local testbed variables
1023        subsittuted line by line.
1024        """
1025        def __init__(self, eid, tmpdir, fedkit):
1026            self.begin_testbed = re.compile("^#\s+Begin\s+Testbed\s+\((\w+)\)")
1027            self.end_testbed = re.compile("^#\s+End\s+Testbed\s+\((\w+)\)")
1028            self.current_testbed = None
1029            self.testbed_file = None
1030
1031            self.def_expstart = \
1032                    "sudo -H /bin/sh FEDDIR/fed_bootstrap >& /tmp/federate";
1033            self.def_mexpstart = "sudo -H FEDDIR/make_hosts FEDDIR/hosts";
1034            self.def_gwstart = \
1035                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF>& /tmp/bridge.log";
1036            self.def_mgwstart = \
1037                    "sudo -H FEDDIR/fed-tun.pl -f GWCONF >& /tmp/bridge.log";
1038            self.def_gwimage = "FBSD61-TUNNEL2";
1039            self.def_gwtype = "pc";
1040
1041            self.eid = eid
1042            self.tmpdir = tmpdir
1043            self.fedkit = fedkit
1044
1045        def __call__(self, line, master, allocated, tbparams):
1046            # Capture testbed topology descriptions
1047            if self.current_testbed == None:
1048                m = self.begin_testbed.match(line)
1049                if m != None:
1050                    self.current_testbed = m.group(1)
1051                    if self.current_testbed == None:
1052                        raise service_error(service_error.req,
1053                                "Bad request format (unnamed testbed)")
1054                    allocated[self.current_testbed] = \
1055                            allocated.get(self.current_testbed,0) + 1
1056                    tb_dir = "%s/%s" % (self.tmpdir, self.current_testbed)
1057                    if not os.path.exists(tb_dir):
1058                        try:
1059                            os.mkdir(tb_dir)
1060                        except IOError:
1061                            raise service_error(service_error.internal,
1062                                    "Cannot create %s" % tb_dir)
1063                    try:
1064                        self.testbed_file = open("%s/%s.%s.tcl" %
1065                                (tb_dir, self.eid, self.current_testbed), 'w')
1066                    except IOError:
1067                        self.testbed_file = None
1068                    return True
1069                else: return False
1070            else:
1071                m = self.end_testbed.match(line)
1072                if m != None:
1073                    if m.group(1) != self.current_testbed:
1074                        raise service_error(service_error.internal, 
1075                                "Mismatched testbed markers!?")
1076                    if self.testbed_file != None: 
1077                        self.testbed_file.close()
1078                        self.testbed_file = None
1079                    self.current_testbed = None
1080                elif self.testbed_file:
1081                    # Substitute variables and put the line into the local
1082                    # testbed file.
1083                    gwtype = tbparams[self.current_testbed].get('gwtype', 
1084                            self.def_gwtype)
1085                    gwimage = tbparams[self.current_testbed].get('gwimage', 
1086                            self.def_gwimage)
1087                    mgwstart = tbparams[self.current_testbed].get('mgwstart', 
1088                            self.def_mgwstart)
1089                    mexpstart = tbparams[self.current_testbed].get('mexpstart', 
1090                            self.def_mexpstart)
1091                    gwstart = tbparams[self.current_testbed].get('gwstart', 
1092                            self.def_gwstart)
1093                    expstart = tbparams[self.current_testbed].get('expstart', 
1094                            self.def_expstart)
1095                    project = tbparams[self.current_testbed].get('project')
1096                    line = re.sub("GWTYPE", gwtype, line)
1097                    line = re.sub("GWIMAGE", gwimage, line)
1098                    if self.current_testbed == master:
1099                        line = re.sub("GWSTART", mgwstart, line)
1100                        line = re.sub("EXPSTART", mexpstart, line)
1101                    else:
1102                        line = re.sub("GWSTART", gwstart, line)
1103                        line = re.sub("EXPSTART", expstart, line)
1104                    line = re.sub("GWCONF", "FEDDIR`hostname`.gw.conf", line)
1105                    line = re.sub("PROJDIR", "/proj/%s/" % project, line)
1106                    line = re.sub("EID", self.eid, line)
1107                    line = re.sub("FEDDIR", "/proj/%s/exp/%s/tmp/" % \
1108                            (project, self.eid), line)
1109                    if self.fedkit:
1110                        line = re.sub("FEDKIT", os.path.basename(self.fedkit),
1111                                line)
1112                    print >>self.testbed_file, line
1113                return True
1114
1115    class allbeds:
1116        """
1117        Process the Allbeds section.  Get access to each federant and save the
1118        parameters in tbparams
1119        """
1120        def __init__(self, get_access):
1121            self.begin_allbeds = re.compile("^#\s+Begin\s+Allbeds")
1122            self.end_allbeds = re.compile("^#\s+End\s+Allbeds")
1123            self.in_allbeds = False
1124            self.get_access = get_access
1125
1126        def __call__(self, line, user, tbparams, master, export_project,
1127                access_user):
1128            # Testbed access parameters
1129            if not self.in_allbeds:
1130                if self.begin_allbeds.match(line):
1131                    self.in_allbeds = True
1132                    return True
1133                else:
1134                    return False
1135            else:
1136                if self.end_allbeds.match(line):
1137                    self.in_allbeds = False
1138                else:
1139                    nodes = line.split('|')
1140                    tb = nodes.pop(0)
1141                    self.get_access(tb, nodes, user, tbparams, master,
1142                            export_project, access_user)
1143                return True
1144
1145    class gateways:
1146        def __init__(self, eid, master, tmpdir, gw_pubkey,
1147                gw_secretkey, copy_file, fedkit):
1148            self.begin_gateways = \
1149                    re.compile("^#\s+Begin\s+gateways\s+\((\w+)\)")
1150            self.end_gateways = re.compile("^#\s+End\s+gateways\s+\((\w+)\)")
1151            self.current_gateways = None
1152            self.control_gateway = None
1153            self.active_end = { }
1154
1155            self.eid = eid
1156            self.master = master
1157            self.tmpdir = tmpdir
1158            self.gw_pubkey_base = gw_pubkey
1159            self.gw_secretkey_base = gw_secretkey
1160
1161            self.copy_file = copy_file
1162            self.fedkit = fedkit
1163
1164
1165        def gateway_conf_file(self, gw, master, eid, pubkey, privkey,
1166                active_end, tbparams, dtb, myname, desthost, type):
1167            """
1168            Produce a gateway configuration file from a gateways line.
1169            """
1170
1171            sproject = tbparams[gw].get('project', 'project')
1172            dproject = tbparams[dtb].get('project', 'project')
1173            sdomain = ".%s.%s%s" % (eid, sproject,
1174                    tbparams[gw].get('domain', ".example.com"))
1175            ddomain = ".%s.%s%s" % (eid, dproject,
1176                    tbparams[dtb].get('domain', ".example.com"))
1177            boss = tbparams[master].get('boss', "boss")
1178            fs = tbparams[master].get('fs', "fs")
1179            event_server = "%s%s" % \
1180                    (tbparams[gw].get('eventserver', "event_server"),
1181                            tbparams[gw].get('domain', "example.com"))
1182            remote_event_server = "%s%s" % \
1183                    (tbparams[dtb].get('eventserver', "event_server"),
1184                            tbparams[dtb].get('domain', "example.com"))
1185            seer_control = "%s%s" % \
1186                    (tbparams[gw].get('control', "control"), sdomain)
1187
1188            if self.fedkit:
1189                remote_script_dir = "/usr/local/federation/bin"
1190                local_script_dir = "/usr/local/federation/bin"
1191            else:
1192                remote_script_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1193                local_script_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1194
1195            local_key_dir = "/proj/%s/exp/%s/tmp" % ( sproject, eid)
1196            remote_conf_dir = "/proj/%s/exp/%s/tmp" % ( dproject, eid)
1197            tunnel_cfg = tbparams[gw].get("tun", "false")
1198
1199            conf_file = "%s%s.gw.conf" % (myname, sdomain)
1200            remote_conf_file = "%s%s.gw.conf" % (desthost, ddomain)
1201
1202            # translate to lower case so the `hostname` hack for specifying
1203            # configuration files works.
1204            conf_file = conf_file.lower();
1205            remote_conf_file = remote_conf_file.lower();
1206
1207            if dtb == master:
1208                active = "false"
1209            elif gw == master:
1210                active = "true"
1211            elif active_end.has_key['%s-%s' % (dtb, gw)]:
1212                active = "false"
1213            else:
1214                active_end['%s-%s' % (gw, dtb)] = 1
1215                active = "true"
1216
1217            gwconfig = open("%s/%s/%s" % (self.tmpdir, gw, conf_file), "w")
1218            print >>gwconfig, "Active: %s" % active
1219            print >>gwconfig, "TunnelCfg: %s" % tunnel_cfg
1220            print >>gwconfig, "BossName: %s" % boss
1221            print >>gwconfig, "FsName: %s" % fs
1222            print >>gwconfig, "EventServerName: %s" % event_server
1223            print >>gwconfig, "RemoteEventServerName: %s" % remote_event_server
1224            print >>gwconfig, "SeerControl: %s" % seer_control
1225            print >>gwconfig, "Type: %s" % type
1226            print >>gwconfig, "RemoteScriptDir: %s" % remote_script_dir
1227            print >>gwconfig, "EventRepeater: %s/fed_evrepeater" % \
1228                    local_script_dir
1229            print >>gwconfig, "RemoteExperiment: %s/%s" % (dproject, eid)
1230            print >>gwconfig, "LocalExperiment: %s/%s" % (sproject, eid)
1231            print >>gwconfig, "RemoteConfigFile: %s/%s" % \
1232                    (remote_conf_dir, remote_conf_file)
1233            print >>gwconfig, "Peer: %s%s" % (desthost, ddomain)
1234            print >>gwconfig, "Pubkeys: %s/%s" % (local_key_dir, pubkey)
1235            print >>gwconfig, "Privkeys: %s/%s" % (local_key_dir, privkey)
1236            gwconfig.close()
1237
1238            return active == "true"
1239
1240        def __call__(self, line, allocated, tbparams):
1241            # Process gateways
1242            if not self.current_gateways:
1243                m = self.begin_gateways.match(line)
1244                if m:
1245                    self.current_gateways = m.group(1)
1246                    if allocated.has_key(self.current_gateways):
1247                        # This test should always succeed
1248                        tb_dir = "%s/%s" % (self.tmpdir, self.current_gateways)
1249                        if not os.path.exists(tb_dir):
1250                            try:
1251                                os.mkdir(tb_dir)
1252                            except IOError:
1253                                raise service_error(service_error.internal,
1254                                        "Cannot create %s" % tb_dir)
1255                    else:
1256                        # XXX
1257                        self.log.error("[gateways]: Ignoring gateways for " + \
1258                                "unknown testbed %s" % self.current_gateways)
1259                        self.current_gateways = None
1260                    return True
1261                else:
1262                    return False
1263            else:
1264                m = self.end_gateways.match(line)
1265                if m :
1266                    if m.group(1) != self.current_gateways:
1267                        raise service_error(service_error.internal,
1268                                "Mismatched gateway markers!?")
1269                    if self.control_gateway:
1270                        try:
1271                            cc = open("%s/%s/client.conf" %
1272                                    (self.tmpdir, self.current_gateways), 'w')
1273                            print >>cc, "ControlGateway: %s" % \
1274                                    self.control_gateway
1275                            if tbparams[self.master].has_key('smbshare'):
1276                                print >>cc, "SMBSHare: %s" % \
1277                                        tbparams[self.master]['smbshare']
1278                            print >>cc, "ProjectUser: %s" % \
1279                                    tbparams[self.master]['user']
1280                            print >>cc, "ProjectName: %s" % \
1281                                    tbparams[self.master]['project']
1282                            cc.close()
1283                        except IOError:
1284                            raise service_error(service_error.internal,
1285                                    "Error creating client config")
1286                        try:
1287                            cc = open("%s/%s/seer.conf" %
1288                                    (self.tmpdir, self.current_gateways),
1289                                    'w')
1290                            if self.current_gateways != self.master:
1291                                print >>cc, "ControlNode: %s" % \
1292                                        self.control_gateway
1293                            print >>cc, "ExperimentID: %s/%s" % \
1294                                    ( tbparams[self.master]['project'], \
1295                                    self.eid )
1296                            cc.close()
1297                        except IOError:
1298                            raise service_error(service_error.internal,
1299                                    "Error creating seer config")
1300                    else:
1301                        debug.error("[gateways]: No control gateway for %s" %\
1302                                    self.current_gateways)
1303                    self.current_gateways = None
1304                else:
1305                    dtb, myname, desthost, type = line.split(" ")
1306
1307                    if type == "control" or type == "both":
1308                        self.control_gateway = "%s.%s.%s%s" % (myname, 
1309                                self.eid, 
1310                                tbparams[self.current_gateways]['project'],
1311                                tbparams[self.current_gateways]['domain'])
1312                    try:
1313                        active = self.gateway_conf_file(self.current_gateways,
1314                                self.master, self.eid, self.gw_pubkey_base,
1315                                self.gw_secretkey_base,
1316                                self.active_end, tbparams, dtb, myname,
1317                                desthost, type)
1318                    except IOError, e:
1319                        raise service_error(service_error.internal,
1320                                "Failed to write config file for %s" % \
1321                                        self.current_gateway)
1322           
1323                    gw_pubkey = "%s/keys/%s" % \
1324                            (self.tmpdir, self.gw_pubkey_base)
1325                    gw_secretkey = "%s/keys/%s" % \
1326                            (self.tmpdir, self.gw_secretkey_base)
1327
1328                    pkfile = "%s/%s/%s" % \
1329                            ( self.tmpdir, self.current_gateways, 
1330                                    self.gw_pubkey_base)
1331                    skfile = "%s/%s/%s" % \
1332                            ( self.tmpdir, self.current_gateways, 
1333                                    self.gw_secretkey_base)
1334
1335                    if not os.path.exists(pkfile):
1336                        try:
1337                            self.copy_file(gw_pubkey, pkfile)
1338                        except IOError:
1339                            service_error(service_error.internal,
1340                                    "Failed to copy pubkey file")
1341
1342                    if active and not os.path.exists(skfile):
1343                        try:
1344                            self.copy_file(gw_secretkey, skfile)
1345                        except IOError:
1346                            service_error(service_error.internal,
1347                                    "Failed to copy secretkey file")
1348                return True
1349
1350    class shunt_to_file:
1351        """
1352        Simple class to write data between two regexps to a file.
1353        """
1354        def __init__(self, begin, end, filename):
1355            """
1356            Begin shunting on a match of begin, stop on end, send data to
1357            filename.
1358            """
1359            self.begin = re.compile(begin)
1360            self.end = re.compile(end)
1361            self.in_shunt = False
1362            self.file = None
1363            self.filename = filename
1364
1365        def __call__(self, line):
1366            """
1367            Call this on each line in the input that may be shunted.
1368            """
1369            if not self.in_shunt:
1370                if self.begin.match(line):
1371                    self.in_shunt = True
1372                    try:
1373                        self.file = open(self.filename, "w")
1374                    except:
1375                        self.file = None
1376                        raise
1377                    return True
1378                else:
1379                    return False
1380            else:
1381                if self.end.match(line):
1382                    if self.file: 
1383                        self.file.close()
1384                        self.file = None
1385                    self.in_shunt = False
1386                else:
1387                    if self.file:
1388                        print >>self.file, line
1389                return True
1390
1391    class shunt_to_list:
1392        """
1393        Same interface as shunt_to_file.  Data collected in self.list, one list
1394        element per line.
1395        """
1396        def __init__(self, begin, end):
1397            self.begin = re.compile(begin)
1398            self.end = re.compile(end)
1399            self.in_shunt = False
1400            self.list = [ ]
1401       
1402        def __call__(self, line):
1403            if not self.in_shunt:
1404                if self.begin.match(line):
1405                    self.in_shunt = True
1406                    return True
1407                else:
1408                    return False
1409            else:
1410                if self.end.match(line):
1411                    self.in_shunt = False
1412                else:
1413                    self.list.append(line)
1414                return True
1415
1416    class shunt_to_string:
1417        """
1418        Same interface as shunt_to_file.  Data collected in self.str, all in
1419        one string.
1420        """
1421        def __init__(self, begin, end):
1422            self.begin = re.compile(begin)
1423            self.end = re.compile(end)
1424            self.in_shunt = False
1425            self.str = ""
1426       
1427        def __call__(self, line):
1428            if not self.in_shunt:
1429                if self.begin.match(line):
1430                    self.in_shunt = True
1431                    return True
1432                else:
1433                    return False
1434            else:
1435                if self.end.match(line):
1436                    self.in_shunt = False
1437                else:
1438                    self.str += line
1439                return True
1440
1441    def create_experiment(self, req, fid):
1442        """
1443        The external interface to experiment creation called from the
1444        dispatcher.
1445
1446        Creates a working directory, splits the incoming description using the
1447        splitter script and parses out the avrious subsections using the
1448        lcasses above.  Once each sub-experiment is created, use pooled threads
1449        to instantiate them and start it all up.
1450        """
1451
1452        if not self.auth.check_attribute(fid, 'create'):
1453            raise service_error(service_error.access, "Create access denied")
1454
1455        try:
1456            tmpdir = tempfile.mkdtemp(prefix="split-")
1457        except IOError:
1458            raise service_error(service_error.internal, "Cannot create tmp dir")
1459
1460        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1461        gw_secretkey_base = "fed.%s" % self.ssh_type
1462        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1463        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1464        tclfile = tmpdir + "/experiment.tcl"
1465        tbparams = { }
1466        try:
1467            access_user = self.accessdb[fid]
1468        except KeyError:
1469            raise service_error(service_error.internal,
1470                    "Access map and authorizer out of sync in " + \
1471                            "create_experiment for fedid %s"  % fid)
1472
1473        pid = "dummy"
1474        gid = "dummy"
1475        # XXX
1476        fail_soft = False
1477
1478        try:
1479            os.mkdir(tmpdir+"/keys")
1480        except OSError:
1481            raise service_error(service_error.internal,
1482                    "Can't make temporary dir")
1483
1484        req = req.get('CreateRequestBody', None)
1485        if not req:
1486            raise service_error(service_error.req,
1487                    "Bad request format (no CreateRequestBody)")
1488        # The tcl parser needs to read a file so put the content into that file
1489        descr=req.get('experimentdescription', None)
1490        if descr:
1491            file_content=descr.get('ns2description', None)
1492            if file_content:
1493                try:
1494                    f = open(tclfile, 'w')
1495                    f.write(file_content)
1496                    f.close()
1497                except IOError:
1498                    raise service_error(service_error.internal,
1499                            "Cannot write temp experiment description")
1500            else:
1501                raise service_error(service_error.req, 
1502                        "Only ns2descriptions supported")
1503        else:
1504            raise service_error(service_error.req, "No experiment description")
1505
1506        if req.has_key('experimentID') and \
1507                req['experimentID'].has_key('localname'):
1508            eid = req['experimentID']['localname']
1509            self.state_lock.acquire()
1510            while (self.state.has_key(eid)):
1511                eid += random.choice(string.ascii_letters)
1512            # To avoid another thread picking this localname
1513            self.state[eid] = "placeholder"
1514            self.state_lock.release()
1515        else:
1516            eid = self.exp_stem
1517            for i in range(0,5):
1518                eid += random.choice(string.ascii_letters)
1519            self.state_lock.acquire()
1520            while (self.state.has_key(eid)):
1521                eid = self.exp_stem
1522                for i in range(0,5):
1523                    eid += random.choice(string.ascii_letters)
1524            # To avoid another thread picking this localname
1525            self.state[eid] = "placeholder"
1526            self.state_lock.release()
1527
1528        try:
1529            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1530        except ValueError:
1531            raise service_error(service_error.server_config, 
1532                    "Bad key type (%s)" % self.ssh_type)
1533
1534        user = req.get('user', None)
1535        if user == None:
1536            raise service_error(service_error.req, "No user")
1537
1538        master = req.get('master', None)
1539        if not master:
1540            raise service_error(service_error.req, "No master testbed label")
1541        export_project = req.get('exportProject', None)
1542        if not export_project:
1543            raise service_error(service_error.req, "No export project")
1544       
1545        if self.splitter_url:
1546            self.log.debug("Calling remote splitter at %s" % self.splitter_url)
1547            split_data = self.remote_splitter(self.splitter_url, file_content,
1548                    master)
1549        else:
1550            tclcmd = [self.tclsh, self.tcl_splitter, '-s', '-x', 
1551                str(self.muxmax), '-m', master]
1552
1553            if self.fedkit:
1554                tclcmd.append('-k')
1555
1556            tclcmd.extend([pid, gid, eid, tclfile])
1557
1558            self.log.debug("running local splitter %s", " ".join(tclcmd))
1559            tclparser = Popen(tclcmd, stdout=PIPE)
1560            split_data = tclparser.stdout
1561
1562        allocated = { }     # Testbeds we can access
1563        started = { }       # Testbeds where a sub-experiment started
1564                            # successfully
1565
1566        # Objects to parse the splitter output (defined above)
1567        parse_current_testbed = self.current_testbed(eid, tmpdir, self.fedkit)
1568        parse_allbeds = self.allbeds(self.get_access)
1569        parse_gateways = self.gateways(eid, master, tmpdir,
1570                gw_pubkey_base, gw_secretkey_base, self.copy_file, self.fedkit)
1571        parse_vtopo = self.shunt_to_string("^#\s+Begin\s+Vtopo",
1572                    "^#\s+End\s+Vtopo")
1573        parse_hostnames = self.shunt_to_file("^#\s+Begin\s+hostnames",
1574                    "^#\s+End\s+hostnames", tmpdir + "/hosts")
1575        parse_tarfiles = self.shunt_to_list("^#\s+Begin\s+tarfiles",
1576                "^#\s+End\s+tarfiles")
1577        parse_rpms = self.shunt_to_list("^#\s+Begin\s+rpms",
1578                "^#\s+End\s+rpms")
1579
1580        # Worling on the split data
1581        for line in split_data:
1582            line = line.rstrip()
1583            if parse_current_testbed(line, master, allocated, tbparams):
1584                continue
1585            elif parse_allbeds(line, user, tbparams, master, export_project,
1586                    access_user):
1587                continue
1588            elif parse_gateways(line, allocated, tbparams):
1589                continue
1590            elif parse_vtopo(line):
1591                continue
1592            elif parse_hostnames(line):
1593                continue
1594            elif parse_tarfiles(line):
1595                continue
1596            elif parse_rpms(line):
1597                continue
1598            else:
1599                raise service_error(service_error.internal, 
1600                        "Bad tcl parse? %s" % line)
1601
1602        # Virtual topology and visualization
1603        vtopo = self.gentopo(parse_vtopo.str)
1604        if not vtopo:
1605            raise service_error(service_error.internal, 
1606                    "Failed to generate virtual topology")
1607
1608        vis = self.genviz(vtopo)
1609        if not vis:
1610            raise service_error(service_error.internal, 
1611                    "Failed to generate visualization")
1612
1613        # save federant information
1614        for k in allocated.keys():
1615            tbparams[k]['federant'] = {\
1616                    'name': [ { 'localname' : eid} ],\
1617                    'emulab': tbparams[k]['emulab'],\
1618                    'allocID' : tbparams[k]['allocID'],\
1619                    'master' : k == master,\
1620                }
1621
1622
1623        # Copy tarfiles and rpms needed at remote sites into a staging area
1624        try:
1625            if self.fedkit:
1626                parse_tarfiles.list.append(self.fedkit)
1627            for t in parse_tarfiles.list:
1628                if not os.path.exists("%s/tarfiles" % tmpdir):
1629                    os.mkdir("%s/tarfiles" % tmpdir)
1630                self.copy_file(t, "%s/tarfiles/%s" % \
1631                        (tmpdir, os.path.basename(t)))
1632            for r in parse_rpms.list:
1633                if not os.path.exists("%s/rpms" % tmpdir):
1634                    os.mkdir("%s/rpms" % tmpdir)
1635                self.copy_file(r, "%s/rpms/%s" % \
1636                        (tmpdir, os.path.basename(r)))
1637        except IOError, e:
1638            raise service_error(service_error.internal, 
1639                    "Cannot stage tarfile/rpm: %s" % e.strerror)
1640
1641        thread_pool_info = self.thread_pool()
1642        threads = [ ]
1643
1644        for tb in [ k for k in allocated.keys() if k != master]:
1645            # Wait until we have a free slot to start the next testbed load
1646            thread_pool_info.acquire()
1647            while thread_pool_info.started - \
1648                    thread_pool_info.terminated >= self.nthreads:
1649                thread_pool_info.wait()
1650            thread_pool_info.release()
1651
1652            # Create and start a thread to start the segment, and save it to
1653            # get the return value later
1654            t  = self.pooled_thread(target=self.start_segment, 
1655                    args=(tb, eid, tbparams, tmpdir, 0), name=tb,
1656                    pdata=thread_pool_info, trace_file=self.trace_file)
1657            threads.append(t)
1658            t.start()
1659
1660        # Wait until all finish (the first clause of the while is to make sure
1661        # one starts)
1662        thread_pool_info.acquire()
1663        while thread_pool_info.started == 0 or \
1664                thread_pool_info.started > thread_pool_info.terminated:
1665            thread_pool_info.wait()
1666        thread_pool_info.release()
1667
1668        # If none failed, start the master
1669        failed = [ t.getName() for t in threads if not t.rv ]
1670
1671        if len(failed) == 0:
1672            if not self.start_segment(master, eid, tbparams, tmpdir):
1673                failed.append(master)
1674
1675        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1676        # If one failed clean up, unless fail_soft is set
1677        if failed:
1678            if not fail_soft:
1679                for tb in succeeded:
1680                    self.stop_segment(tb, eid, tbparams)
1681                # Remove the placeholder
1682                self.state_lock.acquire()
1683                del self.state[eid]
1684                self.state_lock.release()
1685
1686                raise service_error(service_error.federant,
1687                    "Swap in failed on %s" % ",".join(failed))
1688        else:
1689            self.log.info("[start_segment]: Experiment %s started" % eid)
1690
1691        # Generate an ID for the experiment (slice) and a certificate that the
1692        # allocator can use to prove they own it.  We'll ship it back through
1693        # the encrypted connection.
1694        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1695
1696        self.log.debug("[start_experiment]: removing %s" % tmpdir)
1697
1698        # Walk up tmpdir, deleting as we go
1699        for path, dirs, files in os.walk(tmpdir, topdown=False):
1700            for f in files:
1701                os.remove(os.path.join(path, f))
1702            for d in dirs:
1703                os.rmdir(os.path.join(path, d))
1704        os.rmdir(tmpdir)
1705
1706        # The deepcopy prevents the allocation ID and other binaries from being
1707        # translated into other formats
1708        resp = { 'federant' : [ copy.deepcopy(tbparams[tb]['federant']) \
1709                for tb in tbparams.keys() \
1710                    if tbparams[tb].has_key('federant') ],\
1711                    'vtopo': vtopo,\
1712                    'vis' : vis,
1713                    'experimentID' : [\
1714                            { 'fedid': copy.copy(expid) }, \
1715                            { 'localname': eid },\
1716                        ],\
1717                    'experimentAccess': { 'X509' : expcert },\
1718                }
1719
1720        # Insert the experiment into our state and update the disk copy
1721        self.state_lock.acquire()
1722        self.state[expid] = { 'federant' : [ tbparams[tb]['federant'] \
1723                for tb in tbparams.keys() \
1724                    if tbparams[tb].has_key('federant') ],\
1725                    'vtopo': vtopo,\
1726                    'vis' : vis,
1727                    'owner': fid,
1728                    'experimentID' : [\
1729                            { 'fedid': expid }, { 'localname': eid },\
1730                        ],\
1731                }
1732        self.state[eid] = self.state[expid]
1733        if self.state_filename: self.write_state()
1734        self.state_lock.release()
1735
1736        self.auth.set_attribute(fid, expid)
1737        self.auth.set_attribute(expid, expid)
1738
1739        if not failed:
1740            return resp
1741        else:
1742            raise service_error(service_error.partial, \
1743                    "Partial swap in on %s" % ",".join(succeeded))
1744
1745    def check_experiment_access(self, fid, key):
1746        """
1747        Confirm that the fid has access to the experiment.  Though a request
1748        may be made in terms of a local name, the access attribute is always
1749        the experiment's fedid.
1750        """
1751        if not isinstance(key, fedid):
1752            self.state_lock.acquire()
1753            if self.state.has_key(key):
1754                try:
1755                    kl = [ f['fedid'] for f in self.state[key]['experimentID']\
1756                            if f.has_key('fedid') ]
1757                except KeyError:
1758                    self.state_lock.release()
1759                    raise service_error(service_error.internal, 
1760                            "No fedid for experiment %s when checking " +\
1761                                    "access(!?)" % key)
1762                if len(kl) == 1:
1763                    key = kl[0]
1764                else:
1765                    self.state_lock.release()
1766                    raise service_error(service_error.internal, 
1767                            "multiple fedids for experiment %s when " +\
1768                                    "checking access(!?)" % key)
1769            else:
1770                self.state_lock.release()
1771                raise service_error(service_error.access, "Access Denied")
1772            self.state_lock.release()
1773
1774        if self.auth.check_attribute(fid, key):
1775            return True
1776        else:
1777            raise service_error(service_error.access, "Access Denied")
1778
1779
1780
1781    def get_vtopo(self, req, fid):
1782        """
1783        Return the stored virtual topology for this experiment
1784        """
1785        rv = None
1786
1787        req = req.get('VtopoRequestBody', None)
1788        if not req:
1789            raise service_error(service_error.req,
1790                    "Bad request format (no VtopoRequestBody)")
1791        exp = req.get('experiment', None)
1792        if exp:
1793            if exp.has_key('fedid'):
1794                key = exp['fedid']
1795                keytype = "fedid"
1796            elif exp.has_key('localname'):
1797                key = exp['localname']
1798                keytype = "localname"
1799            else:
1800                raise service_error(service_error.req, "Unknown lookup type")
1801        else:
1802            raise service_error(service_error.req, "No request?")
1803
1804        self.check_experiment_access(fid, key)
1805
1806        self.state_lock.acquire()
1807        if self.state.has_key(key):
1808            rv = { 'experiment' : {keytype: key },\
1809                    'vtopo': self.state[key]['vtopo'],\
1810                }
1811        self.state_lock.release()
1812
1813        if rv: return rv
1814        else: raise service_error(service_error.req, "No such experiment")
1815
1816    def get_vis(self, req, fid):
1817        """
1818        Return the stored visualization for this experiment
1819        """
1820        rv = None
1821
1822        req = req.get('VisRequestBody', None)
1823        if not req:
1824            raise service_error(service_error.req,
1825                    "Bad request format (no VisRequestBody)")
1826        exp = req.get('experiment', None)
1827        if exp:
1828            if exp.has_key('fedid'):
1829                key = exp['fedid']
1830                keytype = "fedid"
1831            elif exp.has_key('localname'):
1832                key = exp['localname']
1833                keytype = "localname"
1834            else:
1835                raise service_error(service_error.req, "Unknown lookup type")
1836        else:
1837            raise service_error(service_error.req, "No request?")
1838
1839        self.check_experiment_access(fid, key)
1840
1841        self.state_lock.acquire()
1842        if self.state.has_key(key):
1843            rv =  { 'experiment' : {keytype: key },\
1844                    'vis': self.state[key]['vis'],\
1845                    }
1846        self.state_lock.release()
1847
1848        if rv: return rv
1849        else: raise service_error(service_error.req, "No such experiment")
1850
1851    def get_info(self, req, fid):
1852        """
1853        Return all the stored info about this experiment
1854        """
1855        rv = None
1856
1857        req = req.get('InfoRequestBody', None)
1858        if not req:
1859            raise service_error(service_error.req,
1860                    "Bad request format (no VisRequestBody)")
1861        exp = req.get('experiment', None)
1862        if exp:
1863            if exp.has_key('fedid'):
1864                key = exp['fedid']
1865                keytype = "fedid"
1866            elif exp.has_key('localname'):
1867                key = exp['localname']
1868                keytype = "localname"
1869            else:
1870                raise service_error(service_error.req, "Unknown lookup type")
1871        else:
1872            raise service_error(service_error.req, "No request?")
1873
1874        self.check_experiment_access(fid, key)
1875
1876        # The state may be massaged by the service function that called
1877        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1878        # state.
1879        self.state_lock.acquire()
1880        if self.state.has_key(key):
1881            rv = copy.deepcopy(self.state[key])
1882        self.state_lock.release()
1883
1884        if rv: return rv
1885        else: raise service_error(service_error.req, "No such experiment")
1886
1887
1888    def terminate_experiment(self, req, fid):
1889        """
1890        Swap this experiment out on the federants and delete the shared
1891        information
1892        """
1893        tbparams = { }
1894        req = req.get('TerminateRequestBody', None)
1895        if not req:
1896            raise service_error(service_error.req,
1897                    "Bad request format (no TerminateRequestBody)")
1898        exp = req.get('experiment', None)
1899        if exp:
1900            if exp.has_key('fedid'):
1901                key = exp['fedid']
1902                keytype = "fedid"
1903            elif exp.has_key('localname'):
1904                key = exp['localname']
1905                keytype = "localname"
1906            else:
1907                raise service_error(service_error.req, "Unknown lookup type")
1908        else:
1909            raise service_error(service_error.req, "No request?")
1910
1911        self.check_experiment_access(fid, key)
1912
1913        self.state_lock.acquire()
1914        fed_exp = self.state.get(key, None)
1915
1916        if fed_exp:
1917            # This branch of the conditional holds the lock to generate a
1918            # consistent temporary tbparams variable to deallocate experiments.
1919            # It releases the lock to do the deallocations and reacquires it to
1920            # remove the experiment state when the termination is complete.
1921            ids = []
1922            #  experimentID is a list of dicts that are self-describing
1923            #  identifiers.  This finds all the fedids and localnames - the
1924            #  keys of self.state - and puts them into ids.
1925            for id in fed_exp.get('experimentID', []):
1926                if id.has_key('fedid'): ids.append(id['fedid'])
1927                if id.has_key('localname'): ids.append(id['localname'])
1928
1929            # Construct enough of the tbparams to make the stop_segment calls
1930            # work
1931            for fed in fed_exp['federant']:
1932                try:
1933                    for e in fed['name']:
1934                        eid = e.get('localname', None)
1935                        if eid: break
1936                    else:
1937                        continue
1938
1939                    p = fed['emulab']['project']
1940
1941                    project = p['name']['localname']
1942                    tb = p['testbed']['localname']
1943                    user = p['user'][0]['userID']['localname']
1944
1945                    domain = fed['emulab']['domain']
1946                    host  = "%s%s" % (fed['emulab']['ops'], domain)
1947                    aid = fed['allocID']
1948                except KeyError, e:
1949                    continue
1950                tbparams[tb] = {\
1951                        'user': user,\
1952                        'domain': domain,\
1953                        'project': project,\
1954                        'host': host,\
1955                        'eid': eid,\
1956                        'aid': aid,\
1957                    }
1958            self.state_lock.release()
1959
1960            # Stop everyone.
1961            for tb in tbparams.keys():
1962                self.stop_segment(tb, tbparams[tb]['eid'], tbparams)
1963
1964            # release the allocations
1965            for tb in tbparams.keys():
1966                self.release_access(tb, tbparams[tb]['aid'])
1967
1968            # Remove the terminated experiment
1969            self.state_lock.acquire()
1970            for id in ids:
1971                if self.state.has_key(id): del self.state[id]
1972
1973            if self.state_filename: self.write_state()
1974            self.state_lock.release()
1975
1976            return { 'experiment': exp }
1977        else:
1978            # Don't forget to release the lock
1979            self.state_lock.release()
1980            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.