source: fedd/federation/experiment_control.py @ 895a133

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 895a133 was 895a133, checked in by Ted Faber <faber@…>, 15 years ago

Better modularity of experiment creation.

  • Property mode set to 100644
File size: 73.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32from ip_allocator import ip_allocator
33from ip_addr import ip_addr
34
35
36class nullHandler(logging.Handler):
37    def emit(self, record): pass
38
39fl = logging.getLogger("fedd.experiment_control")
40fl.addHandler(nullHandler())
41
42class experiment_control_local:
43    """
44    Control of experiments that this system can directly access.
45
46    Includes experiment creation, termination and information dissemination.
47    Thred safe.
48    """
49
50    class ssh_cmd_timeout(RuntimeError): pass
51
52    class list_log:
53        """
54        Provide an interface that lets logger.StreamHandler s write to a list
55        of strings.
56        """
57        def __init__(self, l=[]):
58            """
59            Link to an existing list or just create a log
60            """
61            self.ll = l
62            self.lock = Lock()
63        def write(self, str):
64            """
65            Add the string to the log.  Lock for consistency.
66            """
67            self.lock.acquire()
68            self.ll.append(str)
69            self.lock.release()
70
71        def flush(self):
72            """
73            No-op that StreamHandlers expect
74            """
75            pass
76
77   
78    class thread_pool:
79        """
80        A class to keep track of a set of threads all invoked for the same
81        task.  Manages the mutual exclusion of the states.
82        """
83        def __init__(self, nthreads):
84            """
85            Start a pool.
86            """
87            self.changed = Condition()
88            self.started = 0
89            self.terminated = 0
90            self.nthreads = nthreads
91
92        def acquire(self):
93            """
94            Get the pool's lock.
95            """
96            self.changed.acquire()
97
98        def release(self):
99            """
100            Release the pool's lock.
101            """
102            self.changed.release()
103
104        def wait(self, timeout = None):
105            """
106            Wait for a pool thread to start or stop.
107            """
108            self.changed.wait(timeout)
109
110        def start(self):
111            """
112            Called by a pool thread to report starting.
113            """
114            self.changed.acquire()
115            self.started += 1
116            self.changed.notifyAll()
117            self.changed.release()
118
119        def terminate(self):
120            """
121            Called by a pool thread to report finishing.
122            """
123            self.changed.acquire()
124            self.terminated += 1
125            self.changed.notifyAll()
126            self.changed.release()
127
128        def clear(self):
129            """
130            Clear all pool data.
131            """
132            self.changed.acquire()
133            self.started = 0
134            self.terminated =0
135            self.changed.notifyAll()
136            self.changed.release()
137
138        def wait_for_slot(self):
139            """
140            Wait until we have a free slot to start another pooled thread
141            """
142            self.acquire()
143            while self.started - self.terminated >= self.nthreads:
144                self.wait()
145            self.release()
146
147        def wait_for_all_done(self):
148            """
149            Wait until all active threads finish (and at least one has started)
150            """
151            self.acquire()
152            while self.started == 0 or self.started > self.terminated:
153                self.wait()
154            self.release()
155
156    class pooled_thread(Thread):
157        """
158        One of a set of threads dedicated to a specific task.  Uses the
159        thread_pool class above for coordination.
160        """
161        def __init__(self, group=None, target=None, name=None, args=(), 
162                kwargs={}, pdata=None, trace_file=None):
163            Thread.__init__(self, group, target, name, args, kwargs)
164            self.rv = None          # Return value of the ops in this thread
165            self.exception = None   # Exception that terminated this thread
166            self.target=target      # Target function to run on start()
167            self.args = args        # Args to pass to target
168            self.kwargs = kwargs    # Additional kw args
169            self.pdata = pdata      # thread_pool for this class
170            # Logger for this thread
171            self.log = logging.getLogger("fedd.experiment_control")
172       
173        def run(self):
174            """
175            Emulate Thread.run, except add pool data manipulation and error
176            logging.
177            """
178            if self.pdata:
179                self.pdata.start()
180
181            if self.target:
182                try:
183                    self.rv = self.target(*self.args, **self.kwargs)
184                except service_error, s:
185                    self.exception = s
186                    self.log.error("Thread exception: %s %s" % \
187                            (s.code_string(), s.desc))
188                except:
189                    self.exception = sys.exc_info()[1]
190                    self.log.error(("Unexpected thread exception: %s" +\
191                            "Trace %s") % (self.exception,\
192                                traceback.format_exc()))
193            if self.pdata:
194                self.pdata.terminate()
195
196    call_RequestAccess = service_caller('RequestAccess')
197    call_ReleaseAccess = service_caller('ReleaseAccess')
198    call_StartSegment = service_caller('StartSegment')
199    call_TerminateSegment = service_caller('TerminateSegment')
200    call_Ns2Split = service_caller('Ns2Split')
201
202    def __init__(self, config=None, auth=None):
203        """
204        Intialize the various attributes, most from the config object
205        """
206
207        def parse_tarfile_list(tf):
208            """
209            Parse a tarfile list from the configuration.  This is a set of
210            paths and tarfiles separated by spaces.
211            """
212            rv = [ ]
213            if tf is not None:
214                tl = tf.split()
215                while len(tl) > 1:
216                    p, t = tl[0:2]
217                    del tl[0:2]
218                    rv.append((p, t))
219            return rv
220
221        self.thread_with_rv = experiment_control_local.pooled_thread
222        self.thread_pool = experiment_control_local.thread_pool
223        self.list_log = experiment_control_local.list_log
224
225        self.cert_file = config.get("experiment_control", "cert_file")
226        if self.cert_file:
227            self.cert_pwd = config.get("experiment_control", "cert_pwd")
228        else:
229            self.cert_file = config.get("globals", "cert_file")
230            self.cert_pwd = config.get("globals", "cert_pwd")
231
232        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
233                or config.get("globals", "trusted_certs")
234
235        self.repodir = config.get("experiment_control", "repodir")
236
237        self.exp_stem = "fed-stem"
238        self.log = logging.getLogger("fedd.experiment_control")
239        set_log_level(config, "experiment_control", self.log)
240        self.muxmax = 2
241        self.nthreads = 2
242        self.randomize_experiments = False
243
244        self.splitter = None
245        self.ssh_keygen = "/usr/bin/ssh-keygen"
246        self.ssh_identity_file = None
247
248
249        self.debug = config.getboolean("experiment_control", "create_debug")
250        self.state_filename = config.get("experiment_control", 
251                "experiment_state")
252        self.splitter_url = config.get("experiment_control", "splitter_uri")
253        self.fedkit = parse_tarfile_list(\
254                config.get("experiment_control", "fedkit"))
255        self.gatewaykit = parse_tarfile_list(\
256                config.get("experiment_control", "gatewaykit"))
257        accessdb_file = config.get("experiment_control", "accessdb")
258
259        self.ssh_pubkey_file = config.get("experiment_control", 
260                "ssh_pubkey_file")
261        self.ssh_privkey_file = config.get("experiment_control",
262                "ssh_privkey_file")
263        # NB for internal master/slave ops, not experiment setup
264        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
265
266        self.overrides = set([])
267        ovr = config.get('experiment_control', 'overrides')
268        if ovr:
269            for o in ovr.split(","):
270                o = o.strip()
271                if o.startswith('fedid:'): o = o[len('fedid:'):]
272                self.overrides.add(fedid(hexstr=o))
273
274        self.state = { }
275        self.state_lock = Lock()
276        self.tclsh = "/usr/local/bin/otclsh"
277        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
278                config.get("experiment_control", "tcl_splitter",
279                        "/usr/testbed/lib/ns2ir/parse.tcl")
280        mapdb_file = config.get("experiment_control", "mapdb")
281        self.trace_file = sys.stderr
282
283        self.def_expstart = \
284                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
285                "/tmp/federate";
286        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
287                "FEDDIR/hosts";
288        self.def_gwstart = \
289                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
290                "/tmp/bridge.log";
291        self.def_mgwstart = \
292                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
293                "/tmp/bridge.log";
294        self.def_gwimage = "FBSD61-TUNNEL2";
295        self.def_gwtype = "pc";
296        self.local_access = { }
297
298        if auth:
299            self.auth = auth
300        else:
301            self.log.error(\
302                    "[access]: No authorizer initialized, creating local one.")
303            auth = authorizer()
304
305
306        if self.ssh_pubkey_file:
307            try:
308                f = open(self.ssh_pubkey_file, 'r')
309                self.ssh_pubkey = f.read()
310                f.close()
311            except IOError:
312                raise service_error(service_error.internal,
313                        "Cannot read sshpubkey")
314        else:
315            raise service_error(service_error.internal, 
316                    "No SSH public key file?")
317
318        if not self.ssh_privkey_file:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322
323        if mapdb_file:
324            self.read_mapdb(mapdb_file)
325        else:
326            self.log.warn("[experiment_control] No testbed map, using defaults")
327            self.tbmap = { 
328                    'deter':'https://users.isi.deterlab.net:23235',
329                    'emulab':'https://users.isi.deterlab.net:23236',
330                    'ucb':'https://users.isi.deterlab.net:23237',
331                    }
332
333        if accessdb_file:
334                self.read_accessdb(accessdb_file)
335        else:
336            raise service_error(service_error.internal,
337                    "No accessdb specified in config")
338
339        # Grab saved state.  OK to do this w/o locking because it's read only
340        # and only one thread should be in existence that can see self.state at
341        # this point.
342        if self.state_filename:
343            self.read_state()
344
345        # Dispatch tables
346        self.soap_services = {\
347                'Create': soap_handler('Create', self.create_experiment),
348                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
349                'Vis': soap_handler('Vis', self.get_vis),
350                'Info': soap_handler('Info', self.get_info),
351                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
352                'Terminate': soap_handler('Terminate', 
353                    self.terminate_experiment),
354        }
355
356        self.xmlrpc_services = {\
357                'Create': xmlrpc_handler('Create', self.create_experiment),
358                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
359                'Vis': xmlrpc_handler('Vis', self.get_vis),
360                'Info': xmlrpc_handler('Info', self.get_info),
361                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
362                'Terminate': xmlrpc_handler('Terminate',
363                    self.terminate_experiment),
364        }
365
366    def copy_file(self, src, dest, size=1024):
367        """
368        Exceedingly simple file copy.
369        """
370        s = open(src,'r')
371        d = open(dest, 'w')
372
373        buf = "x"
374        while buf != "":
375            buf = s.read(size)
376            d.write(buf)
377        s.close()
378        d.close()
379
380    # Call while holding self.state_lock
381    def write_state(self):
382        """
383        Write a new copy of experiment state after copying the existing state
384        to a backup.
385
386        State format is a simple pickling of the state dictionary.
387        """
388        if os.access(self.state_filename, os.W_OK):
389            self.copy_file(self.state_filename, \
390                    "%s.bak" % self.state_filename)
391        try:
392            f = open(self.state_filename, 'w')
393            pickle.dump(self.state, f)
394        except IOError, e:
395            self.log.error("Can't write file %s: %s" % \
396                    (self.state_filename, e))
397        except pickle.PicklingError, e:
398            self.log.error("Pickling problem: %s" % e)
399        except TypeError, e:
400            self.log.error("Pickling problem (TypeError): %s" % e)
401
402    # Call while holding self.state_lock
403    def read_state(self):
404        """
405        Read a new copy of experiment state.  Old state is overwritten.
406
407        State format is a simple pickling of the state dictionary.
408        """
409       
410        def get_experiment_id(state):
411            """
412            Pull the fedid experimentID out of the saved state.  This is kind
413            of a gross walk through the dict.
414            """
415
416            if state.has_key('experimentID'):
417                for e in state['experimentID']:
418                    if e.has_key('fedid'):
419                        return e['fedid']
420                else:
421                    return None
422            else:
423                return None
424
425        def get_alloc_ids(state):
426            """
427            Pull the fedids of the identifiers of each allocation from the
428            state.  Again, a dict dive that's best isolated.
429            """
430
431            return [ f['allocID']['fedid'] 
432                    for f in state.get('federant',[]) \
433                        if f.has_key('allocID') and \
434                            f['allocID'].has_key('fedid')]
435
436
437        try:
438            f = open(self.state_filename, "r")
439            self.state = pickle.load(f)
440            self.log.debug("[read_state]: Read state from %s" % \
441                    self.state_filename)
442        except IOError, e:
443            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
444                    % (self.state_filename, e))
445        except pickle.UnpicklingError, e:
446            self.log.warning(("[read_state]: No saved state: " + \
447                    "Unpickling failed: %s") % e)
448       
449        for s in self.state.values():
450            try:
451
452                eid = get_experiment_id(s)
453                if eid : 
454                    # Give the owner rights to the experiment
455                    self.auth.set_attribute(s['owner'], eid)
456                    # And holders of the eid as well
457                    self.auth.set_attribute(eid, eid)
458                    # allow overrides to control experiments as well
459                    for o in self.overrides:
460                        self.auth.set_attribute(o, eid)
461                    # Set permissions to allow reading of the software repo, if
462                    # any, as well.
463                    for a in get_alloc_ids(s):
464                        self.auth.set_attribute(a, 'repo/%s' % eid)
465                else:
466                    raise KeyError("No experiment id")
467            except KeyError, e:
468                self.log.warning("[read_state]: State ownership or identity " +\
469                        "misformatted in %s: %s" % (self.state_filename, e))
470
471
472    def read_accessdb(self, accessdb_file):
473        """
474        Read the mapping from fedids that can create experiments to their name
475        in the 3-level access namespace.  All will be asserted from this
476        testbed and can include the local username and porject that will be
477        asserted on their behalf by this fedd.  Each fedid is also added to the
478        authorization system with the "create" attribute.
479        """
480        self.accessdb = {}
481        # These are the regexps for parsing the db
482        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
483        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
484                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
485        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
486                "\s*->\s*(" + name_expr + ")\s*$")
487        lineno = 0
488
489        # Parse the mappings and store in self.authdb, a dict of
490        # fedid -> (proj, user)
491        try:
492            f = open(accessdb_file, "r")
493            for line in f:
494                lineno += 1
495                line = line.strip()
496                if len(line) == 0 or line.startswith('#'):
497                    continue
498                m = project_line.match(line)
499                if m:
500                    fid = fedid(hexstr=m.group(1))
501                    project, user = m.group(2,3)
502                    if not self.accessdb.has_key(fid):
503                        self.accessdb[fid] = []
504                    self.accessdb[fid].append((project, user))
505                    continue
506
507                m = user_line.match(line)
508                if m:
509                    fid = fedid(hexstr=m.group(1))
510                    project = None
511                    user = m.group(2)
512                    if not self.accessdb.has_key(fid):
513                        self.accessdb[fid] = []
514                    self.accessdb[fid].append((project, user))
515                    continue
516                self.log.warn("[experiment_control] Error parsing access " +\
517                        "db %s at line %d" %  (accessdb_file, lineno))
518        except IOError:
519            raise service_error(service_error.internal,
520                    "Error opening/reading %s as experiment " +\
521                            "control accessdb" %  accessdb_file)
522        f.close()
523
524        # Initialize the authorization attributes
525        for fid in self.accessdb.keys():
526            self.auth.set_attribute(fid, 'create')
527
528    def read_mapdb(self, file):
529        """
530        Read a simple colon separated list of mappings for the
531        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
532        """
533
534        self.tbmap = { }
535        lineno =0
536        try:
537            f = open(file, "r")
538            for line in f:
539                lineno += 1
540                line = line.strip()
541                if line.startswith('#') or len(line) == 0:
542                    continue
543                try:
544                    label, url = line.split(':', 1)
545                    self.tbmap[label] = url
546                except ValueError, e:
547                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
548                            "map db: %s %s" % (lineno, line, e))
549        except IOError, e:
550            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
551                    "open %s: %s" % (file, e))
552        f.close()
553       
554    def generate_ssh_keys(self, dest, type="rsa" ):
555        """
556        Generate a set of keys for the gateways to use to talk.
557
558        Keys are of type type and are stored in the required dest file.
559        """
560        valid_types = ("rsa", "dsa")
561        t = type.lower();
562        if t not in valid_types: raise ValueError
563        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
564
565        try:
566            trace = open("/dev/null", "w")
567        except IOError:
568            raise service_error(service_error.internal,
569                    "Cannot open /dev/null??");
570
571        # May raise CalledProcessError
572        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
573        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
574        if rv != 0:
575            raise service_error(service_error.internal, 
576                    "Cannot generate nonce ssh keys.  %s return code %d" \
577                            % (self.ssh_keygen, rv))
578
579    def gentopo(self, str):
580        """
581        Generate the topology dtat structure from the splitter's XML
582        representation of it.
583
584        The topology XML looks like:
585            <experiment>
586                <nodes>
587                    <node><vname></vname><ips>ip1:ip2</ips></node>
588                </nodes>
589                <lans>
590                    <lan>
591                        <vname></vname><vnode></vnode><ip></ip>
592                        <bandwidth></bandwidth><member>node:port</member>
593                    </lan>
594                </lans>
595        """
596        class topo_parse:
597            """
598            Parse the topology XML and create the dats structure.
599            """
600            def __init__(self):
601                # Typing of the subelements for data conversion
602                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
603                self.int_subelements = ( 'bandwidth',)
604                self.float_subelements = ( 'delay',)
605                # The final data structure
606                self.nodes = [ ]
607                self.lans =  [ ]
608                self.topo = { \
609                        'node': self.nodes,\
610                        'lan' : self.lans,\
611                    }
612                self.element = { }  # Current element being created
613                self.chars = ""     # Last text seen
614
615            def end_element(self, name):
616                # After each sub element the contents is added to the current
617                # element or to the appropriate list.
618                if name == 'node':
619                    self.nodes.append(self.element)
620                    self.element = { }
621                elif name == 'lan':
622                    self.lans.append(self.element)
623                    self.element = { }
624                elif name in self.str_subelements:
625                    self.element[name] = self.chars
626                    self.chars = ""
627                elif name in self.int_subelements:
628                    self.element[name] = int(self.chars)
629                    self.chars = ""
630                elif name in self.float_subelements:
631                    self.element[name] = float(self.chars)
632                    self.chars = ""
633
634            def found_chars(self, data):
635                self.chars += data.rstrip()
636
637
638        tp = topo_parse();
639        parser = xml.parsers.expat.ParserCreate()
640        parser.EndElementHandler = tp.end_element
641        parser.CharacterDataHandler = tp.found_chars
642
643        parser.Parse(str)
644
645        return tp.topo
646       
647
648    def genviz(self, topo):
649        """
650        Generate the visualization the virtual topology
651        """
652
653        neato = "/usr/local/bin/neato"
654        # These are used to parse neato output and to create the visualization
655        # file.
656        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
657        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
658                "%s</type></node>"
659
660        try:
661            # Node names
662            nodes = [ n['vname'] for n in topo['node'] ]
663            topo_lans = topo['lan']
664        except KeyError, e:
665            raise service_error(service_error.internal, "Bad topology: %s" %e)
666
667        lans = { }
668        links = { }
669
670        # Walk through the virtual topology, organizing the connections into
671        # 2-node connections (links) and more-than-2-node connections (lans).
672        # When a lan is created, it's added to the list of nodes (there's a
673        # node in the visualization for the lan).
674        for l in topo_lans:
675            if links.has_key(l['vname']):
676                if len(links[l['vname']]) < 2:
677                    links[l['vname']].append(l['vnode'])
678                else:
679                    nodes.append(l['vname'])
680                    lans[l['vname']] = links[l['vname']]
681                    del links[l['vname']]
682                    lans[l['vname']].append(l['vnode'])
683            elif lans.has_key(l['vname']):
684                lans[l['vname']].append(l['vnode'])
685            else:
686                links[l['vname']] = [ l['vnode'] ]
687
688
689        # Open up a temporary file for dot to turn into a visualization
690        try:
691            df, dotname = tempfile.mkstemp()
692            dotfile = os.fdopen(df, 'w')
693        except IOError:
694            raise service_error(service_error.internal,
695                    "Failed to open file in genviz")
696
697        try:
698            dnull = open('/dev/null', 'w')
699        except IOError:
700            service_error(service_error.internal,
701                    "Failed to open /dev/null in genviz")
702
703        # Generate a dot/neato input file from the links, nodes and lans
704        try:
705            print >>dotfile, "graph G {"
706            for n in nodes:
707                print >>dotfile, '\t"%s"' % n
708            for l in links.keys():
709                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
710            for l in lans.keys():
711                for n in lans[l]:
712                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
713            print >>dotfile, "}"
714            dotfile.close()
715        except TypeError:
716            raise service_error(service_error.internal,
717                    "Single endpoint link in vtopo")
718        except IOError:
719            raise service_error(service_error.internal, "Cannot write dot file")
720
721        # Use dot to create a visualization
722        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
723                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
724                close_fds=True)
725        dnull.close()
726
727        # Translate dot to vis format
728        vis_nodes = [ ]
729        vis = { 'node': vis_nodes }
730        for line in dot.stdout:
731            m = vis_re.match(line)
732            if m:
733                vn = m.group(1)
734                vis_node = {'name': vn, \
735                        'x': float(m.group(2)),\
736                        'y' : float(m.group(3)),\
737                    }
738                if vn in links.keys() or vn in lans.keys():
739                    vis_node['type'] = 'lan'
740                else:
741                    vis_node['type'] = 'node'
742                vis_nodes.append(vis_node)
743        rv = dot.wait()
744
745        os.remove(dotname)
746        if rv == 0 : return vis
747        else: return None
748
749    def get_access(self, tb, nodes, user, tbparam, master, export_project,
750            access_user):
751        """
752        Get access to testbed through fedd and set the parameters for that tb
753        """
754        uri = self.tbmap.get(tb, None)
755        if not uri:
756            raise service_error(serice_error.server_config, 
757                    "Unknown testbed: %s" % tb)
758
759        # currently this lumps all users into one service access group
760        service_keys = [ a for u in user \
761                for a in u.get('access', []) \
762                    if a.has_key('sshPubkey')]
763
764        if len(service_keys) == 0:
765            raise service_error(service_error.req, 
766                    "Must have at least one SSH pubkey for services")
767
768
769        for p, u in access_user:
770            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
771                    "to %s") %  ((p or "None"), u, uri))
772
773            if p:
774                # Request with user and project specified
775                req = {\
776                        'destinationTestbed' : { 'uri' : uri },
777                        'project': { 
778                            'name': {'localname': p},
779                            'user': [ {'userID': { 'localname': u } } ],
780                            },
781                        'user':  user,
782                        'allocID' : { 'localname': 'test' },
783                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
784                        'serviceAccess' : service_keys
785                    }
786            else:
787                # Request with only user specified
788                req = {\
789                        'destinationTestbed' : { 'uri' : uri },
790                        'user':  [ {'userID': { 'localname': u } } ],
791                        'allocID' : { 'localname': 'test' },
792                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
793                        'serviceAccess' : service_keys
794                    }
795
796            if tb == master:
797                # NB, the export_project parameter is a dict that includes
798                # the type
799                req['exportProject'] = export_project
800
801            # node resources if any
802            if nodes != None and len(nodes) > 0:
803                rnodes = [ ]
804                for n in nodes:
805                    rn = { }
806                    image, hw, count = n.split(":")
807                    if image: rn['image'] = [ image ]
808                    if hw: rn['hardware'] = [ hw ]
809                    if count and int(count) >0 : rn['count'] = int(count)
810                    rnodes.append(rn)
811                req['resources']= { }
812                req['resources']['node'] = rnodes
813
814            try:
815                if self.local_access.has_key(uri):
816                    # Local access call
817                    req = { 'RequestAccessRequestBody' : req }
818                    r = self.local_access[uri].RequestAccess(req, 
819                            fedid(file=self.cert_file))
820                    r = { 'RequestAccessResponseBody' : r }
821                else:
822                    r = self.call_RequestAccess(uri, req, 
823                            self.cert_file, self.cert_pwd, self.trusted_certs)
824            except service_error, e:
825                if e.code == service_error.access:
826                    self.log.debug("[get_access] Access denied")
827                    r = None
828                    continue
829                else:
830                    raise e
831
832            if r.has_key('RequestAccessResponseBody'):
833                # Through to here we have a valid response, not a fault.
834                # Access denied is a fault, so something better or worse than
835                # access denied has happened.
836                r = r['RequestAccessResponseBody']
837                self.log.debug("[get_access] Access granted")
838                break
839            else:
840                raise service_error(service_error.protocol,
841                        "Bad proxy response")
842       
843        if not r:
844            raise service_error(service_error.access, 
845                    "Access denied by %s (%s)" % (tb, uri))
846
847        e = r['emulab']
848        p = e['project']
849        tbparam[tb] = { 
850                "boss": e['boss'],
851                "host": e['ops'],
852                "domain": e['domain'],
853                "fs": e['fileServer'],
854                "eventserver": e['eventServer'],
855                "project": unpack_id(p['name']),
856                "emulab" : e,
857                "allocID" : r['allocID'],
858                }
859        # Make the testbed name be the label the user applied
860        p['testbed'] = {'localname': tb }
861
862        for u in p['user']:
863            role = u.get('role', None)
864            if role == 'experimentCreation':
865                tbparam[tb]['user'] = unpack_id(u['userID'])
866                break
867        else:
868            raise service_error(service_error.internal, 
869                    "No createExperimentUser from %s" %tb)
870
871        # Add attributes to barameter space.  We don't allow attributes to
872        # overlay any parameters already installed.
873        for a in e['fedAttr']:
874            try:
875                if a['attribute'] and isinstance(a['attribute'], basestring)\
876                        and not tbparam[tb].has_key(a['attribute'].lower()):
877                    tbparam[tb][a['attribute'].lower()] = a['value']
878            except KeyError:
879                self.log.error("Bad attribute in response: %s" % a)
880       
881    def release_access(self, tb, aid):
882        """
883        Release access to testbed through fedd
884        """
885
886        uri = self.tbmap.get(tb, None)
887        if not uri:
888            raise service_error(serice_error.server_config, 
889                    "Unknown testbed: %s" % tb)
890
891        if self.local_access.has_key(uri):
892            resp = self.local_access[uri].ReleaseAccess(\
893                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
894                    fedid(file=self.cert_file))
895            resp = { 'ReleaseAccessResponseBody': resp } 
896        else:
897            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
898                    self.cert_file, self.cert_pwd, self.trusted_certs)
899
900        # better error coding
901
902    def remote_splitter(self, uri, desc, master):
903
904        req = {
905                'description' : { 'ns2description': desc },
906                'master': master,
907                'include_fedkit': bool(self.fedkit),
908                'include_gatewaykit': bool(self.gatewaykit)
909            }
910
911        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
912                self.trusted_certs)
913
914        if r.has_key('Ns2SplitResponseBody'):
915            r = r['Ns2SplitResponseBody']
916            if r.has_key('output'):
917                return r['output'].splitlines()
918            else:
919                raise service_error(service_error.protocol, 
920                        "Bad splitter response (no output)")
921        else:
922            raise service_error(service_error.protocol, "Bad splitter response")
923
924    class start_segment:
925        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
926                cert_pwd=None, trusted_certs=None, caller=None):
927            self.log = log
928            self.debug = debug
929            self.cert_file = cert_file
930            self.cert_pwd = cert_pwd
931            self.trusted_certs = None
932            self.caller = caller
933            self.testbed = testbed
934
935        def __call__(self, uri, aid, topo, master, attrs=None):
936            req = {
937                    'allocID': { 'fedid' : aid }, 
938                    'segmentdescription': { 
939                        'topdldescription': topo.to_dict(),
940                    },
941                    'master': master,
942                }
943            if attrs:
944                req['fedAttr'] = attrs
945
946            try:
947                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
948                        self.trusted_certs)
949                return True
950            except service_error, e:
951                self.log.error("Start segment failed on %s: %s" % \
952                        (self.testbed, e))
953                return False
954
955
956
957    class terminate_segment:
958        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
959                cert_pwd=None, trusted_certs=None, caller=None):
960            self.log = log
961            self.debug = debug
962            self.cert_file = cert_file
963            self.cert_pwd = cert_pwd
964            self.trusted_certs = None
965            self.caller = caller
966            self.testbed = testbed
967
968        def __call__(self, uri, aid ):
969            req = {
970                    'allocID': aid , 
971                }
972            try:
973                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
974                        self.trusted_certs)
975                return True
976            except service_error, e:
977                self.log.error("Terminate segment failed on %s: %s" % \
978                        (self.testbed, e))
979                return False
980   
981
982    def allocate_resources(self, allocated, master, eid, expid, expcert, 
983            tbparams, topo, tmpdir, alloc_log=None, attrs=None):
984        started = { }           # Testbeds where a sub-experiment started
985                                # successfully
986
987        # XXX
988        fail_soft = False
989
990        log = alloc_log or self.log
991
992        thread_pool = self.thread_pool(self.nthreads)
993        threads = [ ]
994
995        for tb in [ k for k in allocated.keys() if k != master]:
996            # Create and start a thread to start the segment, and save it to
997            # get the return value later
998            thread_pool.wait_for_slot()
999            uri = self.tbmap.get(tb, None)
1000            if not uri:
1001                raise service_error(service_error.internal, 
1002                        "Unknown testbed %s !?" % tb)
1003
1004            if tbparams[tb].has_key('allocID') and \
1005                    tbparams[tb]['allocID'].has_key('fedid'):
1006                aid = tbparams[tb]['allocID']['fedid']
1007            else:
1008                raise service_error(service_error.internal, 
1009                        "No alloc id for testbed %s !?" % tb)
1010
1011            t  = self.pooled_thread(\
1012                    target=self.start_segment(log=log, debug=self.debug,
1013                        testbed=tb, cert_file=self.cert_file,
1014                        cert_pwd=self.cert_pwd,
1015                        trusted_certs=self.trusted_certs,
1016                        caller=self.call_StartSegment), 
1017                    args=(uri, aid, topo[tb], False, attrs), name=tb,
1018                    pdata=thread_pool, trace_file=self.trace_file)
1019            threads.append(t)
1020            t.start()
1021
1022        # Wait until all finish
1023        thread_pool.wait_for_all_done()
1024
1025        # If none failed, start the master
1026        failed = [ t.getName() for t in threads if not t.rv ]
1027
1028        if len(failed) == 0:
1029            uri = self.tbmap.get(master, None)
1030            if not uri:
1031                raise service_error(service_error.internal, 
1032                        "Unknown testbed %s !?" % master)
1033
1034            if tbparams[master].has_key('allocID') and \
1035                    tbparams[master]['allocID'].has_key('fedid'):
1036                aid = tbparams[master]['allocID']['fedid']
1037            else:
1038                raise service_error(service_error.internal, 
1039                    "No alloc id for testbed %s !?" % master)
1040            starter = self.start_segment(log=log, debug=self.debug,
1041                    testbed=master, cert_file=self.cert_file,
1042                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1043                    caller=self.call_StartSegment)
1044            if not starter(uri, aid, topo[master], True, attrs):
1045                failed.append(master)
1046
1047        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1048        # If one failed clean up, unless fail_soft is set
1049        if failed and False:
1050            if not fail_soft:
1051                thread_pool.clear()
1052                for tb in succeeded:
1053                    # Create and start a thread to stop the segment
1054                    thread_pool.wait_for_slot()
1055                    t  = self.pooled_thread(\
1056                            target=self.stop_segment(log=log,
1057                                testbed=tb,
1058                                keyfile=self.ssh_privkey_file,
1059                                debug=self.debug), 
1060                            args=(tb, eid, tbparams), name=tb,
1061                            pdata=thread_pool, trace_file=self.trace_file)
1062                    t.start()
1063                # Wait until all finish
1064                thread_pool.wait_for_all_done()
1065
1066                # release the allocations
1067                for tb in tbparams.keys():
1068                    self.release_access(tb, tbparams[tb]['allocID'])
1069                # Remove the placeholder
1070                self.state_lock.acquire()
1071                self.state[eid]['experimentStatus'] = 'failed'
1072                if self.state_filename: self.write_state()
1073                self.state_lock.release()
1074
1075                log.error("Swap in failed on %s" % ",".join(failed))
1076                return
1077        else:
1078            log.info("[start_segment]: Experiment %s active" % eid)
1079
1080        log.debug("[start_experiment]: removing %s" % tmpdir)
1081
1082        # Walk up tmpdir, deleting as we go
1083        for path, dirs, files in os.walk(tmpdir, topdown=False):
1084            for f in files:
1085                os.remove(os.path.join(path, f))
1086            for d in dirs:
1087                os.rmdir(os.path.join(path, d))
1088        os.rmdir(tmpdir)
1089
1090        # Insert the experiment into our state and update the disk copy
1091        self.state_lock.acquire()
1092        self.state[expid]['experimentStatus'] = 'active'
1093        self.state[eid] = self.state[expid]
1094        if self.state_filename: self.write_state()
1095        self.state_lock.release()
1096        return
1097
1098
1099    def add_kit(self, e, kit):
1100        """
1101        Add a Software object created from the list of (install, location)
1102        tuples passed as kit  to the software attribute of an object e.  We
1103        do this enough to break out the code, but it's kind of a hack to
1104        avoid changing the old tuple rep.
1105        """
1106
1107        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1108
1109        if isinstance(e.software, list): e.software.extend(s)
1110        else: e.software = s
1111
1112
1113    def create_experiment_state(self, fid, req, expid, expcert):
1114        """
1115        Create the initial entry in the experiment's state.  The expid and
1116        expcert are the experiment's fedid and certifacte that represents that
1117        ID, which are installed in the experiment state.  If the request
1118        includes a suggested local name that is used if possible.  If the local
1119        name is already taken by an experiment owned by this user that has
1120        failed, it is overwriutten.  Otherwise new letters are added until a
1121        valid localname is found.  The generated local name is returned.
1122        """
1123
1124        if req.has_key('experimentID') and \
1125                req['experimentID'].has_key('localname'):
1126            overwrite = False
1127            eid = req['experimentID']['localname']
1128            # If there's an old failed experiment here with the same local name
1129            # and accessible by this user, we'll overwrite it, otherwise we'll
1130            # fall through and do the collision avoidance.
1131            old_expid = self.get_experiment_fedid(eid)
1132            if old_expid and self.check_experiment_access(fid, old_expid):
1133                self.state_lock.acquire()
1134                status = self.state[eid].get('experimentStatus', None)
1135                if status and status == 'failed':
1136                    # remove the old access attribute
1137                    self.auth.unset_attribute(fid, old_expid)
1138                    overwrite = True
1139                    del self.state[eid]
1140                    del self.state[old_expid]
1141                self.state_lock.release()
1142            self.state_lock.acquire()
1143            while (self.state.has_key(eid) and not overwrite):
1144                eid += random.choice(string.ascii_letters)
1145            # Initial state
1146            self.state[eid] = {
1147                    'experimentID' : \
1148                            [ { 'localname' : eid }, {'fedid': expid } ],
1149                    'experimentStatus': 'starting',
1150                    'experimentAccess': { 'X509' : expcert },
1151                    'owner': fid,
1152                    'log' : [],
1153                }
1154            self.state[expid] = self.state[eid]
1155            if self.state_filename: self.write_state()
1156            self.state_lock.release()
1157        else:
1158            eid = self.exp_stem
1159            for i in range(0,5):
1160                eid += random.choice(string.ascii_letters)
1161            self.state_lock.acquire()
1162            while (self.state.has_key(eid)):
1163                eid = self.exp_stem
1164                for i in range(0,5):
1165                    eid += random.choice(string.ascii_letters)
1166            # Initial state
1167            self.state[eid] = {
1168                    'experimentID' : \
1169                            [ { 'localname' : eid }, {'fedid': expid } ],
1170                    'experimentStatus': 'starting',
1171                    'experimentAccess': { 'X509' : expcert },
1172                    'owner': fid,
1173                    'log' : [],
1174                }
1175            self.state[expid] = self.state[eid]
1176            if self.state_filename: self.write_state()
1177            self.state_lock.release()
1178
1179        return eid
1180
1181
1182    def allocate_ips_to_topo(self, top):
1183        """
1184        Add an ip4_address attribute to all the hosts in teh topology, based on
1185        the shared substrates on which they sit.  An /etc/hosts file is also
1186        created and returned as a list of hostfiles entries.
1187        """
1188        subs = sorted(top.substrates, 
1189                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1190                reverse=True)
1191        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1192        ifs = { }
1193        hosts = [ ]
1194
1195        for idx, s in enumerate(subs):
1196            a = ips.allocate(len(s.interfaces)+2)
1197            if a :
1198                base, num = a
1199                if num < len(s.interfaces) +2 : 
1200                    raise service_error(service_error.internal,
1201                            "Allocator returned wrong number of IPs??")
1202            else:
1203                raise service_error(service_error.req, 
1204                        "Cannot allocate IP addresses")
1205
1206            base += 1
1207            for i in s.interfaces:
1208                i.attribute.append(
1209                        topdl.Attribute('ip4_address', 
1210                            "%s" % ip_addr(base)))
1211                hname = i.element.name[0]
1212                if ifs.has_key(hname):
1213                    hosts.append("%s\t%s-%s %s-%d" % \
1214                            (ip_addr(base), hname, s.name, hname,
1215                                ifs[hname]))
1216                else:
1217                    ifs[hname] = 0
1218                    hosts.append("%s\t%s-%s %s-%d %s" % \
1219                            (ip_addr(base), hname, s.name, hname,
1220                                ifs[hname], hname))
1221
1222                ifs[hname] += 1
1223                base += 1
1224        return hosts
1225
1226    def get_access_to_testbeds(self, testbeds, user, access_user, 
1227            export_project, master, allocated, tbparams):
1228        """
1229        Request access to the various testbeds required for this instantiation
1230        (passed in as testbeds).  User, access_user, expoert_project and master
1231        are used to construct the correct requests.  Per-testbed parameters are
1232        returned in tbparams.
1233        """
1234        for tb in testbeds:
1235            self.get_access(tb, None, user, tbparams, master,
1236                    export_project, access_user)
1237            allocated[tb] = 1
1238
1239    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1240        """
1241        Create the sub-topologies that are needed for experimetn instantiation.
1242        Along the way attach startup commands to the computers in the
1243        subtopologies.
1244        """
1245        for tb in testbeds:
1246            topo[tb] = top.clone()
1247            to_delete = [ ]
1248            for e in topo[tb].elements:
1249                etb = e.get_attribute('testbed')
1250                if etb and etb != tb:
1251                    for i in e.interface:
1252                        for s in i.subs:
1253                            try:
1254                                s.interfaces.remove(i)
1255                            except ValueError:
1256                                raise service_error(service_error.internal,
1257                                        "Can't remove interface??")
1258                    to_delete.append(e)
1259            for e in to_delete:
1260                topo[tb].elements.remove(e)
1261            topo[tb].make_indices()
1262
1263            for e in [ e for e in topo[tb].elements \
1264                    if isinstance(e,topdl.Computer)]:
1265                if tb == master:
1266                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1267                else:
1268                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1269                scmd = e.get_attribute('startup')
1270                if scmd:
1271                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1272
1273                e.set_attribute('startup', cmd)
1274                if self.fedkit: self.add_kit(e, self.fedkit)
1275
1276    def add_portals(self, top, topo, eid, master, tbparams):
1277        """
1278        For each substrate in the main topology, find those that
1279        have nodes on more than one testbed.  Insert portal nodes
1280        into the copies of those substrates on the sub topologies.
1281        """
1282        for s in top.substrates:
1283            # tbs will contain an ip address on this subsrate that is in
1284            # each testbed.
1285            tbs = { }
1286            for i in s.interfaces:
1287                e = i.element
1288                tb = e.get_attribute('testbed')
1289                if tb and not tbs.has_key(tb):
1290                    for i in e.interface:
1291                        if s in i.subs:
1292                            tbs[tb]= i.get_attribute('ip4_address')
1293            if len(tbs) < 2:
1294                continue
1295
1296            # More than one testbed is on this substrate.  Insert
1297            # some portals into the subtopologies.  st == source testbed,
1298            # dt == destination testbed.
1299            segment_substrate = { }
1300            for st in tbs.keys():
1301                segment_substrate[st] = { }
1302                for dt in [ t for t in tbs.keys() if t != st]:
1303                    myname =  "%stunnel" % dt
1304                    desthost  =  "%stunnel" % st
1305                    sproject = tbparams[st].get('project', 'project')
1306                    dproject = tbparams[dt].get('project', 'project')
1307                    mproject = tbparams[master].get('project', 'project')
1308                    sdomain = tbparams[st].get('domain', ".example.com")
1309                    ddomain = tbparams[dt].get('domain', ".example.com")
1310                    mdomain = tbparams[master].get('domain', '.example.com')
1311                    muser = tbparams[master].get('user', 'root')
1312                    smbshare = tbparams[master].get('smbshare', 'USERS')
1313                    # XXX: active and type need to be unkludged
1314                    active = ("%s" % (st == master))
1315                    if not segment_substrate[st].has_key(dt):
1316                        # Put a substrate and a segment for the connected
1317                        # testbed in there.
1318                        tsubstrate = \
1319                                topdl.Substrate(name='%s-%s' % (st, dt),
1320                                        attribute= [
1321                                            topdl.Attribute(
1322                                                attribute='portal',
1323                                                value='true')
1324                                            ]
1325                                        )
1326                        segment_element = topdl.Segment(
1327                                id= tbparams[dt]['allocID'],
1328                                type='emulab',
1329                                uri = self.tbmap.get(dt, None),
1330                                interface=[ 
1331                                    topdl.Interface(
1332                                        substrate=tsubstrate.name),
1333                                    ],
1334                                attribute = [
1335                                    topdl.Attribute(attribute=n, value=v)
1336                                        for n, v in (\
1337                                            ('domain', ddomain),
1338                                            ('experiment', "%s/%s" % \
1339                                                    (dproject, eid)),)
1340                                    ],
1341                                )
1342                        segment_substrate[st][dt] = tsubstrate
1343                        topo[st].substrates.append(tsubstrate)
1344                        topo[st].elements.append(segment_element)
1345                    portal = topdl.Computer(
1346                            name="%stunnel" % dt,
1347                            attribute=[ 
1348                                topdl.Attribute(attribute=n,value=v)
1349                                    for n, v in (\
1350                                        ('portal', 'true'),
1351                                        ('domain', sdomain),
1352                                        ('masterdomain', mdomain),
1353                                        ('masterexperiment', "%s/%s" % \
1354                                                (mproject, eid)),
1355                                        ('masteruser', muser),
1356                                        ('smbshare', smbshare),
1357                                        ('experiment', "%s/%s" % \
1358                                                (sproject, eid)),
1359                                        ('peer', "%s" % desthost),
1360                                        ('peer_segment', "%s" % \
1361                                                tbparams[dt]['allocID']['fedid']),
1362                                        ('scriptdir', 
1363                                            "/usr/local/federation/bin"),
1364                                        ('active', "%s" % active),
1365                                        ('portal_type', 'both'), 
1366                                        ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1367                                ],
1368                            interface=[
1369                                topdl.Interface(
1370                                    substrate=s.name,
1371                                    attribute=[ 
1372                                        topdl.Attribute(
1373                                            attribute='ip4_address', 
1374                                            value=tbs[dt]
1375                                        )
1376                                    ]),
1377                                topdl.Interface(
1378                                    substrate=\
1379                                        segment_substrate[st][dt].name,
1380                                    attribute=[
1381                                        topdl.Attribute(attribute='portal',
1382                                            value='true')
1383                                        ]
1384                                    ),
1385                                ],
1386                            )
1387                    if self.fedkit: self.add_kit(portal, self.fedkit)
1388                    if self.gatewaykit: self.add_kit(portal, self.gatewaykit)
1389
1390                    topo[st].elements.append(portal)
1391
1392        # Connect the gateway nodes into the topologies and clear out
1393        # substrates that are not in the topologies
1394        for tb in tbparams.keys():
1395            topo[tb].incorporate_elements()
1396            topo[tb].substrates = \
1397                    [s for s in topo[tb].substrates \
1398                        if len(s.interfaces) >0]
1399
1400    def wrangle_software(self, expid, top, topo, tbparams):
1401        """
1402        Copy software out to the repository directory, allocate permissions and
1403        rewrite the segment topologies to look for the software in local
1404        places.
1405        """
1406
1407        # Copy the rpms and tarfiles to a distribution directory from
1408        # which the federants can retrieve them
1409        linkpath = "%s/software" %  expid
1410        softdir ="%s/%s" % ( self.repodir, linkpath)
1411        softmap = { }
1412        # These are in a list of tuples format (each kit).  This comprehension
1413        # unwraps them into a single list of tuples that initilaizes the set of
1414        # tuples.
1415        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1416                for p, t in l ])
1417        pkgs.update([x.location for e in top.elements \
1418                for x in e.software])
1419        try:
1420            os.makedirs(softdir)
1421        except IOError, e:
1422            raise service_error(
1423                    "Cannot create software directory: %s" % e)
1424        # The actual copying.  Everything's converted into a url for copying.
1425        for pkg in pkgs:
1426            loc = pkg
1427
1428            scheme, host, path = urlparse(loc)[0:3]
1429            dest = os.path.basename(path)
1430            if not scheme:
1431                if not loc.startswith('/'):
1432                    loc = "/%s" % loc
1433                loc = "file://%s" %loc
1434            try:
1435                u = urlopen(loc)
1436            except Exception, e:
1437                raise service_error(service_error.req, 
1438                        "Cannot open %s: %s" % (loc, e))
1439            try:
1440                f = open("%s/%s" % (softdir, dest) , "w")
1441                self.log.debug("Writing %s/%s" % (softdir,dest) )
1442                data = u.read(4096)
1443                while data:
1444                    f.write(data)
1445                    data = u.read(4096)
1446                f.close()
1447                u.close()
1448            except Exception, e:
1449                raise service_error(service_error.internal,
1450                        "Could not copy %s: %s" % (loc, e))
1451            path = re.sub("/tmp", "", linkpath)
1452            # XXX
1453            softmap[pkg] = \
1454                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1455                    ( path, dest)
1456
1457            # Allow the individual segments to access the software.
1458            for tb in tbparams.keys():
1459                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1460                        "/%s/%s" % ( path, dest))
1461
1462        # Convert the software locations in the segments into the local
1463        # copies on this host
1464        for soft in [ s for tb in topo.values() \
1465                for e in tb.elements \
1466                    if getattr(e, 'software', False) \
1467                        for s in e.software ]:
1468            if softmap.has_key(soft.location):
1469                soft.location = softmap[soft.location]
1470
1471
1472    def create_experiment(self, req, fid):
1473        """
1474        The external interface to experiment creation called from the
1475        dispatcher.
1476
1477        Creates a working directory, splits the incoming description using the
1478        splitter script and parses out the avrious subsections using the
1479        lcasses above.  Once each sub-experiment is created, use pooled threads
1480        to instantiate them and start it all up.
1481        """
1482        if not self.auth.check_attribute(fid, 'create'):
1483            raise service_error(service_error.access, "Create access denied")
1484
1485        try:
1486            tmpdir = tempfile.mkdtemp(prefix="split-")
1487            os.mkdir(tmpdir+"/keys")
1488        except IOError:
1489            raise service_error(service_error.internal, "Cannot create tmp dir")
1490
1491        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1492        gw_secretkey_base = "fed.%s" % self.ssh_type
1493        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1494        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1495        tclfile = tmpdir + "/experiment.tcl"
1496        tbparams = { }
1497        try:
1498            access_user = self.accessdb[fid]
1499        except KeyError:
1500            raise service_error(service_error.internal,
1501                    "Access map and authorizer out of sync in " + \
1502                            "create_experiment for fedid %s"  % fid)
1503
1504        pid = "dummy"
1505        gid = "dummy"
1506
1507        req = req.get('CreateRequestBody', None)
1508        if not req:
1509            raise service_error(service_error.req,
1510                    "Bad request format (no CreateRequestBody)")
1511        # The tcl parser needs to read a file so put the content into that file
1512        descr=req.get('experimentdescription', None)
1513        if descr:
1514            file_content=descr.get('ns2description', None)
1515            if file_content:
1516                try:
1517                    f = open(tclfile, 'w')
1518                    f.write(file_content)
1519                    f.close()
1520                except IOError:
1521                    raise service_error(service_error.internal,
1522                            "Cannot write temp experiment description")
1523            else:
1524                raise service_error(service_error.req, 
1525                        "Only ns2descriptions supported")
1526        else:
1527            raise service_error(service_error.req, "No experiment description")
1528
1529        # Generate an ID for the experiment (slice) and a certificate that the
1530        # allocator can use to prove they own it.  We'll ship it back through
1531        # the encrypted connection.
1532        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1533
1534        eid = self.create_experiment_state(fid, req, expid, expcert)
1535        try: 
1536            # This catches exceptions to clear the placeholder if necessary
1537            try:
1538                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1539            except ValueError:
1540                raise service_error(service_error.server_config, 
1541                        "Bad key type (%s)" % self.ssh_type)
1542
1543            user = req.get('user', None)
1544            if user == None:
1545                raise service_error(service_error.req, "No user")
1546
1547            master = req.get('master', None)
1548            if not master:
1549                raise service_error(service_error.req,
1550                        "No master testbed label")
1551            export_project = req.get('exportProject', None)
1552            if not export_project:
1553                raise service_error(service_error.req, "No export project")
1554           
1555            # Translate to topdl
1556            if self.splitter_url:
1557                # XXX: need remote topdl translator
1558                self.log.debug("Calling remote splitter at %s" % \
1559                        self.splitter_url)
1560                split_data = self.remote_splitter(self.splitter_url,
1561                        file_content, master)
1562            else:
1563                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1564                    str(self.muxmax), '-m', master]
1565
1566                if self.fedkit:
1567                    tclcmd.append('-k')
1568
1569                if self.gatewaykit:
1570                    tclcmd.append('-K')
1571
1572                tclcmd.extend([pid, gid, eid, tclfile])
1573
1574                self.log.debug("running local splitter %s", " ".join(tclcmd))
1575                # This is just fantastic.  As a side effect the parser copies
1576                # tb_compat.tcl into the current directory, so that directory
1577                # must be writable by the fedd user.  Doing this in the
1578                # temporary subdir ensures this is the case.
1579                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1580                        cwd=tmpdir)
1581                split_data = tclparser.stdout
1582
1583            top = topdl.topology_from_xml(file=split_data, top="experiment")
1584
1585            hosts = self.allocate_ips_to_topo(top)
1586             # Find the testbeds to look up
1587            testbeds = set([ a.value for e in top.elements \
1588                    for a in e.attribute \
1589                    if a.attribute == 'testbed'] )
1590
1591            allocated = { }         # Testbeds we can access
1592            topo ={ }               # Sub topologies
1593            self.get_access_to_testbeds(testbeds, user, access_user, 
1594                    export_project, master, allocated, tbparams)
1595            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1596
1597            # Copy configuration files into the remote file store
1598            # The config urlpath
1599            configpath = "/%s/config" % expid
1600            # The config file system location
1601            configdir ="%s%s" % ( self.repodir, configpath)
1602            try:
1603                os.makedirs(configdir)
1604            except IOError, e:
1605                raise service_error(
1606                        "Cannot create config directory: %s" % e)
1607            try:
1608                f = open("%s/hosts" % configdir, "w")
1609                f.write('\n'.join(hosts))
1610                f.close()
1611            except IOError, e:
1612                raise service_error(service_error.internal, 
1613                        "Cannot write hosts file: %s" % e)
1614            try:
1615                self.copy_file("%s" % gw_pubkey, "%s/%s" % \
1616                        (configdir, gw_pubkey_base))
1617                self.copy_file("%s" % gw_secretkey, "%s/%s" % \
1618                        (configdir, gw_secretkey_base))
1619            except IOError, e:
1620                raise service_error(service_error.internal, 
1621                        "Cannot copy keyfiles: %s" % e)
1622
1623            # Allow the individual testbeds to access the configuration files.
1624            for tb in tbparams.keys():
1625                asignee = tbparams[tb]['allocID']['fedid']
1626                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1627                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1628
1629            self.add_portals(top, topo, eid, master, tbparams)
1630            self.wrangle_software(expid, top, topo, tbparams)
1631
1632            vtopo = topdl.topology_to_vtopo(top)
1633            vis = self.genviz(vtopo)
1634
1635            # save federant information
1636            for k in allocated.keys():
1637                tbparams[k]['federant'] = {\
1638                        'name': [ { 'localname' : eid} ],\
1639                        'emulab': tbparams[k]['emulab'],\
1640                        'allocID' : tbparams[k]['allocID'],\
1641                        'master' : k == master,\
1642                    }
1643
1644            self.state_lock.acquire()
1645            self.state[eid]['vtopo'] = vtopo
1646            self.state[eid]['vis'] = vis
1647            self.state[expid]['federant'] = \
1648                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1649                        if tbparams[tb].has_key('federant') ]
1650            if self.state_filename: 
1651                self.write_state()
1652            self.state_lock.release()
1653        except service_error, e:
1654            # If something goes wrong in the parse (usually an access error)
1655            # clear the placeholder state.  From here on out the code delays
1656            # exceptions.  Failing at this point returns a fault to the remote
1657            # caller.
1658
1659            self.state_lock.acquire()
1660            del self.state[eid]
1661            del self.state[expid]
1662            if self.state_filename: self.write_state()
1663            self.state_lock.release()
1664            raise e
1665
1666
1667        # Start the background swapper and return the starting state.  From
1668        # here on out, the state will stick around a while.
1669
1670        # Let users touch the state
1671        self.auth.set_attribute(fid, expid)
1672        self.auth.set_attribute(expid, expid)
1673        # Override fedids can manipulate state as well
1674        for o in self.overrides:
1675            self.auth.set_attribute(o, expid)
1676
1677        # Create a logger that logs to the experiment's state object as well as
1678        # to the main log file.
1679        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1680        h = logging.StreamHandler(self.list_log(self.state[eid]['log']))
1681        # XXX: there should be a global one of these rather than repeating the
1682        # code.
1683        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1684                    '%d %b %y %H:%M:%S'))
1685        alloc_log.addHandler(h)
1686       
1687        # XXX
1688        url_base = 'https://users.isi.deterlab.net:23232'
1689        attrs = [ 
1690                {
1691                    'attribute': 'ssh_pubkey', 
1692                    'value': '%s/%s/config/%s' % \
1693                            (url_base, expid, gw_pubkey_base)
1694                },
1695                {
1696                    'attribute': 'ssh_secretkey', 
1697                    'value': '%s/%s/config/%s' % \
1698                            (url_base, expid, gw_secretkey_base)
1699                },
1700                {
1701                    'attribute': 'hosts', 
1702                    'value': '%s/%s/config/hosts' % \
1703                            (url_base, expid)
1704                },
1705                {
1706                    'attribute': 'experiment_name',
1707                    'value': eid,
1708                },
1709            ]
1710
1711        # Start a thread to do the resource allocation
1712        t  = Thread(target=self.allocate_resources,
1713                args=(allocated, master, eid, expid, expcert, tbparams, 
1714                    topo, tmpdir, alloc_log, attrs),
1715                name=eid)
1716        t.start()
1717
1718        rv = {
1719                'experimentID': [
1720                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1721                ],
1722                'experimentStatus': 'starting',
1723                'experimentAccess': { 'X509' : expcert }
1724            }
1725
1726        return rv
1727   
1728    def get_experiment_fedid(self, key):
1729        """
1730        find the fedid associated with the localname key in the state database.
1731        """
1732
1733        rv = None
1734        self.state_lock.acquire()
1735        if self.state.has_key(key):
1736            if isinstance(self.state[key], dict):
1737                try:
1738                    kl = [ f['fedid'] for f in \
1739                            self.state[key]['experimentID']\
1740                                if f.has_key('fedid') ]
1741                except KeyError:
1742                    self.state_lock.release()
1743                    raise service_error(service_error.internal, 
1744                            "No fedid for experiment %s when getting "+\
1745                                    "fedid(!?)" % key)
1746                if len(kl) == 1:
1747                    rv = kl[0]
1748                else:
1749                    self.state_lock.release()
1750                    raise service_error(service_error.internal, 
1751                            "multiple fedids for experiment %s when " +\
1752                                    "getting fedid(!?)" % key)
1753            else:
1754                self.state_lock.release()
1755                raise service_error(service_error.internal, 
1756                        "Unexpected state for %s" % key)
1757        self.state_lock.release()
1758        return rv
1759
1760    def check_experiment_access(self, fid, key):
1761        """
1762        Confirm that the fid has access to the experiment.  Though a request
1763        may be made in terms of a local name, the access attribute is always
1764        the experiment's fedid.
1765        """
1766        if not isinstance(key, fedid):
1767            key = self.get_experiment_fedid(key)
1768
1769        if self.auth.check_attribute(fid, key):
1770            return True
1771        else:
1772            raise service_error(service_error.access, "Access Denied")
1773
1774
1775    def get_handler(self, path, fid):
1776        if self.auth.check_attribute(fid, path):
1777            return ("%s/%s" % (self.repodir, path), "application/binary")
1778        else:
1779            return (None, None)
1780
1781    def get_vtopo(self, req, fid):
1782        """
1783        Return the stored virtual topology for this experiment
1784        """
1785        rv = None
1786        state = None
1787
1788        req = req.get('VtopoRequestBody', None)
1789        if not req:
1790            raise service_error(service_error.req,
1791                    "Bad request format (no VtopoRequestBody)")
1792        exp = req.get('experiment', None)
1793        if exp:
1794            if exp.has_key('fedid'):
1795                key = exp['fedid']
1796                keytype = "fedid"
1797            elif exp.has_key('localname'):
1798                key = exp['localname']
1799                keytype = "localname"
1800            else:
1801                raise service_error(service_error.req, "Unknown lookup type")
1802        else:
1803            raise service_error(service_error.req, "No request?")
1804
1805        self.check_experiment_access(fid, key)
1806
1807        self.state_lock.acquire()
1808        if self.state.has_key(key):
1809            if self.state[key].has_key('vtopo'):
1810                rv = { 'experiment' : {keytype: key },\
1811                        'vtopo': self.state[key]['vtopo'],\
1812                    }
1813            else:
1814                state = self.state[key]['experimentStatus']
1815        self.state_lock.release()
1816
1817        if rv: return rv
1818        else: 
1819            if state:
1820                raise service_error(service_error.partial, 
1821                        "Not ready: %s" % state)
1822            else:
1823                raise service_error(service_error.req, "No such experiment")
1824
1825    def get_vis(self, req, fid):
1826        """
1827        Return the stored visualization for this experiment
1828        """
1829        rv = None
1830        state = None
1831
1832        req = req.get('VisRequestBody', None)
1833        if not req:
1834            raise service_error(service_error.req,
1835                    "Bad request format (no VisRequestBody)")
1836        exp = req.get('experiment', None)
1837        if exp:
1838            if exp.has_key('fedid'):
1839                key = exp['fedid']
1840                keytype = "fedid"
1841            elif exp.has_key('localname'):
1842                key = exp['localname']
1843                keytype = "localname"
1844            else:
1845                raise service_error(service_error.req, "Unknown lookup type")
1846        else:
1847            raise service_error(service_error.req, "No request?")
1848
1849        self.check_experiment_access(fid, key)
1850
1851        self.state_lock.acquire()
1852        if self.state.has_key(key):
1853            if self.state[key].has_key('vis'):
1854                rv =  { 'experiment' : {keytype: key },\
1855                        'vis': self.state[key]['vis'],\
1856                        }
1857            else:
1858                state = self.state[key]['experimentStatus']
1859        self.state_lock.release()
1860
1861        if rv: return rv
1862        else:
1863            if state:
1864                raise service_error(service_error.partial, 
1865                        "Not ready: %s" % state)
1866            else:
1867                raise service_error(service_error.req, "No such experiment")
1868
1869    def clean_info_response(self, rv):
1870        """
1871        Remove the information in the experiment's state object that is not in
1872        the info response.
1873        """
1874        # Remove the owner info (should always be there, but...)
1875        if rv.has_key('owner'): del rv['owner']
1876
1877        # Convert the log into the allocationLog parameter and remove the
1878        # log entry (with defensive programming)
1879        if rv.has_key('log'):
1880            rv['allocationLog'] = "".join(rv['log'])
1881            del rv['log']
1882        else:
1883            rv['allocationLog'] = ""
1884
1885        if rv['experimentStatus'] != 'active':
1886            if rv.has_key('federant'): del rv['federant']
1887        else:
1888            # remove the allocationID info from each federant
1889            for f in rv.get('federant', []):
1890                if f.has_key('allocID'): del f['allocID']
1891        return rv
1892
1893    def get_info(self, req, fid):
1894        """
1895        Return all the stored info about this experiment
1896        """
1897        rv = None
1898
1899        req = req.get('InfoRequestBody', None)
1900        if not req:
1901            raise service_error(service_error.req,
1902                    "Bad request format (no InfoRequestBody)")
1903        exp = req.get('experiment', None)
1904        if exp:
1905            if exp.has_key('fedid'):
1906                key = exp['fedid']
1907                keytype = "fedid"
1908            elif exp.has_key('localname'):
1909                key = exp['localname']
1910                keytype = "localname"
1911            else:
1912                raise service_error(service_error.req, "Unknown lookup type")
1913        else:
1914            raise service_error(service_error.req, "No request?")
1915
1916        self.check_experiment_access(fid, key)
1917
1918        # The state may be massaged by the service function that called
1919        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1920        # state.
1921        self.state_lock.acquire()
1922        if self.state.has_key(key):
1923            rv = copy.deepcopy(self.state[key])
1924        self.state_lock.release()
1925
1926        if rv:
1927            return self.clean_info_response(rv)
1928        else:
1929            raise service_error(service_error.req, "No such experiment")
1930
1931    def get_multi_info(self, req, fid):
1932        """
1933        Return all the stored info that this fedid can access
1934        """
1935        rv = { 'info': [ ] }
1936
1937        self.state_lock.acquire()
1938        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
1939            self.check_experiment_access(fid, key)
1940
1941            if self.state.has_key(key):
1942                e = copy.deepcopy(self.state[key])
1943                e = self.clean_info_response(e)
1944                rv['info'].append(e)
1945        self.state_lock.release()
1946        return rv
1947
1948    def terminate_experiment(self, req, fid):
1949        """
1950        Swap this experiment out on the federants and delete the shared
1951        information
1952        """
1953        tbparams = { }
1954        req = req.get('TerminateRequestBody', None)
1955        if not req:
1956            raise service_error(service_error.req,
1957                    "Bad request format (no TerminateRequestBody)")
1958        force = req.get('force', False)
1959        exp = req.get('experiment', None)
1960        if exp:
1961            if exp.has_key('fedid'):
1962                key = exp['fedid']
1963                keytype = "fedid"
1964            elif exp.has_key('localname'):
1965                key = exp['localname']
1966                keytype = "localname"
1967            else:
1968                raise service_error(service_error.req, "Unknown lookup type")
1969        else:
1970            raise service_error(service_error.req, "No request?")
1971
1972        self.check_experiment_access(fid, key)
1973
1974        dealloc_list = [ ]
1975
1976
1977        # Create a logger that logs to the dealloc_list as well as to the main
1978        # log file.
1979        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
1980        h = logging.StreamHandler(self.list_log(dealloc_list))
1981        # XXX: there should be a global one of these rather than repeating the
1982        # code.
1983        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1984                    '%d %b %y %H:%M:%S'))
1985        dealloc_log.addHandler(h)
1986
1987        self.state_lock.acquire()
1988        fed_exp = self.state.get(key, None)
1989
1990        if fed_exp:
1991            # This branch of the conditional holds the lock to generate a
1992            # consistent temporary tbparams variable to deallocate experiments.
1993            # It releases the lock to do the deallocations and reacquires it to
1994            # remove the experiment state when the termination is complete.
1995
1996            # First make sure that the experiment creation is complete.
1997            status = fed_exp.get('experimentStatus', None)
1998
1999            if status:
2000                if status in ('starting', 'terminating'):
2001                    if not force:
2002                        self.state_lock.release()
2003                        raise service_error(service_error.partial, 
2004                                'Experiment still being created or destroyed')
2005                    else:
2006                        self.log.warning('Experiment in %s state ' % status + \
2007                                'being terminated by force.')
2008            else:
2009                # No status??? trouble
2010                self.state_lock.release()
2011                raise service_error(service_error.internal,
2012                        "Experiment has no status!?")
2013
2014            ids = []
2015            #  experimentID is a list of dicts that are self-describing
2016            #  identifiers.  This finds all the fedids and localnames - the
2017            #  keys of self.state - and puts them into ids.
2018            for id in fed_exp.get('experimentID', []):
2019                if id.has_key('fedid'): ids.append(id['fedid'])
2020                if id.has_key('localname'): ids.append(id['localname'])
2021
2022            # Collect the allocation/segment ids
2023            for fed in fed_exp.get('federant', []):
2024                try:
2025                    tb = fed['emulab']['project']['testbed']['localname']
2026                    aid = fed['allocID']
2027                except KeyError, e:
2028                    continue
2029                tbparams[tb] = aid
2030            fed_exp['experimentStatus'] = 'terminating'
2031            if self.state_filename: self.write_state()
2032            self.state_lock.release()
2033
2034            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2035            # then completes, so we can't wait if nothing starts.  So, no
2036            # tbparams, no start.
2037            if len(tbparams) > 0:
2038                thread_pool = self.thread_pool(self.nthreads)
2039                for tb in tbparams.keys():
2040                    # Create and start a thread to stop the segment
2041                    thread_pool.wait_for_slot()
2042                    uri = self.tbmap.get(tb, None)
2043                    t  = self.pooled_thread(\
2044                            target=self.terminate_segment(log=dealloc_log,
2045                                testbed=tb,
2046                                cert_file=self.cert_file, 
2047                                cert_pwd=self.cert_pwd,
2048                                trusted_certs=self.trusted_certs,
2049                                caller=self.call_TerminateSegment),
2050                            args=(uri, tbparams[tb]), name=tb,
2051                            pdata=thread_pool, trace_file=self.trace_file)
2052                    t.start()
2053                # Wait for completions
2054                thread_pool.wait_for_all_done()
2055
2056            # release the allocations (failed experiments have done this
2057            # already, and starting experiments may be in odd states, so we
2058            # ignore errors releasing those allocations
2059            try: 
2060                for tb in tbparams.keys():
2061                    self.release_access(tb, tbparams[tb])
2062            except service_error, e:
2063                if status != 'failed' and not force:
2064                    raise e
2065
2066            # Remove the terminated experiment
2067            self.state_lock.acquire()
2068            for id in ids:
2069                if self.state.has_key(id): del self.state[id]
2070
2071            if self.state_filename: self.write_state()
2072            self.state_lock.release()
2073
2074            return { 
2075                    'experiment': exp , 
2076                    'deallocationLog': "".join(dealloc_list),
2077                    }
2078        else:
2079            # Don't forget to release the lock
2080            self.state_lock.release()
2081            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.