source: fedd/federation/experiment_control.py @ e794984

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since e794984 was e794984, checked in by Ted Faber <faber@…>, 15 years ago

remove debugging

  • Property mode set to 100644
File size: 71.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_TerminateSegment = service_caller('TerminateSegment')
200    call_Ns2Split = service_caller('Ns2Split')
201
202    def __init__(self, config=None, auth=None):
203        """
204        Intialize the various attributes, most from the config object
205        """
206
207        def parse_tarfile_list(tf):
208            """
209            Parse a tarfile list from the configuration.  This is a set of
210            paths and tarfiles separated by spaces.
211            """
212            rv = [ ]
213            if tf is not None:
214                tl = tf.split()
215                while len(tl) > 1:
216                    p, t = tl[0:2]
217                    del tl[0:2]
218                    rv.append((p, t))
219            return rv
220
221        self.thread_with_rv = experiment_control_local.pooled_thread
222        self.thread_pool = experiment_control_local.thread_pool
223        self.list_log = experiment_control_local.list_log
224
225        self.cert_file = config.get("experiment_control", "cert_file")
226        if self.cert_file:
227            self.cert_pwd = config.get("experiment_control", "cert_pwd")
228        else:
229            self.cert_file = config.get("globals", "cert_file")
230            self.cert_pwd = config.get("globals", "cert_pwd")
231
232        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
233                or config.get("globals", "trusted_certs")
234
235        self.repodir = config.get("experiment_control", "repodir")
236
237        self.exp_stem = "fed-stem"
238        self.log = logging.getLogger("fedd.experiment_control")
239        set_log_level(config, "experiment_control", self.log)
240        self.muxmax = 2
241        self.nthreads = 2
242        self.randomize_experiments = False
243
244        self.splitter = None
245        self.ssh_keygen = "/usr/bin/ssh-keygen"
246        self.ssh_identity_file = None
247
248
249        self.debug = config.getboolean("experiment_control", "create_debug")
250        self.state_filename = config.get("experiment_control", 
251                "experiment_state")
252        self.splitter_url = config.get("experiment_control", "splitter_uri")
253        self.fedkit = parse_tarfile_list(\
254                config.get("experiment_control", "fedkit"))
255        self.gatewaykit = parse_tarfile_list(\
256                config.get("experiment_control", "gatewaykit"))
257        accessdb_file = config.get("experiment_control", "accessdb")
258
259        self.ssh_pubkey_file = config.get("experiment_control", 
260                "ssh_pubkey_file")
261        self.ssh_privkey_file = config.get("experiment_control",
262                "ssh_privkey_file")
263        # NB for internal master/slave ops, not experiment setup
264        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
265
266        self.overrides = set([])
267        ovr = config.get('experiment_control', 'overrides')
268        if ovr:
269            for o in ovr.split(","):
270                o = o.strip()
271                if o.startswith('fedid:'): o = o[len('fedid:'):]
272                self.overrides.add(fedid(hexstr=o))
273
274        self.state = { }
275        self.state_lock = Lock()
276        self.tclsh = "/usr/local/bin/otclsh"
277        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
278                config.get("experiment_control", "tcl_splitter",
279                        "/usr/testbed/lib/ns2ir/parse.tcl")
280        mapdb_file = config.get("experiment_control", "mapdb")
281        self.trace_file = sys.stderr
282
283        self.def_expstart = \
284                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
285                "/tmp/federate";
286        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
287                "FEDDIR/hosts";
288        self.def_gwstart = \
289                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
290                "/tmp/bridge.log";
291        self.def_mgwstart = \
292                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
293                "/tmp/bridge.log";
294        self.def_gwimage = "FBSD61-TUNNEL2";
295        self.def_gwtype = "pc";
296        self.local_access = { }
297
298        if auth:
299            self.auth = auth
300        else:
301            self.log.error(\
302                    "[access]: No authorizer initialized, creating local one.")
303            auth = authorizer()
304
305
306        if self.ssh_pubkey_file:
307            try:
308                f = open(self.ssh_pubkey_file, 'r')
309                self.ssh_pubkey = f.read()
310                f.close()
311            except IOError:
312                raise service_error(service_error.internal,
313                        "Cannot read sshpubkey")
314        else:
315            raise service_error(service_error.internal, 
316                    "No SSH public key file?")
317
318        if not self.ssh_privkey_file:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322
323        if mapdb_file:
324            self.read_mapdb(mapdb_file)
325        else:
326            self.log.warn("[experiment_control] No testbed map, using defaults")
327            self.tbmap = { 
328                    'deter':'https://users.isi.deterlab.net:23235',
329                    'emulab':'https://users.isi.deterlab.net:23236',
330                    'ucb':'https://users.isi.deterlab.net:23237',
331                    }
332
333        if accessdb_file:
334                self.read_accessdb(accessdb_file)
335        else:
336            raise service_error(service_error.internal,
337                    "No accessdb specified in config")
338
339        # Grab saved state.  OK to do this w/o locking because it's read only
340        # and only one thread should be in existence that can see self.state at
341        # this point.
342        if self.state_filename:
343            self.read_state()
344
345        # Dispatch tables
346        self.soap_services = {\
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354        }
355
356        self.xmlrpc_services = {\
357                'Create': xmlrpc_handler('Create', self.create_experiment),
358                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
359                'Vis': xmlrpc_handler('Vis', self.get_vis),
360                'Info': xmlrpc_handler('Info', self.get_info),
361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
362                'Terminate': xmlrpc_handler('Terminate',
363                    self.terminate_experiment),
364        }
365
366    def copy_file(self, src, dest, size=1024):
367        """
368        Exceedingly simple file copy.
369        """
370        s = open(src,'r')
371        d = open(dest, 'w')
372
373        buf = "x"
374        while buf != "":
375            buf = s.read(size)
376            d.write(buf)
377        s.close()
378        d.close()
379
380    # Call while holding self.state_lock
381    def write_state(self):
382        """
383        Write a new copy of experiment state after copying the existing state
384        to a backup.
385
386        State format is a simple pickling of the state dictionary.
387        """
388        if os.access(self.state_filename, os.W_OK):
389            self.copy_file(self.state_filename, \
390                    "%s.bak" % self.state_filename)
391        try:
392            f = open(self.state_filename, 'w')
393            pickle.dump(self.state, f)
394        except IOError, e:
395            self.log.error("Can't write file %s: %s" % \
396                    (self.state_filename, e))
397        except pickle.PicklingError, e:
398            self.log.error("Pickling problem: %s" % e)
399        except TypeError, e:
400            self.log.error("Pickling problem (TypeError): %s" % e)
401
402    # Call while holding self.state_lock
403    def read_state(self):
404        """
405        Read a new copy of experiment state.  Old state is overwritten.
406
407        State format is a simple pickling of the state dictionary.
408        """
409       
410        def get_experiment_id(state):
411            """
412            Pull the fedid experimentID out of the saved state.  This is kind
413            of a gross walk through the dict.
414            """
415
416            if state.has_key('experimentID'):
417                for e in state['experimentID']:
418                    if e.has_key('fedid'):
419                        return e['fedid']
420                else:
421                    return None
422            else:
423                return None
424
425        def get_alloc_ids(state):
426            """
427            Pull the fedids of the identifiers of each allocation from the
428            state.  Again, a dict dive that's best isolated.
429            """
430
431            return [ f['allocID']['fedid'] 
432                    for f in state.get('federant',[]) \
433                        if f.has_key('allocID') and \
434                            f['allocID'].has_key('fedid')]
435
436
437        try:
438            f = open(self.state_filename, "r")
439            self.state = pickle.load(f)
440            self.log.debug("[read_state]: Read state from %s" % \
441                    self.state_filename)
442        except IOError, e:
443            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
444                    % (self.state_filename, e))
445        except pickle.UnpicklingError, e:
446            self.log.warning(("[read_state]: No saved state: " + \
447                    "Unpickling failed: %s") % e)
448       
449        for s in self.state.values():
450            try:
451
452                eid = get_experiment_id(s)
453                if eid : 
454                    # Give the owner rights to the experiment
455                    self.auth.set_attribute(s['owner'], eid)
456                    # And holders of the eid as well
457                    self.auth.set_attribute(eid, eid)
458                    # allow overrides to control experiments as well
459                    for o in self.overrides:
460                        self.auth.set_attribute(o, eid)
461                    # Set permissions to allow reading of the software repo, if
462                    # any, as well.
463                    for a in get_alloc_ids(s):
464                        self.auth.set_attribute(a, 'repo/%s' % eid)
465                else:
466                    raise KeyError("No experiment id")
467            except KeyError, e:
468                self.log.warning("[read_state]: State ownership or identity " +\
469                        "misformatted in %s: %s" % (self.state_filename, e))
470
471
472    def read_accessdb(self, accessdb_file):
473        """
474        Read the mapping from fedids that can create experiments to their name
475        in the 3-level access namespace.  All will be asserted from this
476        testbed and can include the local username and porject that will be
477        asserted on their behalf by this fedd.  Each fedid is also added to the
478        authorization system with the "create" attribute.
479        """
480        self.accessdb = {}
481        # These are the regexps for parsing the db
482        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
483        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
484                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
485        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
486                "\s*->\s*(" + name_expr + ")\s*$")
487        lineno = 0
488
489        # Parse the mappings and store in self.authdb, a dict of
490        # fedid -> (proj, user)
491        try:
492            f = open(accessdb_file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if len(line) == 0 or line.startswith('#'):
497                    continue
498                m = project_line.match(line)
499                if m:
500                    fid = fedid(hexstr=m.group(1))
501                    project, user = m.group(2,3)
502                    if not self.accessdb.has_key(fid):
503                        self.accessdb[fid] = []
504                    self.accessdb[fid].append((project, user))
505                    continue
506
507                m = user_line.match(line)
508                if m:
509                    fid = fedid(hexstr=m.group(1))
510                    project = None
511                    user = m.group(2)
512                    if not self.accessdb.has_key(fid):
513                        self.accessdb[fid] = []
514                    self.accessdb[fid].append((project, user))
515                    continue
516                self.log.warn("[experiment_control] Error parsing access " +\
517                        "db %s at line %d" %  (accessdb_file, lineno))
518        except IOError:
519            raise service_error(service_error.internal,
520                    "Error opening/reading %s as experiment " +\
521                            "control accessdb" %  accessdb_file)
522        f.close()
523
524        # Initialize the authorization attributes
525        for fid in self.accessdb.keys():
526            self.auth.set_attribute(fid, 'create')
527
528    def read_mapdb(self, file):
529        """
530        Read a simple colon separated list of mappings for the
531        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
532        """
533
534        self.tbmap = { }
535        lineno =0
536        try:
537            f = open(file, "r")
538            for line in f:
539                lineno += 1
540                line = line.strip()
541                if line.startswith('#') or len(line) == 0:
542                    continue
543                try:
544                    label, url = line.split(':', 1)
545                    self.tbmap[label] = url
546                except ValueError, e:
547                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
548                            "map db: %s %s" % (lineno, line, e))
549        except IOError, e:
550            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
551                    "open %s: %s" % (file, e))
552        f.close()
553       
554    def generate_ssh_keys(self, dest, type="rsa" ):
555        """
556        Generate a set of keys for the gateways to use to talk.
557
558        Keys are of type type and are stored in the required dest file.
559        """
560        valid_types = ("rsa", "dsa")
561        t = type.lower();
562        if t not in valid_types: raise ValueError
563        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
564
565        try:
566            trace = open("/dev/null", "w")
567        except IOError:
568            raise service_error(service_error.internal,
569                    "Cannot open /dev/null??");
570
571        # May raise CalledProcessError
572        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
573        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
574        if rv != 0:
575            raise service_error(service_error.internal, 
576                    "Cannot generate nonce ssh keys.  %s return code %d" \
577                            % (self.ssh_keygen, rv))
578
579    def gentopo(self, str):
580        """
581        Generate the topology dtat structure from the splitter's XML
582        representation of it.
583
584        The topology XML looks like:
585            <experiment>
586                <nodes>
587                    <node><vname></vname><ips>ip1:ip2</ips></node>
588                </nodes>
589                <lans>
590                    <lan>
591                        <vname></vname><vnode></vnode><ip></ip>
592                        <bandwidth></bandwidth><member>node:port</member>
593                    </lan>
594                </lans>
595        """
596        class topo_parse:
597            """
598            Parse the topology XML and create the dats structure.
599            """
600            def __init__(self):
601                # Typing of the subelements for data conversion
602                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
603                self.int_subelements = ( 'bandwidth',)
604                self.float_subelements = ( 'delay',)
605                # The final data structure
606                self.nodes = [ ]
607                self.lans =  [ ]
608                self.topo = { \
609                        'node': self.nodes,\
610                        'lan' : self.lans,\
611                    }
612                self.element = { }  # Current element being created
613                self.chars = ""     # Last text seen
614
615            def end_element(self, name):
616                # After each sub element the contents is added to the current
617                # element or to the appropriate list.
618                if name == 'node':
619                    self.nodes.append(self.element)
620                    self.element = { }
621                elif name == 'lan':
622                    self.lans.append(self.element)
623                    self.element = { }
624                elif name in self.str_subelements:
625                    self.element[name] = self.chars
626                    self.chars = ""
627                elif name in self.int_subelements:
628                    self.element[name] = int(self.chars)
629                    self.chars = ""
630                elif name in self.float_subelements:
631                    self.element[name] = float(self.chars)
632                    self.chars = ""
633
634            def found_chars(self, data):
635                self.chars += data.rstrip()
636
637
638        tp = topo_parse();
639        parser = xml.parsers.expat.ParserCreate()
640        parser.EndElementHandler = tp.end_element
641        parser.CharacterDataHandler = tp.found_chars
642
643        parser.Parse(str)
644
645        return tp.topo
646       
647
648    def genviz(self, topo):
649        """
650        Generate the visualization the virtual topology
651        """
652
653        neato = "/usr/local/bin/neato"
654        # These are used to parse neato output and to create the visualization
655        # file.
656        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
657        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
658                "%s</type></node>"
659
660        try:
661            # Node names
662            nodes = [ n['vname'] for n in topo['node'] ]
663            topo_lans = topo['lan']
664        except KeyError, e:
665            raise service_error(service_error.internal, "Bad topology: %s" %e)
666
667        lans = { }
668        links = { }
669
670        # Walk through the virtual topology, organizing the connections into
671        # 2-node connections (links) and more-than-2-node connections (lans).
672        # When a lan is created, it's added to the list of nodes (there's a
673        # node in the visualization for the lan).
674        for l in topo_lans:
675            if links.has_key(l['vname']):
676                if len(links[l['vname']]) < 2:
677                    links[l['vname']].append(l['vnode'])
678                else:
679                    nodes.append(l['vname'])
680                    lans[l['vname']] = links[l['vname']]
681                    del links[l['vname']]
682                    lans[l['vname']].append(l['vnode'])
683            elif lans.has_key(l['vname']):
684                lans[l['vname']].append(l['vnode'])
685            else:
686                links[l['vname']] = [ l['vnode'] ]
687
688
689        # Open up a temporary file for dot to turn into a visualization
690        try:
691            df, dotname = tempfile.mkstemp()
692            dotfile = os.fdopen(df, 'w')
693        except IOError:
694            raise service_error(service_error.internal,
695                    "Failed to open file in genviz")
696
697        try:
698            dnull = open('/dev/null', 'w')
699        except IOError:
700            service_error(service_error.internal,
701                    "Failed to open /dev/null in genviz")
702
703        # Generate a dot/neato input file from the links, nodes and lans
704        try:
705            print >>dotfile, "graph G {"
706            for n in nodes:
707                print >>dotfile, '\t"%s"' % n
708            for l in links.keys():
709                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
710            for l in lans.keys():
711                for n in lans[l]:
712                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
713            print >>dotfile, "}"
714            dotfile.close()
715        except TypeError:
716            raise service_error(service_error.internal,
717                    "Single endpoint link in vtopo")
718        except IOError:
719            raise service_error(service_error.internal, "Cannot write dot file")
720
721        # Use dot to create a visualization
722        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
723                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
724                close_fds=True)
725        dnull.close()
726
727        # Translate dot to vis format
728        vis_nodes = [ ]
729        vis = { 'node': vis_nodes }
730        for line in dot.stdout:
731            m = vis_re.match(line)
732            if m:
733                vn = m.group(1)
734                vis_node = {'name': vn, \
735                        'x': float(m.group(2)),\
736                        'y' : float(m.group(3)),\
737                    }
738                if vn in links.keys() or vn in lans.keys():
739                    vis_node['type'] = 'lan'
740                else:
741                    vis_node['type'] = 'node'
742                vis_nodes.append(vis_node)
743        rv = dot.wait()
744
745        os.remove(dotname)
746        if rv == 0 : return vis
747        else: return None
748
749    def get_access(self, tb, nodes, user, tbparam, master, export_project,
750            access_user):
751        """
752        Get access to testbed through fedd and set the parameters for that tb
753        """
754        uri = self.tbmap.get(tb, None)
755        if not uri:
756            raise service_error(serice_error.server_config, 
757                    "Unknown testbed: %s" % tb)
758
759        # currently this lumps all users into one service access group
760        service_keys = [ a for u in user \
761                for a in u.get('access', []) \
762                    if a.has_key('sshPubkey')]
763
764        if len(service_keys) == 0:
765            raise service_error(service_error.req, 
766                    "Must have at least one SSH pubkey for services")
767
768
769        for p, u in access_user:
770            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
771                    "to %s") %  ((p or "None"), u, uri))
772
773            if p:
774                # Request with user and project specified
775                req = {\
776                        'destinationTestbed' : { 'uri' : uri },
777                        'project': { 
778                            'name': {'localname': p},
779                            'user': [ {'userID': { 'localname': u } } ],
780                            },
781                        'user':  user,
782                        'allocID' : { 'localname': 'test' },
783                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
784                        'serviceAccess' : service_keys
785                    }
786            else:
787                # Request with only user specified
788                req = {\
789                        'destinationTestbed' : { 'uri' : uri },
790                        'user':  [ {'userID': { 'localname': u } } ],
791                        'allocID' : { 'localname': 'test' },
792                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
793                        'serviceAccess' : service_keys
794                    }
795
796            if tb == master:
797                # NB, the export_project parameter is a dict that includes
798                # the type
799                req['exportProject'] = export_project
800
801            # node resources if any
802            if nodes != None and len(nodes) > 0:
803                rnodes = [ ]
804                for n in nodes:
805                    rn = { }
806                    image, hw, count = n.split(":")
807                    if image: rn['image'] = [ image ]
808                    if hw: rn['hardware'] = [ hw ]
809                    if count and int(count) >0 : rn['count'] = int(count)
810                    rnodes.append(rn)
811                req['resources']= { }
812                req['resources']['node'] = rnodes
813
814            try:
815                if self.local_access.has_key(uri):
816                    # Local access call
817                    req = { 'RequestAccessRequestBody' : req }
818                    r = self.local_access[uri].RequestAccess(req, 
819                            fedid(file=self.cert_file))
820                    r = { 'RequestAccessResponseBody' : r }
821                else:
822                    r = self.call_RequestAccess(uri, req, 
823                            self.cert_file, self.cert_pwd, self.trusted_certs)
824            except service_error, e:
825                if e.code == service_error.access:
826                    self.log.debug("[get_access] Access denied")
827                    r = None
828                    continue
829                else:
830                    raise e
831
832            if r.has_key('RequestAccessResponseBody'):
833                # Through to here we have a valid response, not a fault.
834                # Access denied is a fault, so something better or worse than
835                # access denied has happened.
836                r = r['RequestAccessResponseBody']
837                self.log.debug("[get_access] Access granted")
838                break
839            else:
840                raise service_error(service_error.protocol,
841                        "Bad proxy response")
842       
843        if not r:
844            raise service_error(service_error.access, 
845                    "Access denied by %s (%s)" % (tb, uri))
846
847        e = r['emulab']
848        p = e['project']
849        tbparam[tb] = { 
850                "boss": e['boss'],
851                "host": e['ops'],
852                "domain": e['domain'],
853                "fs": e['fileServer'],
854                "eventserver": e['eventServer'],
855                "project": unpack_id(p['name']),
856                "emulab" : e,
857                "allocID" : r['allocID'],
858                }
859        # Make the testbed name be the label the user applied
860        p['testbed'] = {'localname': tb }
861
862        for u in p['user']:
863            role = u.get('role', None)
864            if role == 'experimentCreation':
865                tbparam[tb]['user'] = unpack_id(u['userID'])
866                break
867        else:
868            raise service_error(service_error.internal, 
869                    "No createExperimentUser from %s" %tb)
870
871        # Add attributes to barameter space.  We don't allow attributes to
872        # overlay any parameters already installed.
873        for a in e['fedAttr']:
874            try:
875                if a['attribute'] and isinstance(a['attribute'], basestring)\
876                        and not tbparam[tb].has_key(a['attribute'].lower()):
877                    tbparam[tb][a['attribute'].lower()] = a['value']
878            except KeyError:
879                self.log.error("Bad attribute in response: %s" % a)
880       
881    def release_access(self, tb, aid):
882        """
883        Release access to testbed through fedd
884        """
885
886        uri = self.tbmap.get(tb, None)
887        if not uri:
888            raise service_error(serice_error.server_config, 
889                    "Unknown testbed: %s" % tb)
890
891        if self.local_access.has_key(uri):
892            resp = self.local_access[uri].ReleaseAccess(\
893                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
894                    fedid(file=self.cert_file))
895            resp = { 'ReleaseAccessResponseBody': resp } 
896        else:
897            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
898                    self.cert_file, self.cert_pwd, self.trusted_certs)
899
900        # better error coding
901
902    def remote_splitter(self, uri, desc, master):
903
904        req = {
905                'description' : { 'ns2description': desc },
906                'master': master,
907                'include_fedkit': bool(self.fedkit),
908                'include_gatewaykit': bool(self.gatewaykit)
909            }
910
911        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
912                self.trusted_certs)
913
914        if r.has_key('Ns2SplitResponseBody'):
915            r = r['Ns2SplitResponseBody']
916            if r.has_key('output'):
917                return r['output'].splitlines()
918            else:
919                raise service_error(service_error.protocol, 
920                        "Bad splitter response (no output)")
921        else:
922            raise service_error(service_error.protocol, "Bad splitter response")
923
924    class start_segment:
925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
926                cert_pwd=None, trusted_certs=None, caller=None):
927            self.log = log
928            self.debug = debug
929            self.cert_file = cert_file
930            self.cert_pwd = cert_pwd
931            self.trusted_certs = None
932            self.caller = caller
933            self.testbed = testbed
934
935        def __call__(self, uri, aid, topo, master, attrs=None):
936            req = {
937                    'allocID': { 'fedid' : aid }, 
938                    'segmentdescription': { 
939                        'topdldescription': topo.to_dict(),
940                    },
941                    'master': master,
942                }
943            if attrs:
944                req['fedAttr'] = attrs
945
946            try:
947                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
948                        self.trusted_certs)
949                return True
950            except service_error, e:
951                self.log.error("Start segment failed on %s: %s" % \
952                        (self.testbed, e))
953                return False
954
955
956
957    class terminate_segment:
958        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
959                cert_pwd=None, trusted_certs=None, caller=None):
960            self.log = log
961            self.debug = debug
962            self.cert_file = cert_file
963            self.cert_pwd = cert_pwd
964            self.trusted_certs = None
965            self.caller = caller
966            self.testbed = testbed
967
968        def __call__(self, uri, aid ):
969            req = {
970                    'allocID': aid , 
971                }
972            try:
973                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
974                        self.trusted_certs)
975                return True
976            except service_error, e:
977                self.log.error("Terminate segment failed on %s: %s" % \
978                        (self.testbed, e))
979                return False
980   
981
982    def allocate_resources(self, allocated, master, eid, expid, expcert, 
983            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
984        started = { }           # Testbeds where a sub-experiment started
985                                # successfully
986
987        # XXX
988        fail_soft = False
989
990        log = alloc_log or self.log
991
992        thread_pool = self.thread_pool(self.nthreads)
993        threads = [ ]
994
995        for tb in [ k for k in allocated.keys() if k != master]:
996            # Create and start a thread to start the segment, and save it to
997            # get the return value later
998            thread_pool.wait_for_slot()
999            uri = self.tbmap.get(tb, None)
1000            if not uri:
1001                raise service_error(service_error.internal, 
1002                        "Unknown testbed %s !?" % tb)
1003
1004            if tbparams[tb].has_key('allocID') and \
1005                    tbparams[tb]['allocID'].has_key('fedid'):
1006                aid = tbparams[tb]['allocID']['fedid']
1007            else:
1008                raise service_error(service_error.internal, 
1009                        "No alloc id for testbed %s !?" % tb)
1010
1011            t  = self.pooled_thread(\
1012                    target=self.start_segment(log=log, debug=self.debug,
1013                        testbed=tb, cert_file=self.cert_file,
1014                        cert_pwd=self.cert_pwd,
1015                        trusted_certs=self.trusted_certs,
1016                        caller=self.call_StartSegment), 
1017                    args=(uri, aid, topo[tb], False, attrs), name=tb,
1018                    pdata=thread_pool, trace_file=self.trace_file)
1019            threads.append(t)
1020            t.start()
1021
1022        # Wait until all finish
1023        thread_pool.wait_for_all_done()
1024
1025        # If none failed, start the master
1026        failed = [ t.getName() for t in threads if not t.rv ]
1027
1028        if len(failed) == 0:
1029            uri = self.tbmap.get(master, None)
1030            if not uri:
1031                raise service_error(service_error.internal, 
1032                        "Unknown testbed %s !?" % master)
1033
1034            if tbparams[master].has_key('allocID') and \
1035                    tbparams[master]['allocID'].has_key('fedid'):
1036                aid = tbparams[master]['allocID']['fedid']
1037            else:
1038                raise service_error(service_error.internal, 
1039                    "No alloc id for testbed %s !?" % master)
1040            starter = self.start_segment(log=log, debug=self.debug,
1041                    testbed=master, cert_file=self.cert_file,
1042                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1043                    caller=self.call_StartSegment)
1044            if not starter(uri, aid, topo[master], True, attrs):
1045                failed.append(master)
1046
1047        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1048        # If one failed clean up, unless fail_soft is set
1049        if failed and False:
1050            if not fail_soft:
1051                thread_pool.clear()
1052                for tb in succeeded:
1053                    # Create and start a thread to stop the segment
1054                    thread_pool.wait_for_slot()
1055                    t  = self.pooled_thread(\
1056                            target=self.stop_segment(log=log,
1057                                testbed=tb,
1058                                keyfile=self.ssh_privkey_file,
1059                                debug=self.debug), 
1060                            args=(tb, eid, tbparams), name=tb,
1061                            pdata=thread_pool, trace_file=self.trace_file)
1062                    t.start()
1063                # Wait until all finish
1064                thread_pool.wait_for_all_done()
1065
1066                # release the allocations
1067                for tb in tbparams.keys():
1068                    self.release_access(tb, tbparams[tb]['allocID'])
1069                # Remove the placeholder
1070                self.state_lock.acquire()
1071                self.state[eid]['experimentStatus'] = 'failed'
1072                if self.state_filename: self.write_state()
1073                self.state_lock.release()
1074
1075                log.error("Swap in failed on %s" % ",".join(failed))
1076                return
1077        else:
1078            log.info("[start_segment]: Experiment %s active" % eid)
1079
1080        log.debug("[start_experiment]: removing %s" % tmpdir)
1081
1082        # Walk up tmpdir, deleting as we go
1083        for path, dirs, files in os.walk(tmpdir, topdown=False):
1084            for f in files:
1085                os.remove(os.path.join(path, f))
1086            for d in dirs:
1087                os.rmdir(os.path.join(path, d))
1088        os.rmdir(tmpdir)
1089
1090        # Insert the experiment into our state and update the disk copy
1091        self.state_lock.acquire()
1092        self.state[expid]['experimentStatus'] = 'active'
1093        self.state[eid] = self.state[expid]
1094        if self.state_filename: self.write_state()
1095        self.state_lock.release()
1096        return
1097
1098
1099    def create_experiment(self, req, fid):
1100        """
1101        The external interface to experiment creation called from the
1102        dispatcher.
1103
1104        Creates a working directory, splits the incoming description using the
1105        splitter script and parses out the avrious subsections using the
1106        lcasses above.  Once each sub-experiment is created, use pooled threads
1107        to instantiate them and start it all up.
1108        """
1109
1110        def add_kit(e, kit):
1111            """
1112            Add a Software object created from the list of (install, location)
1113            tuples passed as kit  to the software attribute of an object e.  We
1114            do this enough to break out the code, but it's kind of a hack to
1115            avoid changing the old tuple rep.
1116            """
1117
1118            s = [ topdl.Software(install=i, location=l) for i, l in kit]
1119
1120            if isinstance(e.software, list): e.software.extend(s)
1121            else: e.software = s
1122
1123
1124        if not self.auth.check_attribute(fid, 'create'):
1125            raise service_error(service_error.access, "Create access denied")
1126
1127        try:
1128            tmpdir = tempfile.mkdtemp(prefix="split-")
1129        except IOError:
1130            raise service_error(service_error.internal, "Cannot create tmp dir")
1131
1132        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1133        gw_secretkey_base = "fed.%s" % self.ssh_type
1134        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1135        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1136        tclfile = tmpdir + "/experiment.tcl"
1137        tbparams = { }
1138        try:
1139            access_user = self.accessdb[fid]
1140        except KeyError:
1141            raise service_error(service_error.internal,
1142                    "Access map and authorizer out of sync in " + \
1143                            "create_experiment for fedid %s"  % fid)
1144
1145        pid = "dummy"
1146        gid = "dummy"
1147        try:
1148            os.mkdir(tmpdir+"/keys")
1149        except OSError:
1150            raise service_error(service_error.internal,
1151                    "Can't make temporary dir")
1152
1153        req = req.get('CreateRequestBody', None)
1154        if not req:
1155            raise service_error(service_error.req,
1156                    "Bad request format (no CreateRequestBody)")
1157        # The tcl parser needs to read a file so put the content into that file
1158        descr=req.get('experimentdescription', None)
1159        if descr:
1160            file_content=descr.get('ns2description', None)
1161            if file_content:
1162                try:
1163                    f = open(tclfile, 'w')
1164                    f.write(file_content)
1165                    f.close()
1166                except IOError:
1167                    raise service_error(service_error.internal,
1168                            "Cannot write temp experiment description")
1169            else:
1170                raise service_error(service_error.req, 
1171                        "Only ns2descriptions supported")
1172        else:
1173            raise service_error(service_error.req, "No experiment description")
1174
1175        # Generate an ID for the experiment (slice) and a certificate that the
1176        # allocator can use to prove they own it.  We'll ship it back through
1177        # the encrypted connection.
1178        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1179
1180        if req.has_key('experimentID') and \
1181                req['experimentID'].has_key('localname'):
1182            overwrite = False
1183            eid = req['experimentID']['localname']
1184            # If there's an old failed experiment here with the same local name
1185            # and accessible by this user, we'll overwrite it, otherwise we'll
1186            # fall through and do the collision avoidance.
1187            old_expid = self.get_experiment_fedid(eid)
1188            if old_expid and self.check_experiment_access(fid, old_expid):
1189                self.state_lock.acquire()
1190                status = self.state[eid].get('experimentStatus', None)
1191                if status and status == 'failed':
1192                    # remove the old access attribute
1193                    self.auth.unset_attribute(fid, old_expid)
1194                    overwrite = True
1195                    del self.state[eid]
1196                    del self.state[old_expid]
1197                self.state_lock.release()
1198            self.state_lock.acquire()
1199            while (self.state.has_key(eid) and not overwrite):
1200                eid += random.choice(string.ascii_letters)
1201            # Initial state
1202            self.state[eid] = {
1203                    'experimentID' : \
1204                            [ { 'localname' : eid }, {'fedid': expid } ],
1205                    'experimentStatus': 'starting',
1206                    'experimentAccess': { 'X509' : expcert },
1207                    'owner': fid,
1208                    'log' : [],
1209                }
1210            self.state[expid] = self.state[eid]
1211            if self.state_filename: self.write_state()
1212            self.state_lock.release()
1213        else:
1214            eid = self.exp_stem
1215            for i in range(0,5):
1216                eid += random.choice(string.ascii_letters)
1217            self.state_lock.acquire()
1218            while (self.state.has_key(eid)):
1219                eid = self.exp_stem
1220                for i in range(0,5):
1221                    eid += random.choice(string.ascii_letters)
1222            # Initial state
1223            self.state[eid] = {
1224                    'experimentID' : \
1225                            [ { 'localname' : eid }, {'fedid': expid } ],
1226                    'experimentStatus': 'starting',
1227                    'experimentAccess': { 'X509' : expcert },
1228                    'owner': fid,
1229                    'log' : [],
1230                }
1231            self.state[expid] = self.state[eid]
1232            if self.state_filename: self.write_state()
1233            self.state_lock.release()
1234
1235        try: 
1236            # This catches exceptions to clear the placeholder if necessary
1237            try:
1238                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1239            except ValueError:
1240                raise service_error(service_error.server_config, 
1241                        "Bad key type (%s)" % self.ssh_type)
1242
1243            user = req.get('user', None)
1244            if user == None:
1245                raise service_error(service_error.req, "No user")
1246
1247            master = req.get('master', None)
1248            if not master:
1249                raise service_error(service_error.req,
1250                        "No master testbed label")
1251            export_project = req.get('exportProject', None)
1252            if not export_project:
1253                raise service_error(service_error.req, "No export project")
1254           
1255            if self.splitter_url:
1256                self.log.debug("Calling remote splitter at %s" % \
1257                        self.splitter_url)
1258                split_data = self.remote_splitter(self.splitter_url,
1259                        file_content, master)
1260            else:
1261                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1262                    str(self.muxmax), '-m', master]
1263
1264                if self.fedkit:
1265                    tclcmd.append('-k')
1266
1267                if self.gatewaykit:
1268                    tclcmd.append('-K')
1269
1270                tclcmd.extend([pid, gid, eid, tclfile])
1271
1272                self.log.debug("running local splitter %s", " ".join(tclcmd))
1273                # This is just fantastic.  As a side effect the parser copies
1274                # tb_compat.tcl into the current directory, so that directory
1275                # must be writable by the fedd user.  Doing this in the
1276                # temporary subdir ensures this is the case.
1277                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1278                        cwd=tmpdir)
1279                split_data = tclparser.stdout
1280
1281            allocated = { }         # Testbeds we can access
1282            # Allocate IP addresses: The allocator is a buddy system memory
1283            # allocator.  Allocate from the largest substrate to the
1284            # smallest to make the packing more likely to work - i.e.
1285            # avoiding internal fragmentation.
1286            top = topdl.topology_from_xml(file=split_data, top="experiment")
1287            subs = sorted(top.substrates, 
1288                    cmp=lambda x,y: cmp(len(x.interfaces), 
1289                        len(y.interfaces)),
1290                    reverse=True)
1291            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1292            ifs = { }
1293            hosts = [ ]
1294            # The config urlpath
1295            configpath = "/%s/config" % expid
1296            # The config file system location
1297            configdir ="%s%s" % ( self.repodir, configpath)
1298
1299            for idx, s in enumerate(subs):
1300                a = ips.allocate(len(s.interfaces)+2)
1301                if a :
1302                    base, num = a
1303                    if num < len(s.interfaces) +2 : 
1304                        raise service_error(service_error.internal,
1305                                "Allocator returned wrong number of IPs??")
1306                else:
1307                    raise service_error(service_error.req, 
1308                            "Cannot allocate IP addresses")
1309
1310                base += 1
1311                for i in s.interfaces:
1312                    i.attribute.append(
1313                            topdl.Attribute('ip4_address', 
1314                                "%s" % ip_addr(base)))
1315                    hname = i.element.name[0]
1316                    if ifs.has_key(hname):
1317                        hosts.append("%s\t%s-%s %s-%d" % \
1318                                (ip_addr(base), hname, s.name, hname,
1319                                    ifs[hname]))
1320                    else:
1321                        ifs[hname] = 0
1322                        hosts.append("%s\t%s-%s %s-%d %s" % \
1323                                (ip_addr(base), hname, s.name, hname,
1324                                    ifs[hname], hname))
1325
1326                    ifs[hname] += 1
1327                    base += 1
1328            # save config files
1329            try:
1330                os.makedirs(configdir)
1331            except IOError, e:
1332                raise service_error(
1333                        "Cannot create config directory: %s" % e)
1334            # Find the testbeds to look up
1335            testbeds = set([ a.value for e in top.elements \
1336                    for a in e.attribute \
1337                        if a.attribute == 'testbed'] )
1338
1339
1340            # Make per testbed topologies.  Copy the main topo and remove
1341            # interfaces and nodes that don't live in the testbed.
1342            topo ={ }
1343            for tb in testbeds:
1344                self.get_access(tb, None, user, tbparams, master,
1345                        export_project, access_user)
1346                allocated[tb] = 1
1347                topo[tb] = top.clone()
1348                to_delete = [ ]
1349                for e in topo[tb].elements:
1350                    etb = e.get_attribute('testbed')
1351                    if etb and etb != tb:
1352                        for i in e.interface:
1353                            for s in i.subs:
1354                                try:
1355                                    s.interfaces.remove(i)
1356                                except ValueError:
1357                                    raise service_error(service_error.internal,
1358                                            "Can't remove interface??")
1359                        to_delete.append(e)
1360                for e in to_delete:
1361                    topo[tb].elements.remove(e)
1362                topo[tb].make_indices()
1363
1364                for e in topo[tb].elements:
1365                    if tb == master:
1366                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1367                    else:
1368                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1369                    scmd = e.get_attribute('startup')
1370                    if scmd:
1371                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
1372
1373                    e.set_attribute('startup', cmd)
1374                    if self.fedkit: add_kit(e, self.fedkit)
1375
1376            # Copy configuration files into the remote file store
1377            try:
1378                f = open("%s/hosts" % configdir, "w")
1379                f.write('\n'.join(hosts))
1380                f.close()
1381            except IOError, e:
1382                raise service_error(service_error.internal, 
1383                        "Cannot write hosts file: %s" % e)
1384            try:
1385                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
1386                        (configdir, gw_pubkey_base))
1387                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
1388                        (configdir, gw_secretkey_base))
1389            except IOError, e:
1390                raise service_error(service_error.internal, 
1391                        "Cannot copy keyfiles: %s" % e)
1392
1393            # Allow the individual testbeds to access the configuration files.
1394            for tb in tbparams.keys():
1395                asignee = tbparams[tb]['allocID']['fedid']
1396                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1397                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1398
1399            # Now, for each substrate in the main topology, find those that
1400            # have nodes on more than one testbed.  Insert portal nodes
1401            # into the copies of those substrates on the sub topologies.
1402            for s in top.substrates:
1403                # tbs will contain an ip address on this subsrate that is in
1404                # each testbed.
1405                tbs = { }
1406                for i in s.interfaces:
1407                    e = i.element
1408                    tb = e.get_attribute('testbed')
1409                    if tb and not tbs.has_key(tb):
1410                        for i in e.interface:
1411                            if s in i.subs:
1412                                tbs[tb]= i.get_attribute('ip4_address')
1413                if len(tbs) < 2:
1414                    continue
1415
1416                # More than one testbed is on this substrate.  Insert
1417                # some portals into the subtopologies.  st == source testbed,
1418                # dt == destination testbed.
1419                segment_substrate = { }
1420                for st in tbs.keys():
1421                    segment_substrate[st] = { }
1422                    for dt in [ t for t in tbs.keys() if t != st]:
1423                        myname =  "%stunnel" % dt
1424                        desthost  =  "%stunnel" % st
1425                        sproject = tbparams[st].get('project', 'project')
1426                        dproject = tbparams[dt].get('project', 'project')
1427                        mproject = tbparams[master].get('project', 'project')
1428                        sdomain = tbparams[st].get('domain', ".example.com")
1429                        ddomain = tbparams[dt].get('domain', ".example.com")
1430                        mdomain = tbparams[master].get('domain', '.example.com')
1431                        muser = tbparams[master].get('user', 'root')
1432                        smbshare = tbparams[master].get('smbshare', 'USERS')
1433                        # XXX: active and type need to be unkludged
1434                        active = ("%s" % (st == master))
1435                        if not segment_substrate[st].has_key(dt):
1436                            # Put a substrate and a segment for the connected
1437                            # testbed in there.
1438                            tsubstrate = \
1439                                    topdl.Substrate(name='%s-%s' % (st, dt),
1440                                            attribute= [
1441                                                topdl.Attribute(
1442                                                    attribute='portal',
1443                                                    value='true')
1444                                                ]
1445                                            )
1446                            segment_element = topdl.Segment(
1447                                    id= tbparams[dt]['allocID'],
1448                                    type='emulab',
1449                                    uri = self.tbmap.get(dt, None),
1450                                    interface=[ 
1451                                        topdl.Interface(
1452                                            substrate=tsubstrate.name),
1453                                        ],
1454                                    attribute = [
1455                                        topdl.Attribute(attribute=n, value=v)
1456                                            for n, v in (\
1457                                                ('domain', ddomain),
1458                                                ('experiment', "%s/%s" % \
1459                                                        (dproject, eid)),)
1460                                        ],
1461                                    )
1462                            segment_substrate[st][dt] = tsubstrate
1463                            topo[st].substrates.append(tsubstrate)
1464                            topo[st].elements.append(segment_element)
1465                        portal = topdl.Computer(
1466                                name="%stunnel" % dt,
1467                                attribute=[ 
1468                                    topdl.Attribute(attribute=n,value=v)
1469                                        for n, v in (\
1470                                            ('portal', 'true'),
1471                                            ('domain', sdomain),
1472                                            ('masterdomain', mdomain),
1473                                            ('masterexperiment', "%s/%s" % \
1474                                                    (mproject, eid)),
1475                                            ('masteruser', muser),
1476                                            ('smbshare', smbshare),
1477                                            ('experiment', "%s/%s" % \
1478                                                    (sproject, eid)),
1479                                            ('peer', "%s" % desthost),
1480                                            ('peer_segment', "%s" % \
1481                                                    tbparams[dt]['allocID']['fedid']),
1482                                            ('scriptdir', 
1483                                                "/usr/local/federation/bin"),
1484                                            ('active', "%s" % active),
1485                                            ('portal_type', 'both'), 
1486                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1487                                    ],
1488                                interface=[
1489                                    topdl.Interface(
1490                                        substrate=s.name,
1491                                        attribute=[ 
1492                                            topdl.Attribute(
1493                                                attribute='ip4_address', 
1494                                                value=tbs[dt]
1495                                            )
1496                                        ]),
1497                                    topdl.Interface(
1498                                        substrate=\
1499                                            segment_substrate[st][dt].name,
1500                                        attribute=[
1501                                            topdl.Attribute(attribute='portal',
1502                                                value='true')
1503                                            ]
1504                                        ),
1505                                    ],
1506                                )
1507                        if self.fedkit: add_kit(portal, self.fedkit)
1508                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
1509
1510                        topo[st].elements.append(portal)
1511
1512            # Connect the gateway nodes into the topologies and clear out
1513            # substrates that are not in the topologies
1514            for tb in testbeds:
1515                topo[tb].incorporate_elements()
1516                topo[tb].substrates = \
1517                        [s for s in topo[tb].substrates \
1518                            if len(s.interfaces) >0]
1519
1520            # Copy the rpms and tarfiles to a distribution directory from
1521            # which the federants can retrieve them
1522            linkpath = "%s/software" %  expid
1523            softdir ="%s/%s" % ( self.repodir, linkpath)
1524            softmap = { }
1525            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1526                    for p, t in l ])
1527            pkgs.update([x.location for e in top.elements \
1528                    for x in e.software])
1529            try:
1530                os.makedirs(softdir)
1531            except IOError, e:
1532                raise service_error(
1533                        "Cannot create software directory: %s" % e)
1534            for pkg in pkgs:
1535                loc = pkg
1536
1537                scheme, host, path = urlparse(loc)[0:3]
1538                dest = os.path.basename(path)
1539                if not scheme:
1540                    if not loc.startswith('/'):
1541                        loc = "/%s" % loc
1542                    loc = "file://%s" %loc
1543                try:
1544                    u = urlopen(loc)
1545                except Exception, e:
1546                    raise service_error(service_error.req, 
1547                            "Cannot open %s: %s" % (loc, e))
1548                try:
1549                    f = open("%s/%s" % (softdir, dest) , "w")
1550                    self.log.debug("Writing %s/%s" % (softdir,dest) )
1551                    data = u.read(4096)
1552                    while data:
1553                        f.write(data)
1554                        data = u.read(4096)
1555                    f.close()
1556                    u.close()
1557                except Exception, e:
1558                    raise service_error(service_error.internal,
1559                            "Could not copy %s: %s" % (loc, e))
1560                path = re.sub("/tmp", "", linkpath)
1561                # XXX
1562                softmap[pkg] = \
1563                        "https://users.isi.deterlab.net:23232/%s/%s" %\
1564                        ( path, dest)
1565
1566                # Allow the individual testbeds to access the software.
1567                for tb in tbparams.keys():
1568                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1569                            "/%s/%s" % ( path, dest))
1570
1571            # Convert the software locations in the segments into the local
1572            # copies on this host
1573            for soft in [ s for tb in topo.values() \
1574                    for e in tb.elements \
1575                        if getattr(e, 'software', False) \
1576                            for s in e.software ]:
1577                if softmap.has_key(soft.location):
1578                    soft.location = softmap[soft.location]
1579
1580            vtopo = topdl.topology_to_vtopo(top)
1581            vis = self.genviz(vtopo)
1582
1583            # save federant information
1584            for k in allocated.keys():
1585                tbparams[k]['federant'] = {\
1586                        'name': [ { 'localname' : eid} ],\
1587                        'emulab': tbparams[k]['emulab'],\
1588                        'allocID' : tbparams[k]['allocID'],\
1589                        'master' : k == master,\
1590                    }
1591
1592            self.state_lock.acquire()
1593            self.state[eid]['vtopo'] = vtopo
1594            self.state[eid]['vis'] = vis
1595            self.state[expid]['federant'] = \
1596                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1597                        if tbparams[tb].has_key('federant') ]
1598            if self.state_filename: 
1599                self.write_state()
1600            self.state_lock.release()
1601        except service_error, e:
1602            # If something goes wrong in the parse (usually an access error)
1603            # clear the placeholder state.  From here on out the code delays
1604            # exceptions.  Failing at this point returns a fault to the remote
1605            # caller.
1606
1607            self.state_lock.acquire()
1608            del self.state[eid]
1609            del self.state[expid]
1610            if self.state_filename: self.write_state()
1611            self.state_lock.release()
1612            raise e
1613
1614
1615        # Start the background swapper and return the starting state.  From
1616        # here on out, the state will stick around a while.
1617
1618        # Let users touch the state
1619        self.auth.set_attribute(fid, expid)
1620        self.auth.set_attribute(expid, expid)
1621        # Override fedids can manipulate state as well
1622        for o in self.overrides:
1623            self.auth.set_attribute(o, expid)
1624
1625        # Create a logger that logs to the experiment's state object as well as
1626        # to the main log file.
1627        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1628        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
1629        # XXX: there should be a global one of these rather than repeating the
1630        # code.
1631        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1632                    '%d %b %y %H:%M:%S'))
1633        alloc_log.addHandler(h)
1634       
1635        # XXX
1636        url_base = 'https://users.isi.deterlab.net:23232'
1637        attrs = [ 
1638                {
1639                    'attribute': 'ssh_pubkey', 
1640                    'value': '%s/%s/config/%s' % \
1641                            (url_base, expid, gw_pubkey_base)
1642                },
1643                {
1644                    'attribute': 'ssh_secretkey', 
1645                    'value': '%s/%s/config/%s' % \
1646                            (url_base, expid, gw_secretkey_base)
1647                },
1648                {
1649                    'attribute': 'hosts', 
1650                    'value': '%s/%s/config/hosts' % \
1651                            (url_base, expid)
1652                },
1653                {
1654                    'attribute': 'experiment_name',
1655                    'value': eid,
1656                },
1657            ]
1658
1659        # Start a thread to do the resource allocation
1660        t  = Thread(target=self.allocate_resources,
1661                args=(allocated, master, eid, expid, expcert, tbparams, 
1662                    topo, tmpdir, alloc_log, attrs),
1663                name=eid)
1664        t.start()
1665
1666        rv = {
1667                'experimentID': [
1668                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1669                ],
1670                'experimentStatus': 'starting',
1671                'experimentAccess': { 'X509' : expcert }
1672            }
1673
1674        return rv
1675   
1676    def get_experiment_fedid(self, key):
1677        """
1678        find the fedid associated with the localname key in the state database.
1679        """
1680
1681        rv = None
1682        self.state_lock.acquire()
1683        if self.state.has_key(key):
1684            if isinstance(self.state[key], dict):
1685                try:
1686                    kl = [ f['fedid'] for f in \
1687                            self.state[key]['experimentID']\
1688                                if f.has_key('fedid') ]
1689                except KeyError:
1690                    self.state_lock.release()
1691                    raise service_error(service_error.internal, 
1692                            "No fedid for experiment %s when getting "+\
1693                                    "fedid(!?)" % key)
1694                if len(kl) == 1:
1695                    rv = kl[0]
1696                else:
1697                    self.state_lock.release()
1698                    raise service_error(service_error.internal, 
1699                            "multiple fedids for experiment %s when " +\
1700                                    "getting fedid(!?)" % key)
1701            else:
1702                self.state_lock.release()
1703                raise service_error(service_error.internal, 
1704                        "Unexpected state for %s" % key)
1705        self.state_lock.release()
1706        return rv
1707
1708    def check_experiment_access(self, fid, key):
1709        """
1710        Confirm that the fid has access to the experiment.  Though a request
1711        may be made in terms of a local name, the access attribute is always
1712        the experiment's fedid.
1713        """
1714        if not isinstance(key, fedid):
1715            key = self.get_experiment_fedid(key)
1716
1717        if self.auth.check_attribute(fid, key):
1718            return True
1719        else:
1720            raise service_error(service_error.access, "Access Denied")
1721
1722
1723    def get_handler(self, path, fid):
1724        if self.auth.check_attribute(fid, path):
1725            return ("%s/%s" % (self.repodir, path), "application/binary")
1726        else:
1727            return (None, None)
1728
1729    def get_vtopo(self, req, fid):
1730        """
1731        Return the stored virtual topology for this experiment
1732        """
1733        rv = None
1734        state = None
1735
1736        req = req.get('VtopoRequestBody', None)
1737        if not req:
1738            raise service_error(service_error.req,
1739                    "Bad request format (no VtopoRequestBody)")
1740        exp = req.get('experiment', None)
1741        if exp:
1742            if exp.has_key('fedid'):
1743                key = exp['fedid']
1744                keytype = "fedid"
1745            elif exp.has_key('localname'):
1746                key = exp['localname']
1747                keytype = "localname"
1748            else:
1749                raise service_error(service_error.req, "Unknown lookup type")
1750        else:
1751            raise service_error(service_error.req, "No request?")
1752
1753        self.check_experiment_access(fid, key)
1754
1755        self.state_lock.acquire()
1756        if self.state.has_key(key):
1757            if self.state[key].has_key('vtopo'):
1758                rv = { 'experiment' : {keytype: key },\
1759                        'vtopo': self.state[key]['vtopo'],\
1760                    }
1761            else:
1762                state = self.state[key]['experimentStatus']
1763        self.state_lock.release()
1764
1765        if rv: return rv
1766        else: 
1767            if state:
1768                raise service_error(service_error.partial, 
1769                        "Not ready: %s" % state)
1770            else:
1771                raise service_error(service_error.req, "No such experiment")
1772
1773    def get_vis(self, req, fid):
1774        """
1775        Return the stored visualization for this experiment
1776        """
1777        rv = None
1778        state = None
1779
1780        req = req.get('VisRequestBody', None)
1781        if not req:
1782            raise service_error(service_error.req,
1783                    "Bad request format (no VisRequestBody)")
1784        exp = req.get('experiment', None)
1785        if exp:
1786            if exp.has_key('fedid'):
1787                key = exp['fedid']
1788                keytype = "fedid"
1789            elif exp.has_key('localname'):
1790                key = exp['localname']
1791                keytype = "localname"
1792            else:
1793                raise service_error(service_error.req, "Unknown lookup type")
1794        else:
1795            raise service_error(service_error.req, "No request?")
1796
1797        self.check_experiment_access(fid, key)
1798
1799        self.state_lock.acquire()
1800        if self.state.has_key(key):
1801            if self.state[key].has_key('vis'):
1802                rv =  { 'experiment' : {keytype: key },\
1803                        'vis': self.state[key]['vis'],\
1804                        }
1805            else:
1806                state = self.state[key]['experimentStatus']
1807        self.state_lock.release()
1808
1809        if rv: return rv
1810        else:
1811            if state:
1812                raise service_error(service_error.partial, 
1813                        "Not ready: %s" % state)
1814            else:
1815                raise service_error(service_error.req, "No such experiment")
1816
1817    def clean_info_response(self, rv):
1818        """
1819        Remove the information in the experiment's state object that is not in
1820        the info response.
1821        """
1822        # Remove the owner info (should always be there, but...)
1823        if rv.has_key('owner'): del rv['owner']
1824
1825        # Convert the log into the allocationLog parameter and remove the
1826        # log entry (with defensive programming)
1827        if rv.has_key('log'):
1828            rv['allocationLog'] = "".join(rv['log'])
1829            del rv['log']
1830        else:
1831            rv['allocationLog'] = ""
1832
1833        if rv['experimentStatus'] != 'active':
1834            if rv.has_key('federant'): del rv['federant']
1835        else:
1836            # remove the allocationID info from each federant
1837            for f in rv.get('federant', []):
1838                if f.has_key('allocID'): del f['allocID']
1839        return rv
1840
1841    def get_info(self, req, fid):
1842        """
1843        Return all the stored info about this experiment
1844        """
1845        rv = None
1846
1847        req = req.get('InfoRequestBody', None)
1848        if not req:
1849            raise service_error(service_error.req,
1850                    "Bad request format (no InfoRequestBody)")
1851        exp = req.get('experiment', None)
1852        if exp:
1853            if exp.has_key('fedid'):
1854                key = exp['fedid']
1855                keytype = "fedid"
1856            elif exp.has_key('localname'):
1857                key = exp['localname']
1858                keytype = "localname"
1859            else:
1860                raise service_error(service_error.req, "Unknown lookup type")
1861        else:
1862            raise service_error(service_error.req, "No request?")
1863
1864        self.check_experiment_access(fid, key)
1865
1866        # The state may be massaged by the service function that called
1867        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1868        # state.
1869        self.state_lock.acquire()
1870        if self.state.has_key(key):
1871            rv = copy.deepcopy(self.state[key])
1872        self.state_lock.release()
1873
1874        if rv:
1875            return self.clean_info_response(rv)
1876        else:
1877            raise service_error(service_error.req, "No such experiment")
1878
1879    def get_multi_info(self, req, fid):
1880        """
1881        Return all the stored info that this fedid can access
1882        """
1883        rv = { 'info': [ ] }
1884
1885        self.state_lock.acquire()
1886        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
1887            self.check_experiment_access(fid, key)
1888
1889            if self.state.has_key(key):
1890                e = copy.deepcopy(self.state[key])
1891                e = self.clean_info_response(e)
1892                rv['info'].append(e)
1893        self.state_lock.release()
1894        return rv
1895
1896    def terminate_experiment(self, req, fid):
1897        """
1898        Swap this experiment out on the federants and delete the shared
1899        information
1900        """
1901        tbparams = { }
1902        req = req.get('TerminateRequestBody', None)
1903        if not req:
1904            raise service_error(service_error.req,
1905                    "Bad request format (no TerminateRequestBody)")
1906        force = req.get('force', False)
1907        exp = req.get('experiment', None)
1908        if exp:
1909            if exp.has_key('fedid'):
1910                key = exp['fedid']
1911                keytype = "fedid"
1912            elif exp.has_key('localname'):
1913                key = exp['localname']
1914                keytype = "localname"
1915            else:
1916                raise service_error(service_error.req, "Unknown lookup type")
1917        else:
1918            raise service_error(service_error.req, "No request?")
1919
1920        self.check_experiment_access(fid, key)
1921
1922        dealloc_list = [ ]
1923
1924
1925        # Create a logger that logs to the dealloc_list as well as to the main
1926        # log file.
1927        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
1928        h = logging.StreamHandler(self.list_log(dealloc_list))
1929        # XXX: there should be a global one of these rather than repeating the
1930        # code.
1931        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1932                    '%d %b %y %H:%M:%S'))
1933        dealloc_log.addHandler(h)
1934
1935        self.state_lock.acquire()
1936        fed_exp = self.state.get(key, None)
1937
1938        if fed_exp:
1939            # This branch of the conditional holds the lock to generate a
1940            # consistent temporary tbparams variable to deallocate experiments.
1941            # It releases the lock to do the deallocations and reacquires it to
1942            # remove the experiment state when the termination is complete.
1943
1944            # First make sure that the experiment creation is complete.
1945            status = fed_exp.get('experimentStatus', None)
1946
1947            if status:
1948                if status in ('starting', 'terminating'):
1949                    if not force:
1950                        self.state_lock.release()
1951                        raise service_error(service_error.partial, 
1952                                'Experiment still being created or destroyed')
1953                    else:
1954                        self.log.warning('Experiment in %s state ' % status + \
1955                                'being terminated by force.')
1956            else:
1957                # No status??? trouble
1958                self.state_lock.release()
1959                raise service_error(service_error.internal,
1960                        "Experiment has no status!?")
1961
1962            ids = []
1963            #  experimentID is a list of dicts that are self-describing
1964            #  identifiers.  This finds all the fedids and localnames - the
1965            #  keys of self.state - and puts them into ids.
1966            for id in fed_exp.get('experimentID', []):
1967                if id.has_key('fedid'): ids.append(id['fedid'])
1968                if id.has_key('localname'): ids.append(id['localname'])
1969
1970            # Collect the allocation/segment ids
1971            for fed in fed_exp.get('federant', []):
1972                try:
1973                    tb = fed['emulab']['project']['testbed']['localname']
1974                    aid = fed['allocID']
1975                except KeyError, e:
1976                    continue
1977                tbparams[tb] = aid
1978            fed_exp['experimentStatus'] = 'terminating'
1979            if self.state_filename: self.write_state()
1980            self.state_lock.release()
1981
1982            # Stop everyone.  NB, wait_for_all waits until a thread starts and
1983            # then completes, so we can't wait if nothing starts.  So, no
1984            # tbparams, no start.
1985            if len(tbparams) > 0:
1986                thread_pool = self.thread_pool(self.nthreads)
1987                for tb in tbparams.keys():
1988                    # Create and start a thread to stop the segment
1989                    thread_pool.wait_for_slot()
1990                    uri = self.tbmap.get(tb, None)
1991                    t  = self.pooled_thread(\
1992                            target=self.terminate_segment(log=dealloc_log,
1993                                testbed=tb,
1994                                cert_file=self.cert_file, 
1995                                cert_pwd=self.cert_pwd,
1996                                trusted_certs=self.trusted_certs,
1997                                caller=self.call_TerminateSegment),
1998                            args=(uri, tbparams[tb]), name=tb,
1999                            pdata=thread_pool, trace_file=self.trace_file)
2000                    t.start()
2001                # Wait for completions
2002                thread_pool.wait_for_all_done()
2003
2004            # release the allocations (failed experiments have done this
2005            # already, and starting experiments may be in odd states, so we
2006            # ignore errors releasing those allocations
2007            try: 
2008                for tb in tbparams.keys():
2009                    self.release_access(tb, tbparams[tb])
2010            except service_error, e:
2011                if status != 'failed' and not force:
2012                    raise e
2013
2014            # Remove the terminated experiment
2015            self.state_lock.acquire()
2016            for id in ids:
2017                if self.state.has_key(id): del self.state[id]
2018
2019            if self.state_filename: self.write_state()
2020            self.state_lock.release()
2021
2022            return { 
2023                    'experiment': exp , 
2024                    'deallocationLog': "".join(dealloc_list),
2025                    }
2026        else:
2027            # Don't forget to release the lock
2028            self.state_lock.release()
2029            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.