source: fedd/federation/experiment_control.py @ e19b75c

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since e19b75c was e19b75c, checked in by Ted Faber <faber@…>, 15 years ago

remove old code

  • Property mode set to 100644
File size: 72.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_TerminateSegment = service_caller('TerminateSegment')
200    call_Ns2Split = service_caller('Ns2Split')
201
202    def __init__(self, config=None, auth=None):
203        """
204        Intialize the various attributes, most from the config object
205        """
206
207        def parse_tarfile_list(tf):
208            """
209            Parse a tarfile list from the configuration.  This is a set of
210            paths and tarfiles separated by spaces.
211            """
212            rv = [ ]
213            if tf is not None:
214                tl = tf.split()
215                while len(tl) > 1:
216                    p, t = tl[0:2]
217                    del tl[0:2]
218                    rv.append((p, t))
219            return rv
220
221        self.thread_with_rv = experiment_control_local.pooled_thread
222        self.thread_pool = experiment_control_local.thread_pool
223        self.list_log = experiment_control_local.list_log
224
225        self.cert_file = config.get("experiment_control", "cert_file")
226        if self.cert_file:
227            self.cert_pwd = config.get("experiment_control", "cert_pwd")
228        else:
229            self.cert_file = config.get("globals", "cert_file")
230            self.cert_pwd = config.get("globals", "cert_pwd")
231
232        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
233                or config.get("globals", "trusted_certs")
234
235        self.repodir = config.get("experiment_control", "repodir")
236
237        self.exp_stem = "fed-stem"
238        self.log = logging.getLogger("fedd.experiment_control")
239        set_log_level(config, "experiment_control", self.log)
240        self.muxmax = 2
241        self.nthreads = 2
242        self.randomize_experiments = False
243
244        self.splitter = None
245        self.ssh_keygen = "/usr/bin/ssh-keygen"
246        self.ssh_identity_file = None
247
248
249        self.debug = config.getboolean("experiment_control", "create_debug")
250        self.state_filename = config.get("experiment_control", 
251                "experiment_state")
252        self.splitter_url = config.get("experiment_control", "splitter_uri")
253        self.fedkit = parse_tarfile_list(\
254                config.get("experiment_control", "fedkit"))
255        self.gatewaykit = parse_tarfile_list(\
256                config.get("experiment_control", "gatewaykit"))
257        accessdb_file = config.get("experiment_control", "accessdb")
258
259        self.ssh_pubkey_file = config.get("experiment_control", 
260                "ssh_pubkey_file")
261        self.ssh_privkey_file = config.get("experiment_control",
262                "ssh_privkey_file")
263        # NB for internal master/slave ops, not experiment setup
264        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
265
266        self.overrides = set([])
267        ovr = config.get('experiment_control', 'overrides')
268        if ovr:
269            for o in ovr.split(","):
270                o = o.strip()
271                if o.startswith('fedid:'): o = o[len('fedid:'):]
272                self.overrides.add(fedid(hexstr=o))
273
274        self.state = { }
275        self.state_lock = Lock()
276        self.tclsh = "/usr/local/bin/otclsh"
277        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
278                config.get("experiment_control", "tcl_splitter",
279                        "/usr/testbed/lib/ns2ir/parse.tcl")
280        mapdb_file = config.get("experiment_control", "mapdb")
281        self.trace_file = sys.stderr
282
283        self.def_expstart = \
284                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
285                "/tmp/federate";
286        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
287                "FEDDIR/hosts";
288        self.def_gwstart = \
289                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
290                "/tmp/bridge.log";
291        self.def_mgwstart = \
292                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
293                "/tmp/bridge.log";
294        self.def_gwimage = "FBSD61-TUNNEL2";
295        self.def_gwtype = "pc";
296        self.local_access = { }
297
298        if auth:
299            self.auth = auth
300        else:
301            self.log.error(\
302                    "[access]: No authorizer initialized, creating local one.")
303            auth = authorizer()
304
305
306        if self.ssh_pubkey_file:
307            try:
308                f = open(self.ssh_pubkey_file, 'r')
309                self.ssh_pubkey = f.read()
310                f.close()
311            except IOError:
312                raise service_error(service_error.internal,
313                        "Cannot read sshpubkey")
314        else:
315            raise service_error(service_error.internal, 
316                    "No SSH public key file?")
317
318        if not self.ssh_privkey_file:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322
323        if mapdb_file:
324            self.read_mapdb(mapdb_file)
325        else:
326            self.log.warn("[experiment_control] No testbed map, using defaults")
327            self.tbmap = { 
328                    'deter':'https://users.isi.deterlab.net:23235',
329                    'emulab':'https://users.isi.deterlab.net:23236',
330                    'ucb':'https://users.isi.deterlab.net:23237',
331                    }
332
333        if accessdb_file:
334                self.read_accessdb(accessdb_file)
335        else:
336            raise service_error(service_error.internal,
337                    "No accessdb specified in config")
338
339        # Grab saved state.  OK to do this w/o locking because it's read only
340        # and only one thread should be in existence that can see self.state at
341        # this point.
342        if self.state_filename:
343            self.read_state()
344
345        # Dispatch tables
346        self.soap_services = {\
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354        }
355
356        self.xmlrpc_services = {\
357                'Create': xmlrpc_handler('Create', self.create_experiment),
358                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
359                'Vis': xmlrpc_handler('Vis', self.get_vis),
360                'Info': xmlrpc_handler('Info', self.get_info),
361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
362                'Terminate': xmlrpc_handler('Terminate',
363                    self.terminate_experiment),
364        }
365
366    def copy_file(self, src, dest, size=1024):
367        """
368        Exceedingly simple file copy.
369        """
370        s = open(src,'r')
371        d = open(dest, 'w')
372
373        buf = "x"
374        while buf != "":
375            buf = s.read(size)
376            d.write(buf)
377        s.close()
378        d.close()
379
380    # Call while holding self.state_lock
381    def write_state(self):
382        """
383        Write a new copy of experiment state after copying the existing state
384        to a backup.
385
386        State format is a simple pickling of the state dictionary.
387        """
388        if os.access(self.state_filename, os.W_OK):
389            self.copy_file(self.state_filename, \
390                    "%s.bak" % self.state_filename)
391        try:
392            f = open(self.state_filename, 'w')
393            pickle.dump(self.state, f)
394        except IOError, e:
395            self.log.error("Can't write file %s: %s" % \
396                    (self.state_filename, e))
397        except pickle.PicklingError, e:
398            self.log.error("Pickling problem: %s" % e)
399        except TypeError, e:
400            self.log.error("Pickling problem (TypeError): %s" % e)
401
402    # Call while holding self.state_lock
403    def read_state(self):
404        """
405        Read a new copy of experiment state.  Old state is overwritten.
406
407        State format is a simple pickling of the state dictionary.
408        """
409       
410        def get_experiment_id(state):
411            """
412            Pull the fedid experimentID out of the saved state.  This is kind
413            of a gross walk through the dict.
414            """
415
416            if state.has_key('experimentID'):
417                for e in state['experimentID']:
418                    if e.has_key('fedid'):
419                        return e['fedid']
420                else:
421                    return None
422            else:
423                return None
424
425        def get_alloc_ids(state):
426            """
427            Pull the fedids of the identifiers of each allocation from the
428            state.  Again, a dict dive that's best isolated.
429            """
430
431            return [ f['allocID']['fedid'] 
432                    for f in state.get('federant',[]) \
433                        if f.has_key('allocID') and \
434                            f['allocID'].has_key('fedid')]
435
436
437        try:
438            f = open(self.state_filename, "r")
439            self.state = pickle.load(f)
440            self.log.debug("[read_state]: Read state from %s" % \
441                    self.state_filename)
442        except IOError, e:
443            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
444                    % (self.state_filename, e))
445        except pickle.UnpicklingError, e:
446            self.log.warning(("[read_state]: No saved state: " + \
447                    "Unpickling failed: %s") % e)
448       
449        for s in self.state.values():
450            try:
451
452                eid = get_experiment_id(s)
453                if eid : 
454                    # Give the owner rights to the experiment
455                    self.auth.set_attribute(s['owner'], eid)
456                    # And holders of the eid as well
457                    self.auth.set_attribute(eid, eid)
458                    # allow overrides to control experiments as well
459                    for o in self.overrides:
460                        self.auth.set_attribute(o, eid)
461                    # Set permissions to allow reading of the software repo, if
462                    # any, as well.
463                    for a in get_alloc_ids(s):
464                        self.auth.set_attribute(a, 'repo/%s' % eid)
465                else:
466                    raise KeyError("No experiment id")
467            except KeyError, e:
468                self.log.warning("[read_state]: State ownership or identity " +\
469                        "misformatted in %s: %s" % (self.state_filename, e))
470
471
472    def read_accessdb(self, accessdb_file):
473        """
474        Read the mapping from fedids that can create experiments to their name
475        in the 3-level access namespace.  All will be asserted from this
476        testbed and can include the local username and porject that will be
477        asserted on their behalf by this fedd.  Each fedid is also added to the
478        authorization system with the "create" attribute.
479        """
480        self.accessdb = {}
481        # These are the regexps for parsing the db
482        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
483        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
484                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
485        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
486                "\s*->\s*(" + name_expr + ")\s*$")
487        lineno = 0
488
489        # Parse the mappings and store in self.authdb, a dict of
490        # fedid -> (proj, user)
491        try:
492            f = open(accessdb_file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if len(line) == 0 or line.startswith('#'):
497                    continue
498                m = project_line.match(line)
499                if m:
500                    fid = fedid(hexstr=m.group(1))
501                    project, user = m.group(2,3)
502                    if not self.accessdb.has_key(fid):
503                        self.accessdb[fid] = []
504                    self.accessdb[fid].append((project, user))
505                    continue
506
507                m = user_line.match(line)
508                if m:
509                    fid = fedid(hexstr=m.group(1))
510                    project = None
511                    user = m.group(2)
512                    if not self.accessdb.has_key(fid):
513                        self.accessdb[fid] = []
514                    self.accessdb[fid].append((project, user))
515                    continue
516                self.log.warn("[experiment_control] Error parsing access " +\
517                        "db %s at line %d" %  (accessdb_file, lineno))
518        except IOError:
519            raise service_error(service_error.internal,
520                    "Error opening/reading %s as experiment " +\
521                            "control accessdb" %  accessdb_file)
522        f.close()
523
524        # Initialize the authorization attributes
525        for fid in self.accessdb.keys():
526            self.auth.set_attribute(fid, 'create')
527
528    def read_mapdb(self, file):
529        """
530        Read a simple colon separated list of mappings for the
531        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
532        """
533
534        self.tbmap = { }
535        lineno =0
536        try:
537            f = open(file, "r")
538            for line in f:
539                lineno += 1
540                line = line.strip()
541                if line.startswith('#') or len(line) == 0:
542                    continue
543                try:
544                    label, url = line.split(':', 1)
545                    self.tbmap[label] = url
546                except ValueError, e:
547                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
548                            "map db: %s %s" % (lineno, line, e))
549        except IOError, e:
550            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
551                    "open %s: %s" % (file, e))
552        f.close()
553       
554    def generate_ssh_keys(self, dest, type="rsa" ):
555        """
556        Generate a set of keys for the gateways to use to talk.
557
558        Keys are of type type and are stored in the required dest file.
559        """
560        valid_types = ("rsa", "dsa")
561        t = type.lower();
562        if t not in valid_types: raise ValueError
563        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
564
565        try:
566            trace = open("/dev/null", "w")
567        except IOError:
568            raise service_error(service_error.internal,
569                    "Cannot open /dev/null??");
570
571        # May raise CalledProcessError
572        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
573        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
574        if rv != 0:
575            raise service_error(service_error.internal, 
576                    "Cannot generate nonce ssh keys.  %s return code %d" \
577                            % (self.ssh_keygen, rv))
578
579    def gentopo(self, str):
580        """
581        Generate the topology dtat structure from the splitter's XML
582        representation of it.
583
584        The topology XML looks like:
585            <experiment>
586                <nodes>
587                    <node><vname></vname><ips>ip1:ip2</ips></node>
588                </nodes>
589                <lans>
590                    <lan>
591                        <vname></vname><vnode></vnode><ip></ip>
592                        <bandwidth></bandwidth><member>node:port</member>
593                    </lan>
594                </lans>
595        """
596        class topo_parse:
597            """
598            Parse the topology XML and create the dats structure.
599            """
600            def __init__(self):
601                # Typing of the subelements for data conversion
602                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
603                self.int_subelements = ( 'bandwidth',)
604                self.float_subelements = ( 'delay',)
605                # The final data structure
606                self.nodes = [ ]
607                self.lans =  [ ]
608                self.topo = { \
609                        'node': self.nodes,\
610                        'lan' : self.lans,\
611                    }
612                self.element = { }  # Current element being created
613                self.chars = ""     # Last text seen
614
615            def end_element(self, name):
616                # After each sub element the contents is added to the current
617                # element or to the appropriate list.
618                if name == 'node':
619                    self.nodes.append(self.element)
620                    self.element = { }
621                elif name == 'lan':
622                    self.lans.append(self.element)
623                    self.element = { }
624                elif name in self.str_subelements:
625                    self.element[name] = self.chars
626                    self.chars = ""
627                elif name in self.int_subelements:
628                    self.element[name] = int(self.chars)
629                    self.chars = ""
630                elif name in self.float_subelements:
631                    self.element[name] = float(self.chars)
632                    self.chars = ""
633
634            def found_chars(self, data):
635                self.chars += data.rstrip()
636
637
638        tp = topo_parse();
639        parser = xml.parsers.expat.ParserCreate()
640        parser.EndElementHandler = tp.end_element
641        parser.CharacterDataHandler = tp.found_chars
642
643        parser.Parse(str)
644
645        return tp.topo
646       
647
648    def genviz(self, topo):
649        """
650        Generate the visualization the virtual topology
651        """
652
653        neato = "/usr/local/bin/neato"
654        # These are used to parse neato output and to create the visualization
655        # file.
656        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
657        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
658                "%s</type></node>"
659
660        try:
661            # Node names
662            nodes = [ n['vname'] for n in topo['node'] ]
663            topo_lans = topo['lan']
664        except KeyError, e:
665            raise service_error(service_error.internal, "Bad topology: %s" %e)
666
667        lans = { }
668        links = { }
669
670        # Walk through the virtual topology, organizing the connections into
671        # 2-node connections (links) and more-than-2-node connections (lans).
672        # When a lan is created, it's added to the list of nodes (there's a
673        # node in the visualization for the lan).
674        for l in topo_lans:
675            if links.has_key(l['vname']):
676                if len(links[l['vname']]) < 2:
677                    links[l['vname']].append(l['vnode'])
678                else:
679                    nodes.append(l['vname'])
680                    lans[l['vname']] = links[l['vname']]
681                    del links[l['vname']]
682                    lans[l['vname']].append(l['vnode'])
683            elif lans.has_key(l['vname']):
684                lans[l['vname']].append(l['vnode'])
685            else:
686                links[l['vname']] = [ l['vnode'] ]
687
688
689        # Open up a temporary file for dot to turn into a visualization
690        try:
691            df, dotname = tempfile.mkstemp()
692            dotfile = os.fdopen(df, 'w')
693        except IOError:
694            raise service_error(service_error.internal,
695                    "Failed to open file in genviz")
696
697        try:
698            dnull = open('/dev/null', 'w')
699        except IOError:
700            service_error(service_error.internal,
701                    "Failed to open /dev/null in genviz")
702
703        # Generate a dot/neato input file from the links, nodes and lans
704        try:
705            print >>dotfile, "graph G {"
706            for n in nodes:
707                print >>dotfile, '\t"%s"' % n
708            for l in links.keys():
709                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
710            for l in lans.keys():
711                for n in lans[l]:
712                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
713            print >>dotfile, "}"
714            dotfile.close()
715        except TypeError:
716            raise service_error(service_error.internal,
717                    "Single endpoint link in vtopo")
718        except IOError:
719            raise service_error(service_error.internal, "Cannot write dot file")
720
721        # Use dot to create a visualization
722        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
723                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
724                close_fds=True)
725        dnull.close()
726
727        # Translate dot to vis format
728        vis_nodes = [ ]
729        vis = { 'node': vis_nodes }
730        for line in dot.stdout:
731            m = vis_re.match(line)
732            if m:
733                vn = m.group(1)
734                vis_node = {'name': vn, \
735                        'x': float(m.group(2)),\
736                        'y' : float(m.group(3)),\
737                    }
738                if vn in links.keys() or vn in lans.keys():
739                    vis_node['type'] = 'lan'
740                else:
741                    vis_node['type'] = 'node'
742                vis_nodes.append(vis_node)
743        rv = dot.wait()
744
745        os.remove(dotname)
746        if rv == 0 : return vis
747        else: return None
748
749    def get_access(self, tb, nodes, user, tbparam, master, export_project,
750            access_user):
751        """
752        Get access to testbed through fedd and set the parameters for that tb
753        """
754        uri = self.tbmap.get(tb, None)
755        if not uri:
756            raise service_error(serice_error.server_config, 
757                    "Unknown testbed: %s" % tb)
758
759        # currently this lumps all users into one service access group
760        service_keys = [ a for u in user \
761                for a in u.get('access', []) \
762                    if a.has_key('sshPubkey')]
763
764        if len(service_keys) == 0:
765            raise service_error(service_error.req, 
766                    "Must have at least one SSH pubkey for services")
767
768
769        for p, u in access_user:
770            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
771                    "to %s") %  ((p or "None"), u, uri))
772
773            if p:
774                # Request with user and project specified
775                req = {\
776                        'destinationTestbed' : { 'uri' : uri },
777                        'project': { 
778                            'name': {'localname': p},
779                            'user': [ {'userID': { 'localname': u } } ],
780                            },
781                        'user':  user,
782                        'allocID' : { 'localname': 'test' },
783                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
784                        'serviceAccess' : service_keys
785                    }
786            else:
787                # Request with only user specified
788                req = {\
789                        'destinationTestbed' : { 'uri' : uri },
790                        'user':  [ {'userID': { 'localname': u } } ],
791                        'allocID' : { 'localname': 'test' },
792                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
793                        'serviceAccess' : service_keys
794                    }
795
796            if tb == master:
797                # NB, the export_project parameter is a dict that includes
798                # the type
799                req['exportProject'] = export_project
800
801            # node resources if any
802            if nodes != None and len(nodes) > 0:
803                rnodes = [ ]
804                for n in nodes:
805                    rn = { }
806                    image, hw, count = n.split(":")
807                    if image: rn['image'] = [ image ]
808                    if hw: rn['hardware'] = [ hw ]
809                    if count and int(count) >0 : rn['count'] = int(count)
810                    rnodes.append(rn)
811                req['resources']= { }
812                req['resources']['node'] = rnodes
813
814            try:
815                if self.local_access.has_key(uri):
816                    # Local access call
817                    req = { 'RequestAccessRequestBody' : req }
818                    r = self.local_access[uri].RequestAccess(req, 
819                            fedid(file=self.cert_file))
820                    r = { 'RequestAccessResponseBody' : r }
821                else:
822                    r = self.call_RequestAccess(uri, req, 
823                            self.cert_file, self.cert_pwd, self.trusted_certs)
824            except service_error, e:
825                if e.code == service_error.access:
826                    self.log.debug("[get_access] Access denied")
827                    r = None
828                    continue
829                else:
830                    raise e
831
832            if r.has_key('RequestAccessResponseBody'):
833                # Through to here we have a valid response, not a fault.
834                # Access denied is a fault, so something better or worse than
835                # access denied has happened.
836                r = r['RequestAccessResponseBody']
837                self.log.debug("[get_access] Access granted")
838                break
839            else:
840                raise service_error(service_error.protocol,
841                        "Bad proxy response")
842       
843        if not r:
844            raise service_error(service_error.access, 
845                    "Access denied by %s (%s)" % (tb, uri))
846
847        e = r['emulab']
848        p = e['project']
849        tbparam[tb] = { 
850                "boss": e['boss'],
851                "host": e['ops'],
852                "domain": e['domain'],
853                "fs": e['fileServer'],
854                "eventserver": e['eventServer'],
855                "project": unpack_id(p['name']),
856                "emulab" : e,
857                "allocID" : r['allocID'],
858                }
859        # Make the testbed name be the label the user applied
860        p['testbed'] = {'localname': tb }
861
862        for u in p['user']:
863            role = u.get('role', None)
864            if role == 'experimentCreation':
865                tbparam[tb]['user'] = unpack_id(u['userID'])
866                break
867        else:
868            raise service_error(service_error.internal, 
869                    "No createExperimentUser from %s" %tb)
870
871        # Add attributes to barameter space.  We don't allow attributes to
872        # overlay any parameters already installed.
873        for a in e['fedAttr']:
874            try:
875                if a['attribute'] and isinstance(a['attribute'], basestring)\
876                        and not tbparam[tb].has_key(a['attribute'].lower()):
877                    tbparam[tb][a['attribute'].lower()] = a['value']
878            except KeyError:
879                self.log.error("Bad attribute in response: %s" % a)
880       
881    def release_access(self, tb, aid):
882        """
883        Release access to testbed through fedd
884        """
885
886        uri = self.tbmap.get(tb, None)
887        if not uri:
888            raise service_error(serice_error.server_config, 
889                    "Unknown testbed: %s" % tb)
890
891        if self.local_access.has_key(uri):
892            resp = self.local_access[uri].ReleaseAccess(\
893                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
894                    fedid(file=self.cert_file))
895            resp = { 'ReleaseAccessResponseBody': resp } 
896        else:
897            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
898                    self.cert_file, self.cert_pwd, self.trusted_certs)
899
900        # better error coding
901
902    def remote_splitter(self, uri, desc, master):
903
904        req = {
905                'description' : { 'ns2description': desc },
906                'master': master,
907                'include_fedkit': bool(self.fedkit),
908                'include_gatewaykit': bool(self.gatewaykit)
909            }
910
911        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
912                self.trusted_certs)
913
914        if r.has_key('Ns2SplitResponseBody'):
915            r = r['Ns2SplitResponseBody']
916            if r.has_key('output'):
917                return r['output'].splitlines()
918            else:
919                raise service_error(service_error.protocol, 
920                        "Bad splitter response (no output)")
921        else:
922            raise service_error(service_error.protocol, "Bad splitter response")
923
924    class start_segment:
925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
926                cert_pwd=None, trusted_certs=None, caller=None):
927            self.log = log
928            self.debug = debug
929            self.cert_file = cert_file
930            self.cert_pwd = cert_pwd
931            self.trusted_certs = None
932            self.caller = caller
933            self.testbed = testbed
934
935        def __call__(self, uri, aid, topo, master, attrs=None):
936            req = {
937                    'allocID': { 'fedid' : aid }, 
938                    'segmentdescription': { 
939                        'topdldescription': topo.to_dict(),
940                    },
941                    'master': master,
942                }
943            if attrs:
944                req['fedAttr'] = attrs
945
946            try:
947                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
948                        self.trusted_certs)
949                return True
950            except service_error, e:
951                self.log.error("Start segment failed on %s: %s" % \
952                        (self.testbed, e))
953                return False
954
955
956
957    class terminate_segment:
958        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
959                cert_pwd=None, trusted_certs=None, caller=None):
960            self.log = log
961            self.debug = debug
962            self.cert_file = cert_file
963            self.cert_pwd = cert_pwd
964            self.trusted_certs = None
965            self.caller = caller
966            self.testbed = testbed
967
968        def __call__(self, uri, aid ):
969            print "in terminate_segment: %s" % aid
970            req = {
971                    'allocID': aid , 
972                }
973            try:
974                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
975                        self.trusted_certs)
976                return True
977            except service_error, e:
978                self.log.error("Terminate segment failed on %s: %s" % \
979                        (self.testbed, e))
980                return False
981   
982
983    def allocate_resources(self, allocated, master, eid, expid, expcert, 
984            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
985        started = { }           # Testbeds where a sub-experiment started
986                                # successfully
987
988        # XXX
989        fail_soft = False
990
991        log = alloc_log or self.log
992
993        thread_pool = self.thread_pool(self.nthreads)
994        threads = [ ]
995
996        for tb in [ k for k in allocated.keys() if k != master]:
997            # Create and start a thread to start the segment, and save it to
998            # get the return value later
999            thread_pool.wait_for_slot()
1000            uri = self.tbmap.get(tb, None)
1001            if not uri:
1002                raise service_error(service_error.internal, 
1003                        "Unknown testbed %s !?" % tb)
1004
1005            if tbparams[tb].has_key('allocID') and \
1006                    tbparams[tb]['allocID'].has_key('fedid'):
1007                aid = tbparams[tb]['allocID']['fedid']
1008            else:
1009                raise service_error(service_error.internal, 
1010                        "No alloc id for testbed %s !?" % tb)
1011
1012            t  = self.pooled_thread(\
1013                    target=self.start_segment(log=log, debug=self.debug,
1014                        testbed=tb, cert_file=self.cert_file,
1015                        cert_pwd=self.cert_pwd,
1016                        trusted_certs=self.trusted_certs,
1017                        caller=self.call_StartSegment), 
1018                    args=(uri, aid, topo[tb], False, attrs), name=tb,
1019                    pdata=thread_pool, trace_file=self.trace_file)
1020            threads.append(t)
1021            t.start()
1022
1023        # Wait until all finish
1024        thread_pool.wait_for_all_done()
1025
1026        # If none failed, start the master
1027        failed = [ t.getName() for t in threads if not t.rv ]
1028
1029        if len(failed) == 0:
1030            uri = self.tbmap.get(master, None)
1031            if not uri:
1032                raise service_error(service_error.internal, 
1033                        "Unknown testbed %s !?" % master)
1034
1035            if tbparams[master].has_key('allocID') and \
1036                    tbparams[master]['allocID'].has_key('fedid'):
1037                aid = tbparams[master]['allocID']['fedid']
1038            else:
1039                raise service_error(service_error.internal, 
1040                    "No alloc id for testbed %s !?" % master)
1041            starter = self.start_segment(log=log, debug=self.debug,
1042                    testbed=master, cert_file=self.cert_file,
1043                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1044                    caller=self.call_StartSegment)
1045            if not starter(uri, aid, topo[master], True, attrs):
1046                failed.append(master)
1047
1048        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1049        # If one failed clean up, unless fail_soft is set
1050        if failed and False:
1051            if not fail_soft:
1052                thread_pool.clear()
1053                for tb in succeeded:
1054                    # Create and start a thread to stop the segment
1055                    thread_pool.wait_for_slot()
1056                    t  = self.pooled_thread(\
1057                            target=self.stop_segment(log=log,
1058                                testbed=tb,
1059                                keyfile=self.ssh_privkey_file,
1060                                debug=self.debug), 
1061                            args=(tb, eid, tbparams), name=tb,
1062                            pdata=thread_pool, trace_file=self.trace_file)
1063                    t.start()
1064                # Wait until all finish
1065                thread_pool.wait_for_all_done()
1066
1067                # release the allocations
1068                for tb in tbparams.keys():
1069                    self.release_access(tb, tbparams[tb]['allocID'])
1070                # Remove the placeholder
1071                self.state_lock.acquire()
1072                self.state[eid]['experimentStatus'] = 'failed'
1073                if self.state_filename: self.write_state()
1074                self.state_lock.release()
1075
1076                log.error("Swap in failed on %s" % ",".join(failed))
1077                return
1078        else:
1079            log.info("[start_segment]: Experiment %s active" % eid)
1080
1081        log.debug("[start_experiment]: removing %s" % tmpdir)
1082
1083        # Walk up tmpdir, deleting as we go
1084        for path, dirs, files in os.walk(tmpdir, topdown=False):
1085            for f in files:
1086                os.remove(os.path.join(path, f))
1087            for d in dirs:
1088                os.rmdir(os.path.join(path, d))
1089        os.rmdir(tmpdir)
1090
1091        # Insert the experiment into our state and update the disk copy
1092        self.state_lock.acquire()
1093        self.state[expid]['experimentStatus'] = 'active'
1094        self.state[eid] = self.state[expid]
1095        if self.state_filename: self.write_state()
1096        self.state_lock.release()
1097        return
1098
1099
1100    def create_experiment(self, req, fid):
1101        """
1102        The external interface to experiment creation called from the
1103        dispatcher.
1104
1105        Creates a working directory, splits the incoming description using the
1106        splitter script and parses out the avrious subsections using the
1107        lcasses above.  Once each sub-experiment is created, use pooled threads
1108        to instantiate them and start it all up.
1109        """
1110
1111        def add_kit(e, kit):
1112            """
1113            Add a Software object created from the list of (install, location)
1114            tuples passed as kit  to the software attribute of an object e.  We
1115            do this enough to break out the code, but it's kind of a hack to
1116            avoid changing the old tuple rep.
1117            """
1118
1119            s = [ topdl.Software(install=i, location=l) for i, l in kit]
1120
1121            if isinstance(e.software, list): e.software.extend(s)
1122            else: e.software = s
1123
1124
1125        if not self.auth.check_attribute(fid, 'create'):
1126            raise service_error(service_error.access, "Create access denied")
1127
1128        try:
1129            tmpdir = tempfile.mkdtemp(prefix="split-")
1130        except IOError:
1131            raise service_error(service_error.internal, "Cannot create tmp dir")
1132
1133        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1134        gw_secretkey_base = "fed.%s" % self.ssh_type
1135        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1136        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1137        tclfile = tmpdir + "/experiment.tcl"
1138        tbparams = { }
1139        try:
1140            access_user = self.accessdb[fid]
1141        except KeyError:
1142            raise service_error(service_error.internal,
1143                    "Access map and authorizer out of sync in " + \
1144                            "create_experiment for fedid %s"  % fid)
1145
1146        pid = "dummy"
1147        gid = "dummy"
1148        try:
1149            os.mkdir(tmpdir+"/keys")
1150        except OSError:
1151            raise service_error(service_error.internal,
1152                    "Can't make temporary dir")
1153
1154        req = req.get('CreateRequestBody', None)
1155        if not req:
1156            raise service_error(service_error.req,
1157                    "Bad request format (no CreateRequestBody)")
1158        # The tcl parser needs to read a file so put the content into that file
1159        descr=req.get('experimentdescription', None)
1160        if descr:
1161            file_content=descr.get('ns2description', None)
1162            if file_content:
1163                try:
1164                    f = open(tclfile, 'w')
1165                    f.write(file_content)
1166                    f.close()
1167                except IOError:
1168                    raise service_error(service_error.internal,
1169                            "Cannot write temp experiment description")
1170            else:
1171                raise service_error(service_error.req, 
1172                        "Only ns2descriptions supported")
1173        else:
1174            raise service_error(service_error.req, "No experiment description")
1175
1176        # Generate an ID for the experiment (slice) and a certificate that the
1177        # allocator can use to prove they own it.  We'll ship it back through
1178        # the encrypted connection.
1179        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1180
1181        if req.has_key('experimentID') and \
1182                req['experimentID'].has_key('localname'):
1183            overwrite = False
1184            eid = req['experimentID']['localname']
1185            # If there's an old failed experiment here with the same local name
1186            # and accessible by this user, we'll overwrite it, otherwise we'll
1187            # fall through and do the collision avoidance.
1188            old_expid = self.get_experiment_fedid(eid)
1189            if old_expid and self.check_experiment_access(fid, old_expid):
1190                self.state_lock.acquire()
1191                status = self.state[eid].get('experimentStatus', None)
1192                if status and status == 'failed':
1193                    # remove the old access attribute
1194                    self.auth.unset_attribute(fid, old_expid)
1195                    overwrite = True
1196                    del self.state[eid]
1197                    del self.state[old_expid]
1198                self.state_lock.release()
1199            self.state_lock.acquire()
1200            while (self.state.has_key(eid) and not overwrite):
1201                eid += random.choice(string.ascii_letters)
1202            # Initial state
1203            self.state[eid] = {
1204                    'experimentID' : \
1205                            [ { 'localname' : eid }, {'fedid': expid } ],
1206                    'experimentStatus': 'starting',
1207                    'experimentAccess': { 'X509' : expcert },
1208                    'owner': fid,
1209                    'log' : [],
1210                }
1211            self.state[expid] = self.state[eid]
1212            if self.state_filename: self.write_state()
1213            self.state_lock.release()
1214        else:
1215            eid = self.exp_stem
1216            for i in range(0,5):
1217                eid += random.choice(string.ascii_letters)
1218            self.state_lock.acquire()
1219            while (self.state.has_key(eid)):
1220                eid = self.exp_stem
1221                for i in range(0,5):
1222                    eid += random.choice(string.ascii_letters)
1223            # Initial state
1224            self.state[eid] = {
1225                    'experimentID' : \
1226                            [ { 'localname' : eid }, {'fedid': expid } ],
1227                    'experimentStatus': 'starting',
1228                    'experimentAccess': { 'X509' : expcert },
1229                    'owner': fid,
1230                    'log' : [],
1231                }
1232            self.state[expid] = self.state[eid]
1233            if self.state_filename: self.write_state()
1234            self.state_lock.release()
1235
1236        try: 
1237            # This catches exceptions to clear the placeholder if necessary
1238            try:
1239                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1240            except ValueError:
1241                raise service_error(service_error.server_config, 
1242                        "Bad key type (%s)" % self.ssh_type)
1243
1244            user = req.get('user', None)
1245            if user == None:
1246                raise service_error(service_error.req, "No user")
1247
1248            master = req.get('master', None)
1249            if not master:
1250                raise service_error(service_error.req,
1251                        "No master testbed label")
1252            export_project = req.get('exportProject', None)
1253            if not export_project:
1254                raise service_error(service_error.req, "No export project")
1255           
1256            if self.splitter_url:
1257                self.log.debug("Calling remote splitter at %s" % \
1258                        self.splitter_url)
1259                split_data = self.remote_splitter(self.splitter_url,
1260                        file_content, master)
1261            else:
1262                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1263                    str(self.muxmax), '-m', master]
1264
1265                if self.fedkit:
1266                    tclcmd.append('-k')
1267
1268                if self.gatewaykit:
1269                    tclcmd.append('-K')
1270
1271                tclcmd.extend([pid, gid, eid, tclfile])
1272
1273                self.log.debug("running local splitter %s", " ".join(tclcmd))
1274                # This is just fantastic.  As a side effect the parser copies
1275                # tb_compat.tcl into the current directory, so that directory
1276                # must be writable by the fedd user.  Doing this in the
1277                # temporary subdir ensures this is the case.
1278                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1279                        cwd=tmpdir)
1280                split_data = tclparser.stdout
1281
1282            allocated = { }         # Testbeds we can access
1283            # Allocate IP addresses: The allocator is a buddy system memory
1284            # allocator.  Allocate from the largest substrate to the
1285            # smallest to make the packing more likely to work - i.e.
1286            # avoiding internal fragmentation.
1287            top = topdl.topology_from_xml(file=split_data, top="experiment")
1288            subs = sorted(top.substrates, 
1289                    cmp=lambda x,y: cmp(len(x.interfaces), 
1290                        len(y.interfaces)),
1291                    reverse=True)
1292            ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1293            ifs = { }
1294            hosts = [ ]
1295            # The config urlpath
1296            configpath = "/%s/config" % expid
1297            # The config file system location
1298            configdir ="%s%s" % ( self.repodir, configpath)
1299
1300            for idx, s in enumerate(subs):
1301                a = ips.allocate(len(s.interfaces)+2)
1302                if a :
1303                    base, num = a
1304                    if num < len(s.interfaces) +2 : 
1305                        raise service_error(service_error.internal,
1306                                "Allocator returned wrong number of IPs??")
1307                else:
1308                    raise service_error(service_error.req, 
1309                            "Cannot allocate IP addresses")
1310
1311                base += 1
1312                for i in s.interfaces:
1313                    i.attribute.append(
1314                            topdl.Attribute('ip4_address', 
1315                                "%s" % ip_addr(base)))
1316                    hname = i.element.name[0]
1317                    if ifs.has_key(hname):
1318                        hosts.append("%s\t%s-%s %s-%d" % \
1319                                (ip_addr(base), hname, s.name, hname,
1320                                    ifs[hname]))
1321                    else:
1322                        ifs[hname] = 0
1323                        hosts.append("%s\t%s-%s %s-%d %s" % \
1324                                (ip_addr(base), hname, s.name, hname,
1325                                    ifs[hname], hname))
1326
1327                    ifs[hname] += 1
1328                    base += 1
1329            # save config files
1330            try:
1331                os.makedirs(configdir)
1332            except IOError, e:
1333                raise service_error(
1334                        "Cannot create config directory: %s" % e)
1335            # Find the testbeds to look up
1336            testbeds = set([ a.value for e in top.elements \
1337                    for a in e.attribute \
1338                        if a.attribute == 'testbed'] )
1339
1340
1341            # Make per testbed topologies.  Copy the main topo and remove
1342            # interfaces and nodes that don't live in the testbed.
1343            topo ={ }
1344            for tb in testbeds:
1345                self.get_access(tb, None, user, tbparams, master,
1346                        export_project, access_user)
1347                allocated[tb] = 1
1348                topo[tb] = top.clone()
1349                to_delete = [ ]
1350                for e in topo[tb].elements:
1351                    etb = e.get_attribute('testbed')
1352                    if etb and etb != tb:
1353                        for i in e.interface:
1354                            for s in i.subs:
1355                                try:
1356                                    s.interfaces.remove(i)
1357                                except ValueError:
1358                                    raise service_error(service_error.internal,
1359                                            "Can't remove interface??")
1360                        to_delete.append(e)
1361                for e in to_delete:
1362                    topo[tb].elements.remove(e)
1363                topo[tb].make_indices()
1364
1365                for e in topo[tb].elements:
1366                    if tb == master:
1367                        cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1368                    else:
1369                        cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1370                    scmd = e.get_attribute('startup')
1371                    if scmd:
1372                        cmd = "%s \\$USER '%s'" % (cmd, scmd)
1373
1374                    e.set_attribute('startup', cmd)
1375                    if self.fedkit: add_kit(e, self.fedkit)
1376
1377            # Copy configuration files into the remote file store
1378            try:
1379                f = open("%s/hosts" % configdir, "w")
1380                f.write('\n'.join(hosts))
1381                f.close()
1382            except IOError, e:
1383                raise service_error(service_error.internal, 
1384                        "Cannot write hosts file: %s" % e)
1385            try:
1386                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
1387                        (configdir, gw_pubkey_base))
1388                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
1389                        (configdir, gw_secretkey_base))
1390            except IOError, e:
1391                raise service_error(service_error.internal, 
1392                        "Cannot copy keyfiles: %s" % e)
1393
1394            # Allow the individual testbeds to access the configuration files.
1395            for tb in tbparams.keys():
1396                asignee = tbparams[tb]['allocID']['fedid']
1397                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1398                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1399                    print "assigned %s/%s" % (configpath, f)
1400
1401            # Now, for each substrate in the main topology, find those that
1402            # have nodes on more than one testbed.  Insert portal nodes
1403            # into the copies of those substrates on the sub topologies.
1404            for s in top.substrates:
1405                # tbs will contain an ip address on this subsrate that is in
1406                # each testbed.
1407                tbs = { }
1408                for i in s.interfaces:
1409                    e = i.element
1410                    tb = e.get_attribute('testbed')
1411                    if tb and not tbs.has_key(tb):
1412                        for i in e.interface:
1413                            if s in i.subs:
1414                                tbs[tb]= i.get_attribute('ip4_address')
1415                if len(tbs) < 2:
1416                    continue
1417
1418                # More than one testbed is on this substrate.  Insert
1419                # some portals into the subtopologies.  st == source testbed,
1420                # dt == destination testbed.
1421                segment_substrate = { }
1422                for st in tbs.keys():
1423                    segment_substrate[st] = { }
1424                    for dt in [ t for t in tbs.keys() if t != st]:
1425                        myname =  "%stunnel" % dt
1426                        desthost  =  "%stunnel" % st
1427                        sproject = tbparams[st].get('project', 'project')
1428                        dproject = tbparams[dt].get('project', 'project')
1429                        mproject = tbparams[master].get('project', 'project')
1430                        sdomain = tbparams[st].get('domain', ".example.com")
1431                        ddomain = tbparams[dt].get('domain', ".example.com")
1432                        mdomain = tbparams[master].get('domain', '.example.com')
1433                        muser = tbparams[master].get('user', 'root')
1434                        smbshare = tbparams[master].get('smbshare', 'USERS')
1435                        # XXX: active and type need to be unkludged
1436                        active = ("%s" % (st == master))
1437                        if not segment_substrate[st].has_key(dt):
1438                            # Put a substrate and a segment for the connected
1439                            # testbed in there.
1440                            tsubstrate = \
1441                                    topdl.Substrate(name='%s-%s' % (st, dt),
1442                                            attribute= [
1443                                                topdl.Attribute(
1444                                                    attribute='portal',
1445                                                    value='true')
1446                                                ]
1447                                            )
1448                            segment_element = topdl.Segment(
1449                                    id= tbparams[dt]['allocID'],
1450                                    type='emulab',
1451                                    uri = self.tbmap.get(dt, None),
1452                                    interface=[ 
1453                                        topdl.Interface(
1454                                            substrate=tsubstrate.name),
1455                                        ],
1456                                    attribute = [
1457                                        topdl.Attribute(attribute=n, value=v)
1458                                            for n, v in (\
1459                                                ('domain', ddomain),
1460                                                ('experiment', "%s/%s" % \
1461                                                        (dproject, eid)),)
1462                                        ],
1463                                    )
1464                            segment_substrate[st][dt] = tsubstrate
1465                            topo[st].substrates.append(tsubstrate)
1466                            topo[st].elements.append(segment_element)
1467                        portal = topdl.Computer(
1468                                name="%stunnel" % dt,
1469                                attribute=[ 
1470                                    topdl.Attribute(attribute=n,value=v)
1471                                        for n, v in (\
1472                                            ('portal', 'true'),
1473                                            ('domain', sdomain),
1474                                            ('masterdomain', mdomain),
1475                                            ('masterexperiment', "%s/%s" % \
1476                                                    (mproject, eid)),
1477                                            ('masteruser', muser),
1478                                            ('smbshare', smbshare),
1479                                            ('experiment', "%s/%s" % \
1480                                                    (sproject, eid)),
1481                                            ('peer', "%s" % desthost),
1482                                            ('peer_segment', "%s" % \
1483                                                    tbparams[dt]['allocID']['fedid']),
1484                                            ('scriptdir', 
1485                                                "/usr/local/federation/bin"),
1486                                            ('active', "%s" % active),
1487                                            ('portal_type', 'both'), 
1488                                            ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1489                                    ],
1490                                interface=[
1491                                    topdl.Interface(
1492                                        substrate=s.name,
1493                                        attribute=[ 
1494                                            topdl.Attribute(
1495                                                attribute='ip4_address', 
1496                                                value=tbs[dt]
1497                                            )
1498                                        ]),
1499                                    topdl.Interface(
1500                                        substrate=\
1501                                            segment_substrate[st][dt].name,
1502                                        attribute=[
1503                                            topdl.Attribute(attribute='portal',
1504                                                value='true')
1505                                            ]
1506                                        ),
1507                                    ],
1508                                )
1509                        if self.fedkit: add_kit(portal, self.fedkit)
1510                        if self.gatewaykit: add_kit(portal, self.gatewaykit)
1511
1512                        topo[st].elements.append(portal)
1513
1514            # Connect the gateway nodes into the topologies and clear out
1515            # substrates that are not in the topologies
1516            for tb in testbeds:
1517                topo[tb].incorporate_elements()
1518                topo[tb].substrates = \
1519                        [s for s in topo[tb].substrates \
1520                            if len(s.interfaces) >0]
1521
1522            # Copy the rpms and tarfiles to a distribution directory from
1523            # which the federants can retrieve them
1524            linkpath = "%s/software" %  expid
1525            softdir ="%s/%s" % ( self.repodir, linkpath)
1526            softmap = { }
1527            pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1528                    for p, t in l ])
1529            pkgs.update([x.location for e in top.elements \
1530                    for x in e.software])
1531            try:
1532                os.makedirs(softdir)
1533            except IOError, e:
1534                raise service_error(
1535                        "Cannot create software directory: %s" % e)
1536            for pkg in pkgs:
1537                loc = pkg
1538
1539                scheme, host, path = urlparse(loc)[0:3]
1540                dest = os.path.basename(path)
1541                if not scheme:
1542                    if not loc.startswith('/'):
1543                        loc = "/%s" % loc
1544                    loc = "file://%s" %loc
1545                try:
1546                    u = urlopen(loc)
1547                except Exception, e:
1548                    raise service_error(service_error.req, 
1549                            "Cannot open %s: %s" % (loc, e))
1550                try:
1551                    f = open("%s/%s" % (softdir, dest) , "w")
1552                    self.log.debug("Writing %s/%s" % (softdir,dest) )
1553                    data = u.read(4096)
1554                    while data:
1555                        f.write(data)
1556                        data = u.read(4096)
1557                    f.close()
1558                    u.close()
1559                except Exception, e:
1560                    raise service_error(service_error.internal,
1561                            "Could not copy %s: %s" % (loc, e))
1562                path = re.sub("/tmp", "", linkpath)
1563                # XXX
1564                softmap[pkg] = \
1565                        "https://users.isi.deterlab.net:23232/%s/%s" %\
1566                        ( path, dest)
1567
1568                # Allow the individual testbeds to access the software.
1569                for tb in tbparams.keys():
1570                    self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1571                            "/%s/%s" % ( path, dest))
1572
1573            # Convert the software locations in the segments into the local
1574            # copies on this host
1575            for soft in [ s for tb in topo.values() \
1576                    for e in tb.elements \
1577                        if getattr(e, 'software', False) \
1578                            for s in e.software ]:
1579                if softmap.has_key(soft.location):
1580                    soft.location = softmap[soft.location]
1581
1582            vtopo = topdl.topology_to_vtopo(top)
1583            vis = self.genviz(vtopo)
1584
1585            # save federant information
1586            for k in allocated.keys():
1587                tbparams[k]['federant'] = {\
1588                        'name': [ { 'localname' : eid} ],\
1589                        'emulab': tbparams[k]['emulab'],\
1590                        'allocID' : tbparams[k]['allocID'],\
1591                        'master' : k == master,\
1592                    }
1593
1594            self.state_lock.acquire()
1595            self.state[eid]['vtopo'] = vtopo
1596            self.state[eid]['vis'] = vis
1597            self.state[expid]['federant'] = \
1598                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1599                        if tbparams[tb].has_key('federant') ]
1600            if self.state_filename: 
1601                self.write_state()
1602            self.state_lock.release()
1603        except service_error, e:
1604            # If something goes wrong in the parse (usually an access error)
1605            # clear the placeholder state.  From here on out the code delays
1606            # exceptions.  Failing at this point returns a fault to the remote
1607            # caller.
1608
1609            self.state_lock.acquire()
1610            del self.state[eid]
1611            del self.state[expid]
1612            if self.state_filename: self.write_state()
1613            self.state_lock.release()
1614            raise e
1615
1616
1617        # Start the background swapper and return the starting state.  From
1618        # here on out, the state will stick around a while.
1619
1620        # Let users touch the state
1621        self.auth.set_attribute(fid, expid)
1622        self.auth.set_attribute(expid, expid)
1623        # Override fedids can manipulate state as well
1624        for o in self.overrides:
1625            self.auth.set_attribute(o, expid)
1626
1627        # Create a logger that logs to the experiment's state object as well as
1628        # to the main log file.
1629        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1630        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
1631        # XXX: there should be a global one of these rather than repeating the
1632        # code.
1633        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1634                    '%d %b %y %H:%M:%S'))
1635        alloc_log.addHandler(h)
1636       
1637        # XXX
1638        url_base = 'https://users.isi.deterlab.net:23232'
1639        attrs = [ 
1640                {
1641                    'attribute': 'ssh_pubkey', 
1642                    'value': '%s/%s/config/%s' % \
1643                            (url_base, expid, gw_pubkey_base)
1644                },
1645                {
1646                    'attribute': 'ssh_secretkey', 
1647                    'value': '%s/%s/config/%s' % \
1648                            (url_base, expid, gw_secretkey_base)
1649                },
1650                {
1651                    'attribute': 'hosts', 
1652                    'value': '%s/%s/config/hosts' % \
1653                            (url_base, expid)
1654                },
1655                {
1656                    'attribute': 'experiment_name',
1657                    'value': eid,
1658                },
1659            ]
1660
1661        # Start a thread to do the resource allocation
1662        t  = Thread(target=self.allocate_resources,
1663                args=(allocated, master, eid, expid, expcert, tbparams, 
1664                    topo, tmpdir, alloc_log, attrs),
1665                name=eid)
1666        t.start()
1667
1668        rv = {
1669                'experimentID': [
1670                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1671                ],
1672                'experimentStatus': 'starting',
1673                'experimentAccess': { 'X509' : expcert }
1674            }
1675
1676        return rv
1677   
1678    def get_experiment_fedid(self, key):
1679        """
1680        find the fedid associated with the localname key in the state database.
1681        """
1682
1683        rv = None
1684        self.state_lock.acquire()
1685        if self.state.has_key(key):
1686            if isinstance(self.state[key], dict):
1687                try:
1688                    kl = [ f['fedid'] for f in \
1689                            self.state[key]['experimentID']\
1690                                if f.has_key('fedid') ]
1691                except KeyError:
1692                    self.state_lock.release()
1693                    raise service_error(service_error.internal, 
1694                            "No fedid for experiment %s when getting "+\
1695                                    "fedid(!?)" % key)
1696                if len(kl) == 1:
1697                    rv = kl[0]
1698                else:
1699                    self.state_lock.release()
1700                    raise service_error(service_error.internal, 
1701                            "multiple fedids for experiment %s when " +\
1702                                    "getting fedid(!?)" % key)
1703            else:
1704                self.state_lock.release()
1705                raise service_error(service_error.internal, 
1706                        "Unexpected state for %s" % key)
1707        self.state_lock.release()
1708        return rv
1709
1710    def check_experiment_access(self, fid, key):
1711        """
1712        Confirm that the fid has access to the experiment.  Though a request
1713        may be made in terms of a local name, the access attribute is always
1714        the experiment's fedid.
1715        """
1716        if not isinstance(key, fedid):
1717            key = self.get_experiment_fedid(key)
1718
1719        if self.auth.check_attribute(fid, key):
1720            return True
1721        else:
1722            raise service_error(service_error.access, "Access Denied")
1723
1724
1725    def get_handler(self, path, fid):
1726        print "%s" %  path
1727        if self.auth.check_attribute(fid, path):
1728            return ("%s/%s" % (self.repodir, path), "application/binary")
1729        else:
1730            return (None, None)
1731
1732    def get_vtopo(self, req, fid):
1733        """
1734        Return the stored virtual topology for this experiment
1735        """
1736        rv = None
1737        state = None
1738
1739        req = req.get('VtopoRequestBody', None)
1740        if not req:
1741            raise service_error(service_error.req,
1742                    "Bad request format (no VtopoRequestBody)")
1743        exp = req.get('experiment', None)
1744        if exp:
1745            if exp.has_key('fedid'):
1746                key = exp['fedid']
1747                keytype = "fedid"
1748            elif exp.has_key('localname'):
1749                key = exp['localname']
1750                keytype = "localname"
1751            else:
1752                raise service_error(service_error.req, "Unknown lookup type")
1753        else:
1754            raise service_error(service_error.req, "No request?")
1755
1756        self.check_experiment_access(fid, key)
1757
1758        self.state_lock.acquire()
1759        if self.state.has_key(key):
1760            if self.state[key].has_key('vtopo'):
1761                rv = { 'experiment' : {keytype: key },\
1762                        'vtopo': self.state[key]['vtopo'],\
1763                    }
1764            else:
1765                state = self.state[key]['experimentStatus']
1766        self.state_lock.release()
1767
1768        if rv: return rv
1769        else: 
1770            if state:
1771                raise service_error(service_error.partial, 
1772                        "Not ready: %s" % state)
1773            else:
1774                raise service_error(service_error.req, "No such experiment")
1775
1776    def get_vis(self, req, fid):
1777        """
1778        Return the stored visualization for this experiment
1779        """
1780        rv = None
1781        state = None
1782
1783        req = req.get('VisRequestBody', None)
1784        if not req:
1785            raise service_error(service_error.req,
1786                    "Bad request format (no VisRequestBody)")
1787        exp = req.get('experiment', None)
1788        if exp:
1789            if exp.has_key('fedid'):
1790                key = exp['fedid']
1791                keytype = "fedid"
1792            elif exp.has_key('localname'):
1793                key = exp['localname']
1794                keytype = "localname"
1795            else:
1796                raise service_error(service_error.req, "Unknown lookup type")
1797        else:
1798            raise service_error(service_error.req, "No request?")
1799
1800        self.check_experiment_access(fid, key)
1801
1802        self.state_lock.acquire()
1803        if self.state.has_key(key):
1804            if self.state[key].has_key('vis'):
1805                rv =  { 'experiment' : {keytype: key },\
1806                        'vis': self.state[key]['vis'],\
1807                        }
1808            else:
1809                state = self.state[key]['experimentStatus']
1810        self.state_lock.release()
1811
1812        if rv: return rv
1813        else:
1814            if state:
1815                raise service_error(service_error.partial, 
1816                        "Not ready: %s" % state)
1817            else:
1818                raise service_error(service_error.req, "No such experiment")
1819
1820    def clean_info_response(self, rv):
1821        """
1822        Remove the information in the experiment's state object that is not in
1823        the info response.
1824        """
1825        # Remove the owner info (should always be there, but...)
1826        if rv.has_key('owner'): del rv['owner']
1827
1828        # Convert the log into the allocationLog parameter and remove the
1829        # log entry (with defensive programming)
1830        if rv.has_key('log'):
1831            rv['allocationLog'] = "".join(rv['log'])
1832            del rv['log']
1833        else:
1834            rv['allocationLog'] = ""
1835
1836        if rv['experimentStatus'] != 'active':
1837            if rv.has_key('federant'): del rv['federant']
1838        else:
1839            # remove the allocationID info from each federant
1840            for f in rv.get('federant', []):
1841                if f.has_key('allocID'): del f['allocID']
1842        return rv
1843
1844    def get_info(self, req, fid):
1845        """
1846        Return all the stored info about this experiment
1847        """
1848        rv = None
1849
1850        req = req.get('InfoRequestBody', None)
1851        if not req:
1852            raise service_error(service_error.req,
1853                    "Bad request format (no InfoRequestBody)")
1854        exp = req.get('experiment', None)
1855        if exp:
1856            if exp.has_key('fedid'):
1857                key = exp['fedid']
1858                keytype = "fedid"
1859            elif exp.has_key('localname'):
1860                key = exp['localname']
1861                keytype = "localname"
1862            else:
1863                raise service_error(service_error.req, "Unknown lookup type")
1864        else:
1865            raise service_error(service_error.req, "No request?")
1866
1867        self.check_experiment_access(fid, key)
1868
1869        # The state may be massaged by the service function that called
1870        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1871        # state.
1872        self.state_lock.acquire()
1873        if self.state.has_key(key):
1874            rv = copy.deepcopy(self.state[key])
1875        self.state_lock.release()
1876
1877        if rv:
1878            return self.clean_info_response(rv)
1879        else:
1880            raise service_error(service_error.req, "No such experiment")
1881
1882    def get_multi_info(self, req, fid):
1883        """
1884        Return all the stored info that this fedid can access
1885        """
1886        rv = { 'info': [ ] }
1887
1888        self.state_lock.acquire()
1889        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
1890            self.check_experiment_access(fid, key)
1891
1892            if self.state.has_key(key):
1893                e = copy.deepcopy(self.state[key])
1894                e = self.clean_info_response(e)
1895                rv['info'].append(e)
1896        self.state_lock.release()
1897        return rv
1898
1899    def terminate_experiment(self, req, fid):
1900        """
1901        Swap this experiment out on the federants and delete the shared
1902        information
1903        """
1904        tbparams = { }
1905        req = req.get('TerminateRequestBody', None)
1906        if not req:
1907            raise service_error(service_error.req,
1908                    "Bad request format (no TerminateRequestBody)")
1909        force = req.get('force', False)
1910        exp = req.get('experiment', None)
1911        if exp:
1912            if exp.has_key('fedid'):
1913                key = exp['fedid']
1914                keytype = "fedid"
1915            elif exp.has_key('localname'):
1916                key = exp['localname']
1917                keytype = "localname"
1918            else:
1919                raise service_error(service_error.req, "Unknown lookup type")
1920        else:
1921            raise service_error(service_error.req, "No request?")
1922
1923        self.check_experiment_access(fid, key)
1924
1925        dealloc_list = [ ]
1926
1927
1928        # Create a logger that logs to the dealloc_list as well as to the main
1929        # log file.
1930        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
1931        h = logging.StreamHandler(self.list_log(dealloc_list))
1932        # XXX: there should be a global one of these rather than repeating the
1933        # code.
1934        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1935                    '%d %b %y %H:%M:%S'))
1936        dealloc_log.addHandler(h)
1937
1938        self.state_lock.acquire()
1939        fed_exp = self.state.get(key, None)
1940
1941        if fed_exp:
1942            # This branch of the conditional holds the lock to generate a
1943            # consistent temporary tbparams variable to deallocate experiments.
1944            # It releases the lock to do the deallocations and reacquires it to
1945            # remove the experiment state when the termination is complete.
1946
1947            # First make sure that the experiment creation is complete.
1948            status = fed_exp.get('experimentStatus', None)
1949
1950            if status:
1951                if status in ('starting', 'terminating'):
1952                    if not force:
1953                        self.state_lock.release()
1954                        raise service_error(service_error.partial, 
1955                                'Experiment still being created or destroyed')
1956                    else:
1957                        self.log.warning('Experiment in %s state ' % status + \
1958                                'being terminated by force.')
1959            else:
1960                # No status??? trouble
1961                self.state_lock.release()
1962                raise service_error(service_error.internal,
1963                        "Experiment has no status!?")
1964
1965            ids = []
1966            #  experimentID is a list of dicts that are self-describing
1967            #  identifiers.  This finds all the fedids and localnames - the
1968            #  keys of self.state - and puts them into ids.
1969            for id in fed_exp.get('experimentID', []):
1970                if id.has_key('fedid'): ids.append(id['fedid'])
1971                if id.has_key('localname'): ids.append(id['localname'])
1972
1973            # Collect the allocation/segment ids
1974            for fed in fed_exp.get('federant', []):
1975                try:
1976                    print "looking at %s" % fed
1977                    tb = fed['emulab']['project']['testbed']['localname']
1978                    aid = fed['allocID']
1979                except KeyError, e:
1980                    print "Key error: %s" %e
1981                    continue
1982                tbparams[tb] = aid
1983            fed_exp['experimentStatus'] = 'terminating'
1984            if self.state_filename: self.write_state()
1985            self.state_lock.release()
1986
1987            # Stop everyone.  NB, wait_for_all waits until a thread starts and
1988            # then completes, so we can't wait if nothing starts.  So, no
1989            # tbparams, no start.
1990            if len(tbparams) > 0:
1991                thread_pool = self.thread_pool(self.nthreads)
1992                for tb in tbparams.keys():
1993                    # Create and start a thread to stop the segment
1994                    thread_pool.wait_for_slot()
1995                    uri = self.tbmap.get(tb, None)
1996                    t  = self.pooled_thread(\
1997                            target=self.terminate_segment(log=dealloc_log,
1998                                testbed=tb,
1999                                cert_file=self.cert_file, 
2000                                cert_pwd=self.cert_pwd,
2001                                trusted_certs=self.trusted_certs,
2002                                caller=self.call_TerminateSegment),
2003                            args=(uri, tbparams[tb]), name=tb,
2004                            pdata=thread_pool, trace_file=self.trace_file)
2005                    t.start()
2006                # Wait for completions
2007                thread_pool.wait_for_all_done()
2008
2009            # release the allocations (failed experiments have done this
2010            # already, and starting experiments may be in odd states, so we
2011            # ignore errors releasing those allocations
2012            try: 
2013                for tb in tbparams.keys():
2014                    self.release_access(tb, tbparams[tb])
2015            except service_error, e:
2016                if status != 'failed' and not force:
2017                    raise e
2018
2019            # Remove the terminated experiment
2020            self.state_lock.acquire()
2021            for id in ids:
2022                if self.state.has_key(id): del self.state[id]
2023
2024            if self.state_filename: self.write_state()
2025            self.state_lock.release()
2026
2027            return { 
2028                    'experiment': exp , 
2029                    'deallocationLog': "".join(dealloc_list),
2030                    }
2031        else:
2032            # Don't forget to release the lock
2033            self.state_lock.release()
2034            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.