source: fedd/federation/experiment_control.py @ 40dd8c1

axis_examplecompt_changesinfo-opsversion-2.00version-3.01version-3.02
Last change on this file since 40dd8c1 was 40dd8c1, checked in by Ted Faber <faber@…>, 15 years ago

make file_copy a utility route so many can use it.

  • Property mode set to 100644
File size: 73.2 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30
31import topdl
32import list_log
33from ip_allocator import ip_allocator
34from ip_addr import ip_addr
35
36
37class nullHandler(logging.Handler):
38    def emit(self, record): pass
39
40fl = logging.getLogger("fedd.experiment_control")
41fl.addHandler(nullHandler())
42
43class experiment_control_local:
44    """
45    Control of experiments that this system can directly access.
46
47    Includes experiment creation, termination and information dissemination.
48    Thred safe.
49    """
50
51    class ssh_cmd_timeout(RuntimeError): pass
52   
53    class thread_pool:
54        """
55        A class to keep track of a set of threads all invoked for the same
56        task.  Manages the mutual exclusion of the states.
57        """
58        def __init__(self, nthreads):
59            """
60            Start a pool.
61            """
62            self.changed = Condition()
63            self.started = 0
64            self.terminated = 0
65            self.nthreads = nthreads
66
67        def acquire(self):
68            """
69            Get the pool's lock.
70            """
71            self.changed.acquire()
72
73        def release(self):
74            """
75            Release the pool's lock.
76            """
77            self.changed.release()
78
79        def wait(self, timeout = None):
80            """
81            Wait for a pool thread to start or stop.
82            """
83            self.changed.wait(timeout)
84
85        def start(self):
86            """
87            Called by a pool thread to report starting.
88            """
89            self.changed.acquire()
90            self.started += 1
91            self.changed.notifyAll()
92            self.changed.release()
93
94        def terminate(self):
95            """
96            Called by a pool thread to report finishing.
97            """
98            self.changed.acquire()
99            self.terminated += 1
100            self.changed.notifyAll()
101            self.changed.release()
102
103        def clear(self):
104            """
105            Clear all pool data.
106            """
107            self.changed.acquire()
108            self.started = 0
109            self.terminated =0
110            self.changed.notifyAll()
111            self.changed.release()
112
113        def wait_for_slot(self):
114            """
115            Wait until we have a free slot to start another pooled thread
116            """
117            self.acquire()
118            while self.started - self.terminated >= self.nthreads:
119                self.wait()
120            self.release()
121
122        def wait_for_all_done(self):
123            """
124            Wait until all active threads finish (and at least one has started)
125            """
126            self.acquire()
127            while self.started == 0 or self.started > self.terminated:
128                self.wait()
129            self.release()
130
131    class pooled_thread(Thread):
132        """
133        One of a set of threads dedicated to a specific task.  Uses the
134        thread_pool class above for coordination.
135        """
136        def __init__(self, group=None, target=None, name=None, args=(), 
137                kwargs={}, pdata=None, trace_file=None):
138            Thread.__init__(self, group, target, name, args, kwargs)
139            self.rv = None          # Return value of the ops in this thread
140            self.exception = None   # Exception that terminated this thread
141            self.target=target      # Target function to run on start()
142            self.args = args        # Args to pass to target
143            self.kwargs = kwargs    # Additional kw args
144            self.pdata = pdata      # thread_pool for this class
145            # Logger for this thread
146            self.log = logging.getLogger("fedd.experiment_control")
147       
148        def run(self):
149            """
150            Emulate Thread.run, except add pool data manipulation and error
151            logging.
152            """
153            if self.pdata:
154                self.pdata.start()
155
156            if self.target:
157                try:
158                    self.rv = self.target(*self.args, **self.kwargs)
159                except service_error, s:
160                    self.exception = s
161                    self.log.error("Thread exception: %s %s" % \
162                            (s.code_string(), s.desc))
163                except:
164                    self.exception = sys.exc_info()[1]
165                    self.log.error(("Unexpected thread exception: %s" +\
166                            "Trace %s") % (self.exception,\
167                                traceback.format_exc()))
168            if self.pdata:
169                self.pdata.terminate()
170
171    call_RequestAccess = service_caller('RequestAccess')
172    call_ReleaseAccess = service_caller('ReleaseAccess')
173    call_StartSegment = service_caller('StartSegment')
174    call_TerminateSegment = service_caller('TerminateSegment')
175    call_Ns2Split = service_caller('Ns2Split')
176
177    def __init__(self, config=None, auth=None):
178        """
179        Intialize the various attributes, most from the config object
180        """
181
182        def parse_tarfile_list(tf):
183            """
184            Parse a tarfile list from the configuration.  This is a set of
185            paths and tarfiles separated by spaces.
186            """
187            rv = [ ]
188            if tf is not None:
189                tl = tf.split()
190                while len(tl) > 1:
191                    p, t = tl[0:2]
192                    del tl[0:2]
193                    rv.append((p, t))
194            return rv
195
196        self.thread_with_rv = experiment_control_local.pooled_thread
197        self.thread_pool = experiment_control_local.thread_pool
198        self.list_log = list_log.list_log
199
200        self.cert_file = config.get("experiment_control", "cert_file")
201        if self.cert_file:
202            self.cert_pwd = config.get("experiment_control", "cert_pwd")
203        else:
204            self.cert_file = config.get("globals", "cert_file")
205            self.cert_pwd = config.get("globals", "cert_pwd")
206
207        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
208                or config.get("globals", "trusted_certs")
209
210        self.repodir = config.get("experiment_control", "repodir")
211
212        self.exp_stem = "fed-stem"
213        self.log = logging.getLogger("fedd.experiment_control")
214        set_log_level(config, "experiment_control", self.log)
215        self.muxmax = 2
216        self.nthreads = 2
217        self.randomize_experiments = False
218
219        self.splitter = None
220        self.ssh_keygen = "/usr/bin/ssh-keygen"
221        self.ssh_identity_file = None
222
223
224        self.debug = config.getboolean("experiment_control", "create_debug")
225        self.state_filename = config.get("experiment_control", 
226                "experiment_state")
227        self.splitter_url = config.get("experiment_control", "splitter_uri")
228        self.fedkit = parse_tarfile_list(\
229                config.get("experiment_control", "fedkit"))
230        self.gatewaykit = parse_tarfile_list(\
231                config.get("experiment_control", "gatewaykit"))
232        accessdb_file = config.get("experiment_control", "accessdb")
233
234        self.ssh_pubkey_file = config.get("experiment_control", 
235                "ssh_pubkey_file")
236        self.ssh_privkey_file = config.get("experiment_control",
237                "ssh_privkey_file")
238        # NB for internal master/slave ops, not experiment setup
239        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
240
241        self.overrides = set([])
242        ovr = config.get('experiment_control', 'overrides')
243        if ovr:
244            for o in ovr.split(","):
245                o = o.strip()
246                if o.startswith('fedid:'): o = o[len('fedid:'):]
247                self.overrides.add(fedid(hexstr=o))
248
249        self.state = { }
250        self.state_lock = Lock()
251        self.tclsh = "/usr/local/bin/otclsh"
252        self.tcl_splitter = config.get("splitter", "tcl_splitter") or \
253                config.get("experiment_control", "tcl_splitter",
254                        "/usr/testbed/lib/ns2ir/parse.tcl")
255        mapdb_file = config.get("experiment_control", "mapdb")
256        self.trace_file = sys.stderr
257
258        self.def_expstart = \
259                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
260                "/tmp/federate";
261        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
262                "FEDDIR/hosts";
263        self.def_gwstart = \
264                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
265                "/tmp/bridge.log";
266        self.def_mgwstart = \
267                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
268                "/tmp/bridge.log";
269        self.def_gwimage = "FBSD61-TUNNEL2";
270        self.def_gwtype = "pc";
271        self.local_access = { }
272
273        if auth:
274            self.auth = auth
275        else:
276            self.log.error(\
277                    "[access]: No authorizer initialized, creating local one.")
278            auth = authorizer()
279
280
281        if self.ssh_pubkey_file:
282            try:
283                f = open(self.ssh_pubkey_file, 'r')
284                self.ssh_pubkey = f.read()
285                f.close()
286            except IOError:
287                raise service_error(service_error.internal,
288                        "Cannot read sshpubkey")
289        else:
290            raise service_error(service_error.internal, 
291                    "No SSH public key file?")
292
293        if not self.ssh_privkey_file:
294            raise service_error(service_error.internal, 
295                    "No SSH public key file?")
296
297
298        if mapdb_file:
299            self.read_mapdb(mapdb_file)
300        else:
301            self.log.warn("[experiment_control] No testbed map, using defaults")
302            self.tbmap = { 
303                    'deter':'https://users.isi.deterlab.net:23235',
304                    'emulab':'https://users.isi.deterlab.net:23236',
305                    'ucb':'https://users.isi.deterlab.net:23237',
306                    }
307
308        if accessdb_file:
309                self.read_accessdb(accessdb_file)
310        else:
311            raise service_error(service_error.internal,
312                    "No accessdb specified in config")
313
314        # Grab saved state.  OK to do this w/o locking because it's read only
315        # and only one thread should be in existence that can see self.state at
316        # this point.
317        if self.state_filename:
318            self.read_state()
319
320        # Dispatch tables
321        self.soap_services = {\
322                'Create': soap_handler('Create', self.create_experiment),
323                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
324                'Vis': soap_handler('Vis', self.get_vis),
325                'Info': soap_handler('Info', self.get_info),
326                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
327                'Terminate': soap_handler('Terminate', 
328                    self.terminate_experiment),
329        }
330
331        self.xmlrpc_services = {\
332                'Create': xmlrpc_handler('Create', self.create_experiment),
333                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
334                'Vis': xmlrpc_handler('Vis', self.get_vis),
335                'Info': xmlrpc_handler('Info', self.get_info),
336                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
337                'Terminate': xmlrpc_handler('Terminate',
338                    self.terminate_experiment),
339        }
340
341    # Call while holding self.state_lock
342    def write_state(self):
343        """
344        Write a new copy of experiment state after copying the existing state
345        to a backup.
346
347        State format is a simple pickling of the state dictionary.
348        """
349        if os.access(self.state_filename, os.W_OK):
350            copy_file(self.state_filename, \
351                    "%s.bak" % self.state_filename)
352        try:
353            f = open(self.state_filename, 'w')
354            pickle.dump(self.state, f)
355        except IOError, e:
356            self.log.error("Can't write file %s: %s" % \
357                    (self.state_filename, e))
358        except pickle.PicklingError, e:
359            self.log.error("Pickling problem: %s" % e)
360        except TypeError, e:
361            self.log.error("Pickling problem (TypeError): %s" % e)
362
363    # Call while holding self.state_lock
364    def read_state(self):
365        """
366        Read a new copy of experiment state.  Old state is overwritten.
367
368        State format is a simple pickling of the state dictionary.
369        """
370       
371        def get_experiment_id(state):
372            """
373            Pull the fedid experimentID out of the saved state.  This is kind
374            of a gross walk through the dict.
375            """
376
377            if state.has_key('experimentID'):
378                for e in state['experimentID']:
379                    if e.has_key('fedid'):
380                        return e['fedid']
381                else:
382                    return None
383            else:
384                return None
385
386        def get_alloc_ids(state):
387            """
388            Pull the fedids of the identifiers of each allocation from the
389            state.  Again, a dict dive that's best isolated.
390            """
391
392            return [ f['allocID']['fedid'] 
393                    for f in state.get('federant',[]) \
394                        if f.has_key('allocID') and \
395                            f['allocID'].has_key('fedid')]
396
397
398        try:
399            f = open(self.state_filename, "r")
400            self.state = pickle.load(f)
401            self.log.debug("[read_state]: Read state from %s" % \
402                    self.state_filename)
403        except IOError, e:
404            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
405                    % (self.state_filename, e))
406        except pickle.UnpicklingError, e:
407            self.log.warning(("[read_state]: No saved state: " + \
408                    "Unpickling failed: %s") % e)
409       
410        for s in self.state.values():
411            try:
412
413                eid = get_experiment_id(s)
414                if eid : 
415                    # Give the owner rights to the experiment
416                    self.auth.set_attribute(s['owner'], eid)
417                    # And holders of the eid as well
418                    self.auth.set_attribute(eid, eid)
419                    # allow overrides to control experiments as well
420                    for o in self.overrides:
421                        self.auth.set_attribute(o, eid)
422                    # Set permissions to allow reading of the software repo, if
423                    # any, as well.
424                    for a in get_alloc_ids(s):
425                        self.auth.set_attribute(a, 'repo/%s' % eid)
426                else:
427                    raise KeyError("No experiment id")
428            except KeyError, e:
429                self.log.warning("[read_state]: State ownership or identity " +\
430                        "misformatted in %s: %s" % (self.state_filename, e))
431
432
433    def read_accessdb(self, accessdb_file):
434        """
435        Read the mapping from fedids that can create experiments to their name
436        in the 3-level access namespace.  All will be asserted from this
437        testbed and can include the local username and porject that will be
438        asserted on their behalf by this fedd.  Each fedid is also added to the
439        authorization system with the "create" attribute.
440        """
441        self.accessdb = {}
442        # These are the regexps for parsing the db
443        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
444        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
445                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
446        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
447                "\s*->\s*(" + name_expr + ")\s*$")
448        lineno = 0
449
450        # Parse the mappings and store in self.authdb, a dict of
451        # fedid -> (proj, user)
452        try:
453            f = open(accessdb_file, "r")
454            for line in f:
455                lineno += 1
456                line = line.strip()
457                if len(line) == 0 or line.startswith('#'):
458                    continue
459                m = project_line.match(line)
460                if m:
461                    fid = fedid(hexstr=m.group(1))
462                    project, user = m.group(2,3)
463                    if not self.accessdb.has_key(fid):
464                        self.accessdb[fid] = []
465                    self.accessdb[fid].append((project, user))
466                    continue
467
468                m = user_line.match(line)
469                if m:
470                    fid = fedid(hexstr=m.group(1))
471                    project = None
472                    user = m.group(2)
473                    if not self.accessdb.has_key(fid):
474                        self.accessdb[fid] = []
475                    self.accessdb[fid].append((project, user))
476                    continue
477                self.log.warn("[experiment_control] Error parsing access " +\
478                        "db %s at line %d" %  (accessdb_file, lineno))
479        except IOError:
480            raise service_error(service_error.internal,
481                    "Error opening/reading %s as experiment " +\
482                            "control accessdb" %  accessdb_file)
483        f.close()
484
485        # Initialize the authorization attributes
486        for fid in self.accessdb.keys():
487            self.auth.set_attribute(fid, 'create')
488
489    def read_mapdb(self, file):
490        """
491        Read a simple colon separated list of mappings for the
492        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
493        """
494
495        self.tbmap = { }
496        lineno =0
497        try:
498            f = open(file, "r")
499            for line in f:
500                lineno += 1
501                line = line.strip()
502                if line.startswith('#') or len(line) == 0:
503                    continue
504                try:
505                    label, url = line.split(':', 1)
506                    self.tbmap[label] = url
507                except ValueError, e:
508                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
509                            "map db: %s %s" % (lineno, line, e))
510        except IOError, e:
511            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
512                    "open %s: %s" % (file, e))
513        f.close()
514       
515    def generate_ssh_keys(self, dest, type="rsa" ):
516        """
517        Generate a set of keys for the gateways to use to talk.
518
519        Keys are of type type and are stored in the required dest file.
520        """
521        valid_types = ("rsa", "dsa")
522        t = type.lower();
523        if t not in valid_types: raise ValueError
524        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
525
526        try:
527            trace = open("/dev/null", "w")
528        except IOError:
529            raise service_error(service_error.internal,
530                    "Cannot open /dev/null??");
531
532        # May raise CalledProcessError
533        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
534        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
535        if rv != 0:
536            raise service_error(service_error.internal, 
537                    "Cannot generate nonce ssh keys.  %s return code %d" \
538                            % (self.ssh_keygen, rv))
539
540    def gentopo(self, str):
541        """
542        Generate the topology dtat structure from the splitter's XML
543        representation of it.
544
545        The topology XML looks like:
546            <experiment>
547                <nodes>
548                    <node><vname></vname><ips>ip1:ip2</ips></node>
549                </nodes>
550                <lans>
551                    <lan>
552                        <vname></vname><vnode></vnode><ip></ip>
553                        <bandwidth></bandwidth><member>node:port</member>
554                    </lan>
555                </lans>
556        """
557        class topo_parse:
558            """
559            Parse the topology XML and create the dats structure.
560            """
561            def __init__(self):
562                # Typing of the subelements for data conversion
563                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
564                self.int_subelements = ( 'bandwidth',)
565                self.float_subelements = ( 'delay',)
566                # The final data structure
567                self.nodes = [ ]
568                self.lans =  [ ]
569                self.topo = { \
570                        'node': self.nodes,\
571                        'lan' : self.lans,\
572                    }
573                self.element = { }  # Current element being created
574                self.chars = ""     # Last text seen
575
576            def end_element(self, name):
577                # After each sub element the contents is added to the current
578                # element or to the appropriate list.
579                if name == 'node':
580                    self.nodes.append(self.element)
581                    self.element = { }
582                elif name == 'lan':
583                    self.lans.append(self.element)
584                    self.element = { }
585                elif name in self.str_subelements:
586                    self.element[name] = self.chars
587                    self.chars = ""
588                elif name in self.int_subelements:
589                    self.element[name] = int(self.chars)
590                    self.chars = ""
591                elif name in self.float_subelements:
592                    self.element[name] = float(self.chars)
593                    self.chars = ""
594
595            def found_chars(self, data):
596                self.chars += data.rstrip()
597
598
599        tp = topo_parse();
600        parser = xml.parsers.expat.ParserCreate()
601        parser.EndElementHandler = tp.end_element
602        parser.CharacterDataHandler = tp.found_chars
603
604        parser.Parse(str)
605
606        return tp.topo
607       
608
609    def genviz(self, topo):
610        """
611        Generate the visualization the virtual topology
612        """
613
614        neato = "/usr/local/bin/neato"
615        # These are used to parse neato output and to create the visualization
616        # file.
617        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="(\d+),(\d+)"')
618        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
619                "%s</type></node>"
620
621        try:
622            # Node names
623            nodes = [ n['vname'] for n in topo['node'] ]
624            topo_lans = topo['lan']
625        except KeyError, e:
626            raise service_error(service_error.internal, "Bad topology: %s" %e)
627
628        lans = { }
629        links = { }
630
631        # Walk through the virtual topology, organizing the connections into
632        # 2-node connections (links) and more-than-2-node connections (lans).
633        # When a lan is created, it's added to the list of nodes (there's a
634        # node in the visualization for the lan).
635        for l in topo_lans:
636            if links.has_key(l['vname']):
637                if len(links[l['vname']]) < 2:
638                    links[l['vname']].append(l['vnode'])
639                else:
640                    nodes.append(l['vname'])
641                    lans[l['vname']] = links[l['vname']]
642                    del links[l['vname']]
643                    lans[l['vname']].append(l['vnode'])
644            elif lans.has_key(l['vname']):
645                lans[l['vname']].append(l['vnode'])
646            else:
647                links[l['vname']] = [ l['vnode'] ]
648
649
650        # Open up a temporary file for dot to turn into a visualization
651        try:
652            df, dotname = tempfile.mkstemp()
653            dotfile = os.fdopen(df, 'w')
654        except IOError:
655            raise service_error(service_error.internal,
656                    "Failed to open file in genviz")
657
658        try:
659            dnull = open('/dev/null', 'w')
660        except IOError:
661            service_error(service_error.internal,
662                    "Failed to open /dev/null in genviz")
663
664        # Generate a dot/neato input file from the links, nodes and lans
665        try:
666            print >>dotfile, "graph G {"
667            for n in nodes:
668                print >>dotfile, '\t"%s"' % n
669            for l in links.keys():
670                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
671            for l in lans.keys():
672                for n in lans[l]:
673                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
674            print >>dotfile, "}"
675            dotfile.close()
676        except TypeError:
677            raise service_error(service_error.internal,
678                    "Single endpoint link in vtopo")
679        except IOError:
680            raise service_error(service_error.internal, "Cannot write dot file")
681
682        # Use dot to create a visualization
683        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
684                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
685                close_fds=True)
686        dnull.close()
687
688        # Translate dot to vis format
689        vis_nodes = [ ]
690        vis = { 'node': vis_nodes }
691        for line in dot.stdout:
692            m = vis_re.match(line)
693            if m:
694                vn = m.group(1)
695                vis_node = {'name': vn, \
696                        'x': float(m.group(2)),\
697                        'y' : float(m.group(3)),\
698                    }
699                if vn in links.keys() or vn in lans.keys():
700                    vis_node['type'] = 'lan'
701                else:
702                    vis_node['type'] = 'node'
703                vis_nodes.append(vis_node)
704        rv = dot.wait()
705
706        os.remove(dotname)
707        if rv == 0 : return vis
708        else: return None
709
710    def get_access(self, tb, nodes, user, tbparam, master, export_project,
711            access_user):
712        """
713        Get access to testbed through fedd and set the parameters for that tb
714        """
715        uri = self.tbmap.get(tb, None)
716        if not uri:
717            raise service_error(serice_error.server_config, 
718                    "Unknown testbed: %s" % tb)
719
720        # currently this lumps all users into one service access group
721        service_keys = [ a for u in user \
722                for a in u.get('access', []) \
723                    if a.has_key('sshPubkey')]
724
725        if len(service_keys) == 0:
726            raise service_error(service_error.req, 
727                    "Must have at least one SSH pubkey for services")
728
729
730        for p, u in access_user:
731            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
732                    "to %s") %  ((p or "None"), u, uri))
733
734            if p:
735                # Request with user and project specified
736                req = {\
737                        'destinationTestbed' : { 'uri' : uri },
738                        'project': { 
739                            'name': {'localname': p},
740                            'user': [ {'userID': { 'localname': u } } ],
741                            },
742                        'user':  user,
743                        'allocID' : { 'localname': 'test' },
744                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
745                        'serviceAccess' : service_keys
746                    }
747            else:
748                # Request with only user specified
749                req = {\
750                        'destinationTestbed' : { 'uri' : uri },
751                        'user':  [ {'userID': { 'localname': u } } ],
752                        'allocID' : { 'localname': 'test' },
753                        'createAccess' : [ { 'sshPubkey' : self.ssh_pubkey } ],
754                        'serviceAccess' : service_keys
755                    }
756
757            if tb == master:
758                # NB, the export_project parameter is a dict that includes
759                # the type
760                req['exportProject'] = export_project
761
762            # node resources if any
763            if nodes != None and len(nodes) > 0:
764                rnodes = [ ]
765                for n in nodes:
766                    rn = { }
767                    image, hw, count = n.split(":")
768                    if image: rn['image'] = [ image ]
769                    if hw: rn['hardware'] = [ hw ]
770                    if count and int(count) >0 : rn['count'] = int(count)
771                    rnodes.append(rn)
772                req['resources']= { }
773                req['resources']['node'] = rnodes
774
775            try:
776                if self.local_access.has_key(uri):
777                    # Local access call
778                    req = { 'RequestAccessRequestBody' : req }
779                    r = self.local_access[uri].RequestAccess(req, 
780                            fedid(file=self.cert_file))
781                    r = { 'RequestAccessResponseBody' : r }
782                else:
783                    r = self.call_RequestAccess(uri, req, 
784                            self.cert_file, self.cert_pwd, self.trusted_certs)
785            except service_error, e:
786                if e.code == service_error.access:
787                    self.log.debug("[get_access] Access denied")
788                    r = None
789                    continue
790                else:
791                    raise e
792
793            if r.has_key('RequestAccessResponseBody'):
794                # Through to here we have a valid response, not a fault.
795                # Access denied is a fault, so something better or worse than
796                # access denied has happened.
797                r = r['RequestAccessResponseBody']
798                self.log.debug("[get_access] Access granted")
799                break
800            else:
801                raise service_error(service_error.protocol,
802                        "Bad proxy response")
803       
804        if not r:
805            raise service_error(service_error.access, 
806                    "Access denied by %s (%s)" % (tb, uri))
807
808        e = r['emulab']
809        p = e['project']
810        tbparam[tb] = { 
811                "boss": e['boss'],
812                "host": e['ops'],
813                "domain": e['domain'],
814                "fs": e['fileServer'],
815                "eventserver": e['eventServer'],
816                "project": unpack_id(p['name']),
817                "emulab" : e,
818                "allocID" : r['allocID'],
819                }
820        # Make the testbed name be the label the user applied
821        p['testbed'] = {'localname': tb }
822
823        for u in p['user']:
824            role = u.get('role', None)
825            if role == 'experimentCreation':
826                tbparam[tb]['user'] = unpack_id(u['userID'])
827                break
828        else:
829            raise service_error(service_error.internal, 
830                    "No createExperimentUser from %s" %tb)
831
832        # Add attributes to barameter space.  We don't allow attributes to
833        # overlay any parameters already installed.
834        for a in e['fedAttr']:
835            try:
836                if a['attribute'] and isinstance(a['attribute'], basestring)\
837                        and not tbparam[tb].has_key(a['attribute'].lower()):
838                    tbparam[tb][a['attribute'].lower()] = a['value']
839            except KeyError:
840                self.log.error("Bad attribute in response: %s" % a)
841       
842    def release_access(self, tb, aid):
843        """
844        Release access to testbed through fedd
845        """
846
847        uri = self.tbmap.get(tb, None)
848        if not uri:
849            raise service_error(serice_error.server_config, 
850                    "Unknown testbed: %s" % tb)
851
852        if self.local_access.has_key(uri):
853            resp = self.local_access[uri].ReleaseAccess(\
854                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
855                    fedid(file=self.cert_file))
856            resp = { 'ReleaseAccessResponseBody': resp } 
857        else:
858            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
859                    self.cert_file, self.cert_pwd, self.trusted_certs)
860
861        # better error coding
862
863    def remote_splitter(self, uri, desc, master):
864
865        req = {
866                'description' : { 'ns2description': desc },
867                'master': master,
868                'include_fedkit': bool(self.fedkit),
869                'include_gatewaykit': bool(self.gatewaykit)
870            }
871
872        r = self.call_Ns2Split(uri, req, self.cert_file, self.cert_pwd, 
873                self.trusted_certs)
874
875        if r.has_key('Ns2SplitResponseBody'):
876            r = r['Ns2SplitResponseBody']
877            if r.has_key('output'):
878                return r['output'].splitlines()
879            else:
880                raise service_error(service_error.protocol, 
881                        "Bad splitter response (no output)")
882        else:
883            raise service_error(service_error.protocol, "Bad splitter response")
884
885    class start_segment:
886        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
887                cert_pwd=None, trusted_certs=None, caller=None,
888                log_collector=None):
889            self.log = log
890            self.debug = debug
891            self.cert_file = cert_file
892            self.cert_pwd = cert_pwd
893            self.trusted_certs = None
894            self.caller = caller
895            self.testbed = testbed
896            self.log_collector = log_collector
897
898        def __call__(self, uri, aid, topo, master, attrs=None):
899            req = {
900                    'allocID': { 'fedid' : aid }, 
901                    'segmentdescription': { 
902                        'topdldescription': topo.to_dict(),
903                    },
904                    'master': master,
905                }
906            if attrs:
907                req['fedAttr'] = attrs
908
909            try:
910                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
911                        self.trusted_certs)
912                if r.has_key('StartSegmentResponseBody'):
913                    lval = r['StartSegmentResponseBody'].get('allocationLog',
914                            None)
915                    if lval and self.log_collector:
916                        for line in  lval.splitlines(True):
917                            self.log_collector.write(line)
918                else:
919                    raise service_error(service_error.internal, 
920                            "Bad response!?: %s" %r)
921                return True
922            except service_error, e:
923                self.log.error("Start segment failed on %s: %s" % \
924                        (self.testbed, e))
925                return False
926
927
928
929    class terminate_segment:
930        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
931                cert_pwd=None, trusted_certs=None, caller=None):
932            self.log = log
933            self.debug = debug
934            self.cert_file = cert_file
935            self.cert_pwd = cert_pwd
936            self.trusted_certs = None
937            self.caller = caller
938            self.testbed = testbed
939
940        def __call__(self, uri, aid ):
941            req = {
942                    'allocID': aid , 
943                }
944            try:
945                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
946                        self.trusted_certs)
947                return True
948            except service_error, e:
949                self.log.error("Terminate segment failed on %s: %s" % \
950                        (self.testbed, e))
951                return False
952   
953
954    def allocate_resources(self, allocated, master, eid, expid, expcert, 
955            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
956            attrs=None):
957        started = { }           # Testbeds where a sub-experiment started
958                                # successfully
959
960        # XXX
961        fail_soft = False
962
963        log = alloc_log or self.log
964
965        thread_pool = self.thread_pool(self.nthreads)
966        threads = [ ]
967
968        for tb in [ k for k in allocated.keys() if k != master]:
969            # Create and start a thread to start the segment, and save it to
970            # get the return value later
971            thread_pool.wait_for_slot()
972            uri = self.tbmap.get(tb, None)
973            if not uri:
974                raise service_error(service_error.internal, 
975                        "Unknown testbed %s !?" % tb)
976
977            if tbparams[tb].has_key('allocID') and \
978                    tbparams[tb]['allocID'].has_key('fedid'):
979                aid = tbparams[tb]['allocID']['fedid']
980            else:
981                raise service_error(service_error.internal, 
982                        "No alloc id for testbed %s !?" % tb)
983
984            t  = self.pooled_thread(\
985                    target=self.start_segment(log=log, debug=self.debug,
986                        testbed=tb, cert_file=self.cert_file,
987                        cert_pwd=self.cert_pwd,
988                        trusted_certs=self.trusted_certs,
989                        caller=self.call_StartSegment,
990                        log_collector=log_collector), 
991                    args=(uri, aid, topo[tb], False, attrs), name=tb,
992                    pdata=thread_pool, trace_file=self.trace_file)
993            threads.append(t)
994            t.start()
995
996        # Wait until all finish
997        thread_pool.wait_for_all_done()
998
999        # If none failed, start the master
1000        failed = [ t.getName() for t in threads if not t.rv ]
1001
1002        if len(failed) == 0:
1003            uri = self.tbmap.get(master, None)
1004            if not uri:
1005                raise service_error(service_error.internal, 
1006                        "Unknown testbed %s !?" % master)
1007
1008            if tbparams[master].has_key('allocID') and \
1009                    tbparams[master]['allocID'].has_key('fedid'):
1010                aid = tbparams[master]['allocID']['fedid']
1011            else:
1012                raise service_error(service_error.internal, 
1013                    "No alloc id for testbed %s !?" % master)
1014            starter = self.start_segment(log=log, debug=self.debug,
1015                    testbed=master, cert_file=self.cert_file,
1016                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1017                    caller=self.call_StartSegment,
1018                    log_collector=log_collector)
1019            if not starter(uri, aid, topo[master], True, attrs):
1020                failed.append(master)
1021
1022        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1023        # If one failed clean up, unless fail_soft is set
1024        if failed and False:
1025            if not fail_soft:
1026                thread_pool.clear()
1027                for tb in succeeded:
1028                    # Create and start a thread to stop the segment
1029                    thread_pool.wait_for_slot()
1030                    t  = self.pooled_thread(\
1031                            target=self.stop_segment(log=log,
1032                                testbed=tb,
1033                                keyfile=self.ssh_privkey_file,
1034                                debug=self.debug), 
1035                            args=(tb, eid, tbparams), name=tb,
1036                            pdata=thread_pool, trace_file=self.trace_file)
1037                    t.start()
1038                # Wait until all finish
1039                thread_pool.wait_for_all_done()
1040
1041                # release the allocations
1042                for tb in tbparams.keys():
1043                    self.release_access(tb, tbparams[tb]['allocID'])
1044                # Remove the placeholder
1045                self.state_lock.acquire()
1046                self.state[eid]['experimentStatus'] = 'failed'
1047                if self.state_filename: self.write_state()
1048                self.state_lock.release()
1049
1050                log.error("Swap in failed on %s" % ",".join(failed))
1051                return
1052        else:
1053            log.info("[start_segment]: Experiment %s active" % eid)
1054
1055        log.debug("[start_experiment]: removing %s" % tmpdir)
1056
1057        # Walk up tmpdir, deleting as we go
1058        for path, dirs, files in os.walk(tmpdir, topdown=False):
1059            for f in files:
1060                os.remove(os.path.join(path, f))
1061            for d in dirs:
1062                os.rmdir(os.path.join(path, d))
1063        os.rmdir(tmpdir)
1064
1065        # Insert the experiment into our state and update the disk copy
1066        self.state_lock.acquire()
1067        self.state[expid]['experimentStatus'] = 'active'
1068        self.state[eid] = self.state[expid]
1069        if self.state_filename: self.write_state()
1070        self.state_lock.release()
1071        return
1072
1073
1074    def add_kit(self, e, kit):
1075        """
1076        Add a Software object created from the list of (install, location)
1077        tuples passed as kit  to the software attribute of an object e.  We
1078        do this enough to break out the code, but it's kind of a hack to
1079        avoid changing the old tuple rep.
1080        """
1081
1082        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1083
1084        if isinstance(e.software, list): e.software.extend(s)
1085        else: e.software = s
1086
1087
1088    def create_experiment_state(self, fid, req, expid, expcert):
1089        """
1090        Create the initial entry in the experiment's state.  The expid and
1091        expcert are the experiment's fedid and certifacte that represents that
1092        ID, which are installed in the experiment state.  If the request
1093        includes a suggested local name that is used if possible.  If the local
1094        name is already taken by an experiment owned by this user that has
1095        failed, it is overwriutten.  Otherwise new letters are added until a
1096        valid localname is found.  The generated local name is returned.
1097        """
1098
1099        if req.has_key('experimentID') and \
1100                req['experimentID'].has_key('localname'):
1101            overwrite = False
1102            eid = req['experimentID']['localname']
1103            # If there's an old failed experiment here with the same local name
1104            # and accessible by this user, we'll overwrite it, otherwise we'll
1105            # fall through and do the collision avoidance.
1106            old_expid = self.get_experiment_fedid(eid)
1107            if old_expid and self.check_experiment_access(fid, old_expid):
1108                self.state_lock.acquire()
1109                status = self.state[eid].get('experimentStatus', None)
1110                if status and status == 'failed':
1111                    # remove the old access attribute
1112                    self.auth.unset_attribute(fid, old_expid)
1113                    overwrite = True
1114                    del self.state[eid]
1115                    del self.state[old_expid]
1116                self.state_lock.release()
1117            self.state_lock.acquire()
1118            while (self.state.has_key(eid) and not overwrite):
1119                eid += random.choice(string.ascii_letters)
1120            # Initial state
1121            self.state[eid] = {
1122                    'experimentID' : \
1123                            [ { 'localname' : eid }, {'fedid': expid } ],
1124                    'experimentStatus': 'starting',
1125                    'experimentAccess': { 'X509' : expcert },
1126                    'owner': fid,
1127                    'log' : [],
1128                }
1129            self.state[expid] = self.state[eid]
1130            if self.state_filename: self.write_state()
1131            self.state_lock.release()
1132        else:
1133            eid = self.exp_stem
1134            for i in range(0,5):
1135                eid += random.choice(string.ascii_letters)
1136            self.state_lock.acquire()
1137            while (self.state.has_key(eid)):
1138                eid = self.exp_stem
1139                for i in range(0,5):
1140                    eid += random.choice(string.ascii_letters)
1141            # Initial state
1142            self.state[eid] = {
1143                    'experimentID' : \
1144                            [ { 'localname' : eid }, {'fedid': expid } ],
1145                    'experimentStatus': 'starting',
1146                    'experimentAccess': { 'X509' : expcert },
1147                    'owner': fid,
1148                    'log' : [],
1149                }
1150            self.state[expid] = self.state[eid]
1151            if self.state_filename: self.write_state()
1152            self.state_lock.release()
1153
1154        return eid
1155
1156
1157    def allocate_ips_to_topo(self, top):
1158        """
1159        Add an ip4_address attribute to all the hosts in teh topology, based on
1160        the shared substrates on which they sit.  An /etc/hosts file is also
1161        created and returned as a list of hostfiles entries.
1162        """
1163        subs = sorted(top.substrates, 
1164                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1165                reverse=True)
1166        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1167        ifs = { }
1168        hosts = [ ]
1169
1170        for idx, s in enumerate(subs):
1171            a = ips.allocate(len(s.interfaces)+2)
1172            if a :
1173                base, num = a
1174                if num < len(s.interfaces) +2 : 
1175                    raise service_error(service_error.internal,
1176                            "Allocator returned wrong number of IPs??")
1177            else:
1178                raise service_error(service_error.req, 
1179                        "Cannot allocate IP addresses")
1180
1181            base += 1
1182            for i in s.interfaces:
1183                i.attribute.append(
1184                        topdl.Attribute('ip4_address', 
1185                            "%s" % ip_addr(base)))
1186                hname = i.element.name[0]
1187                if ifs.has_key(hname):
1188                    hosts.append("%s\t%s-%s %s-%d" % \
1189                            (ip_addr(base), hname, s.name, hname,
1190                                ifs[hname]))
1191                else:
1192                    ifs[hname] = 0
1193                    hosts.append("%s\t%s-%s %s-%d %s" % \
1194                            (ip_addr(base), hname, s.name, hname,
1195                                ifs[hname], hname))
1196
1197                ifs[hname] += 1
1198                base += 1
1199        return hosts
1200
1201    def get_access_to_testbeds(self, testbeds, user, access_user, 
1202            export_project, master, allocated, tbparams):
1203        """
1204        Request access to the various testbeds required for this instantiation
1205        (passed in as testbeds).  User, access_user, expoert_project and master
1206        are used to construct the correct requests.  Per-testbed parameters are
1207        returned in tbparams.
1208        """
1209        for tb in testbeds:
1210            self.get_access(tb, None, user, tbparams, master,
1211                    export_project, access_user)
1212            allocated[tb] = 1
1213
1214    def split_topology(self, top, topo, testbeds, eid, master, tbparams):
1215        """
1216        Create the sub-topologies that are needed for experimetn instantiation.
1217        Along the way attach startup commands to the computers in the
1218        subtopologies.
1219        """
1220        for tb in testbeds:
1221            topo[tb] = top.clone()
1222            to_delete = [ ]
1223            for e in topo[tb].elements:
1224                etb = e.get_attribute('testbed')
1225                if etb and etb != tb:
1226                    for i in e.interface:
1227                        for s in i.subs:
1228                            try:
1229                                s.interfaces.remove(i)
1230                            except ValueError:
1231                                raise service_error(service_error.internal,
1232                                        "Can't remove interface??")
1233                    to_delete.append(e)
1234            for e in to_delete:
1235                topo[tb].elements.remove(e)
1236            topo[tb].make_indices()
1237
1238            for e in [ e for e in topo[tb].elements \
1239                    if isinstance(e,topdl.Computer)]:
1240                if tb == master:
1241                    cmd = 'sudo -H /usr/local/federation/bin/make_hosts /proj/%s/exp/%s/tmp/hosts >& /tmp/federate' % (tbparams[tb].get('project', 'project'), eid)
1242                else:
1243                    cmd = "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& /tmp/federate"
1244                scmd = e.get_attribute('startup')
1245                if scmd:
1246                    cmd = "%s \\$USER '%s'" % (cmd, scmd)
1247
1248                e.set_attribute('startup', cmd)
1249                if self.fedkit: self.add_kit(e, self.fedkit)
1250
1251    def add_portals(self, top, topo, eid, master, tbparams):
1252        """
1253        For each substrate in the main topology, find those that
1254        have nodes on more than one testbed.  Insert portal nodes
1255        into the copies of those substrates on the sub topologies.
1256        """
1257        for s in top.substrates:
1258            # tbs will contain an ip address on this subsrate that is in
1259            # each testbed.
1260            tbs = { }
1261            for i in s.interfaces:
1262                e = i.element
1263                tb = e.get_attribute('testbed')
1264                if tb and not tbs.has_key(tb):
1265                    for i in e.interface:
1266                        if s in i.subs:
1267                            tbs[tb]= i.get_attribute('ip4_address')
1268            if len(tbs) < 2:
1269                continue
1270
1271            # More than one testbed is on this substrate.  Insert
1272            # some portals into the subtopologies.  st == source testbed,
1273            # dt == destination testbed.
1274            segment_substrate = { }
1275            for st in tbs.keys():
1276                segment_substrate[st] = { }
1277                for dt in [ t for t in tbs.keys() if t != st]:
1278                    myname =  "%stunnel" % dt
1279                    desthost  =  "%stunnel" % st
1280                    sproject = tbparams[st].get('project', 'project')
1281                    dproject = tbparams[dt].get('project', 'project')
1282                    mproject = tbparams[master].get('project', 'project')
1283                    sdomain = tbparams[st].get('domain', ".example.com")
1284                    ddomain = tbparams[dt].get('domain', ".example.com")
1285                    mdomain = tbparams[master].get('domain', '.example.com')
1286                    muser = tbparams[master].get('user', 'root')
1287                    smbshare = tbparams[master].get('smbshare', 'USERS')
1288                    # XXX: active and type need to be unkludged
1289                    active = ("%s" % (st == master))
1290                    if not segment_substrate[st].has_key(dt):
1291                        # Put a substrate and a segment for the connected
1292                        # testbed in there.
1293                        tsubstrate = \
1294                                topdl.Substrate(name='%s-%s' % (st, dt),
1295                                        attribute= [
1296                                            topdl.Attribute(
1297                                                attribute='portal',
1298                                                value='true')
1299                                            ]
1300                                        )
1301                        segment_element = topdl.Segment(
1302                                id= tbparams[dt]['allocID'],
1303                                type='emulab',
1304                                uri = self.tbmap.get(dt, None),
1305                                interface=[ 
1306                                    topdl.Interface(
1307                                        substrate=tsubstrate.name),
1308                                    ],
1309                                attribute = [
1310                                    topdl.Attribute(attribute=n, value=v)
1311                                        for n, v in (\
1312                                            ('domain', ddomain),
1313                                            ('experiment', "%s/%s" % \
1314                                                    (dproject, eid)),)
1315                                    ],
1316                                )
1317                        segment_substrate[st][dt] = tsubstrate
1318                        topo[st].substrates.append(tsubstrate)
1319                        topo[st].elements.append(segment_element)
1320                    portal = topdl.Computer(
1321                            name="%stunnel" % dt,
1322                            attribute=[ 
1323                                topdl.Attribute(attribute=n,value=v)
1324                                    for n, v in (\
1325                                        ('portal', 'true'),
1326                                        ('domain', sdomain),
1327                                        ('masterdomain', mdomain),
1328                                        ('masterexperiment', "%s/%s" % \
1329                                                (mproject, eid)),
1330                                        ('masteruser', muser),
1331                                        ('smbshare', smbshare),
1332                                        ('experiment', "%s/%s" % \
1333                                                (sproject, eid)),
1334                                        ('peer', "%s" % desthost),
1335                                        ('peer_segment', "%s" % \
1336                                                tbparams[dt]['allocID']['fedid']),
1337                                        ('scriptdir', 
1338                                            "/usr/local/federation/bin"),
1339                                        ('active', "%s" % active),
1340                                        ('portal_type', 'both'), 
1341                                        ('startup', 'sudo -H /usr/local/federation/bin/fed-tun.pl -f /proj/%s/exp/%s/tmp/%s.%s.%s%s.gw.conf >& /tmp/bridge.log' % (sproject, eid, myname.lower(), eid.lower(), sproject.lower(), sdomain.lower())))
1342                                ],
1343                            interface=[
1344                                topdl.Interface(
1345                                    substrate=s.name,
1346                                    attribute=[ 
1347                                        topdl.Attribute(
1348                                            attribute='ip4_address', 
1349                                            value=tbs[dt]
1350                                        )
1351                                    ]),
1352                                topdl.Interface(
1353                                    substrate=\
1354                                        segment_substrate[st][dt].name,
1355                                    attribute=[
1356                                        topdl.Attribute(attribute='portal',
1357                                            value='true')
1358                                        ]
1359                                    ),
1360                                ],
1361                            )
1362                    if self.fedkit: self.add_kit(portal, self.fedkit)
1363                    if self.gatewaykit: self.add_kit(portal, self.gatewaykit)
1364
1365                    topo[st].elements.append(portal)
1366
1367        # Connect the gateway nodes into the topologies and clear out
1368        # substrates that are not in the topologies
1369        for tb in tbparams.keys():
1370            topo[tb].incorporate_elements()
1371            topo[tb].substrates = \
1372                    [s for s in topo[tb].substrates \
1373                        if len(s.interfaces) >0]
1374
1375    def wrangle_software(self, expid, top, topo, tbparams):
1376        """
1377        Copy software out to the repository directory, allocate permissions and
1378        rewrite the segment topologies to look for the software in local
1379        places.
1380        """
1381
1382        # Copy the rpms and tarfiles to a distribution directory from
1383        # which the federants can retrieve them
1384        linkpath = "%s/software" %  expid
1385        softdir ="%s/%s" % ( self.repodir, linkpath)
1386        softmap = { }
1387        # These are in a list of tuples format (each kit).  This comprehension
1388        # unwraps them into a single list of tuples that initilaizes the set of
1389        # tuples.
1390        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1391                for p, t in l ])
1392        pkgs.update([x.location for e in top.elements \
1393                for x in e.software])
1394        try:
1395            os.makedirs(softdir)
1396        except IOError, e:
1397            raise service_error(
1398                    "Cannot create software directory: %s" % e)
1399        # The actual copying.  Everything's converted into a url for copying.
1400        for pkg in pkgs:
1401            loc = pkg
1402
1403            scheme, host, path = urlparse(loc)[0:3]
1404            dest = os.path.basename(path)
1405            if not scheme:
1406                if not loc.startswith('/'):
1407                    loc = "/%s" % loc
1408                loc = "file://%s" %loc
1409            try:
1410                u = urlopen(loc)
1411            except Exception, e:
1412                raise service_error(service_error.req, 
1413                        "Cannot open %s: %s" % (loc, e))
1414            try:
1415                f = open("%s/%s" % (softdir, dest) , "w")
1416                self.log.debug("Writing %s/%s" % (softdir,dest) )
1417                data = u.read(4096)
1418                while data:
1419                    f.write(data)
1420                    data = u.read(4096)
1421                f.close()
1422                u.close()
1423            except Exception, e:
1424                raise service_error(service_error.internal,
1425                        "Could not copy %s: %s" % (loc, e))
1426            path = re.sub("/tmp", "", linkpath)
1427            # XXX
1428            softmap[pkg] = \
1429                    "https://users.isi.deterlab.net:23232/%s/%s" %\
1430                    ( path, dest)
1431
1432            # Allow the individual segments to access the software.
1433            for tb in tbparams.keys():
1434                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1435                        "/%s/%s" % ( path, dest))
1436
1437        # Convert the software locations in the segments into the local
1438        # copies on this host
1439        for soft in [ s for tb in topo.values() \
1440                for e in tb.elements \
1441                    if getattr(e, 'software', False) \
1442                        for s in e.software ]:
1443            if softmap.has_key(soft.location):
1444                soft.location = softmap[soft.location]
1445
1446
1447    def create_experiment(self, req, fid):
1448        """
1449        The external interface to experiment creation called from the
1450        dispatcher.
1451
1452        Creates a working directory, splits the incoming description using the
1453        splitter script and parses out the avrious subsections using the
1454        lcasses above.  Once each sub-experiment is created, use pooled threads
1455        to instantiate them and start it all up.
1456        """
1457        if not self.auth.check_attribute(fid, 'create'):
1458            raise service_error(service_error.access, "Create access denied")
1459
1460        try:
1461            tmpdir = tempfile.mkdtemp(prefix="split-")
1462            os.mkdir(tmpdir+"/keys")
1463        except IOError:
1464            raise service_error(service_error.internal, "Cannot create tmp dir")
1465
1466        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1467        gw_secretkey_base = "fed.%s" % self.ssh_type
1468        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1469        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1470        tclfile = tmpdir + "/experiment.tcl"
1471        tbparams = { }
1472        try:
1473            access_user = self.accessdb[fid]
1474        except KeyError:
1475            raise service_error(service_error.internal,
1476                    "Access map and authorizer out of sync in " + \
1477                            "create_experiment for fedid %s"  % fid)
1478
1479        pid = "dummy"
1480        gid = "dummy"
1481
1482        req = req.get('CreateRequestBody', None)
1483        if not req:
1484            raise service_error(service_error.req,
1485                    "Bad request format (no CreateRequestBody)")
1486        # The tcl parser needs to read a file so put the content into that file
1487        descr=req.get('experimentdescription', None)
1488        if descr:
1489            file_content=descr.get('ns2description', None)
1490            if file_content:
1491                try:
1492                    f = open(tclfile, 'w')
1493                    f.write(file_content)
1494                    f.close()
1495                except IOError:
1496                    raise service_error(service_error.internal,
1497                            "Cannot write temp experiment description")
1498            else:
1499                raise service_error(service_error.req, 
1500                        "Only ns2descriptions supported")
1501        else:
1502            raise service_error(service_error.req, "No experiment description")
1503
1504        # Generate an ID for the experiment (slice) and a certificate that the
1505        # allocator can use to prove they own it.  We'll ship it back through
1506        # the encrypted connection.
1507        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1508
1509        eid = self.create_experiment_state(fid, req, expid, expcert)
1510        try: 
1511            # This catches exceptions to clear the placeholder if necessary
1512            try:
1513                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1514            except ValueError:
1515                raise service_error(service_error.server_config, 
1516                        "Bad key type (%s)" % self.ssh_type)
1517
1518            user = req.get('user', None)
1519            if user == None:
1520                raise service_error(service_error.req, "No user")
1521
1522            master = req.get('master', None)
1523            if not master:
1524                raise service_error(service_error.req,
1525                        "No master testbed label")
1526            export_project = req.get('exportProject', None)
1527            if not export_project:
1528                raise service_error(service_error.req, "No export project")
1529           
1530            # Translate to topdl
1531            if self.splitter_url:
1532                # XXX: need remote topdl translator
1533                self.log.debug("Calling remote splitter at %s" % \
1534                        self.splitter_url)
1535                split_data = self.remote_splitter(self.splitter_url,
1536                        file_content, master)
1537            else:
1538                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1539                    str(self.muxmax), '-m', master]
1540
1541                if self.fedkit:
1542                    tclcmd.append('-k')
1543
1544                if self.gatewaykit:
1545                    tclcmd.append('-K')
1546
1547                tclcmd.extend([pid, gid, eid, tclfile])
1548
1549                self.log.debug("running local splitter %s", " ".join(tclcmd))
1550                # This is just fantastic.  As a side effect the parser copies
1551                # tb_compat.tcl into the current directory, so that directory
1552                # must be writable by the fedd user.  Doing this in the
1553                # temporary subdir ensures this is the case.
1554                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1555                        cwd=tmpdir)
1556                split_data = tclparser.stdout
1557
1558            top = topdl.topology_from_xml(file=split_data, top="experiment")
1559
1560            hosts = self.allocate_ips_to_topo(top)
1561             # Find the testbeds to look up
1562            testbeds = set([ a.value for e in top.elements \
1563                    for a in e.attribute \
1564                    if a.attribute == 'testbed'] )
1565
1566            allocated = { }         # Testbeds we can access
1567            topo ={ }               # Sub topologies
1568            self.get_access_to_testbeds(testbeds, user, access_user, 
1569                    export_project, master, allocated, tbparams)
1570            self.split_topology(top, topo, testbeds, eid, master, tbparams)
1571
1572            # Copy configuration files into the remote file store
1573            # The config urlpath
1574            configpath = "/%s/config" % expid
1575            # The config file system location
1576            configdir ="%s%s" % ( self.repodir, configpath)
1577            try:
1578                os.makedirs(configdir)
1579            except IOError, e:
1580                raise service_error(
1581                        "Cannot create config directory: %s" % e)
1582            try:
1583                f = open("%s/hosts" % configdir, "w")
1584                f.write('\n'.join(hosts))
1585                f.close()
1586            except IOError, e:
1587                raise service_error(service_error.internal, 
1588                        "Cannot write hosts file: %s" % e)
1589            try:
1590                copy_file("%s" % gw_pubkey, "%s/%s" % \
1591                        (configdir, gw_pubkey_base))
1592                copy_file("%s" % gw_secretkey, "%s/%s" % \
1593                        (configdir, gw_secretkey_base))
1594            except IOError, e:
1595                raise service_error(service_error.internal, 
1596                        "Cannot copy keyfiles: %s" % e)
1597
1598            # Allow the individual testbeds to access the configuration files.
1599            for tb in tbparams.keys():
1600                asignee = tbparams[tb]['allocID']['fedid']
1601                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1602                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1603
1604            self.add_portals(top, topo, eid, master, tbparams)
1605            self.wrangle_software(expid, top, topo, tbparams)
1606
1607            vtopo = topdl.topology_to_vtopo(top)
1608            vis = self.genviz(vtopo)
1609
1610            # save federant information
1611            for k in allocated.keys():
1612                tbparams[k]['federant'] = {\
1613                        'name': [ { 'localname' : eid} ],\
1614                        'emulab': tbparams[k]['emulab'],\
1615                        'allocID' : tbparams[k]['allocID'],\
1616                        'master' : k == master,\
1617                    }
1618
1619            self.state_lock.acquire()
1620            self.state[eid]['vtopo'] = vtopo
1621            self.state[eid]['vis'] = vis
1622            self.state[expid]['federant'] = \
1623                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1624                        if tbparams[tb].has_key('federant') ]
1625            if self.state_filename: 
1626                self.write_state()
1627            self.state_lock.release()
1628        except service_error, e:
1629            # If something goes wrong in the parse (usually an access error)
1630            # clear the placeholder state.  From here on out the code delays
1631            # exceptions.  Failing at this point returns a fault to the remote
1632            # caller.
1633
1634            self.state_lock.acquire()
1635            del self.state[eid]
1636            del self.state[expid]
1637            if self.state_filename: self.write_state()
1638            self.state_lock.release()
1639            raise e
1640
1641
1642        # Start the background swapper and return the starting state.  From
1643        # here on out, the state will stick around a while.
1644
1645        # Let users touch the state
1646        self.auth.set_attribute(fid, expid)
1647        self.auth.set_attribute(expid, expid)
1648        # Override fedids can manipulate state as well
1649        for o in self.overrides:
1650            self.auth.set_attribute(o, expid)
1651
1652        # Create a logger that logs to the experiment's state object as well as
1653        # to the main log file.
1654        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1655        alloc_collector = self.list_log(self.state[eid]['log'])
1656        h = logging.StreamHandler(alloc_collector)
1657        # XXX: there should be a global one of these rather than repeating the
1658        # code.
1659        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1660                    '%d %b %y %H:%M:%S'))
1661        alloc_log.addHandler(h)
1662       
1663        # XXX
1664        url_base = 'https://users.isi.deterlab.net:23232'
1665        attrs = [ 
1666                {
1667                    'attribute': 'ssh_pubkey', 
1668                    'value': '%s/%s/config/%s' % \
1669                            (url_base, expid, gw_pubkey_base)
1670                },
1671                {
1672                    'attribute': 'ssh_secretkey', 
1673                    'value': '%s/%s/config/%s' % \
1674                            (url_base, expid, gw_secretkey_base)
1675                },
1676                {
1677                    'attribute': 'hosts', 
1678                    'value': '%s/%s/config/hosts' % \
1679                            (url_base, expid)
1680                },
1681                {
1682                    'attribute': 'experiment_name',
1683                    'value': eid,
1684                },
1685            ]
1686
1687        # Start a thread to do the resource allocation
1688        t  = Thread(target=self.allocate_resources,
1689                args=(allocated, master, eid, expid, expcert, tbparams, 
1690                    topo, tmpdir, alloc_log, alloc_collector, attrs),
1691                name=eid)
1692        t.start()
1693
1694        rv = {
1695                'experimentID': [
1696                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1697                ],
1698                'experimentStatus': 'starting',
1699                'experimentAccess': { 'X509' : expcert }
1700            }
1701
1702        return rv
1703   
1704    def get_experiment_fedid(self, key):
1705        """
1706        find the fedid associated with the localname key in the state database.
1707        """
1708
1709        rv = None
1710        self.state_lock.acquire()
1711        if self.state.has_key(key):
1712            if isinstance(self.state[key], dict):
1713                try:
1714                    kl = [ f['fedid'] for f in \
1715                            self.state[key]['experimentID']\
1716                                if f.has_key('fedid') ]
1717                except KeyError:
1718                    self.state_lock.release()
1719                    raise service_error(service_error.internal, 
1720                            "No fedid for experiment %s when getting "+\
1721                                    "fedid(!?)" % key)
1722                if len(kl) == 1:
1723                    rv = kl[0]
1724                else:
1725                    self.state_lock.release()
1726                    raise service_error(service_error.internal, 
1727                            "multiple fedids for experiment %s when " +\
1728                                    "getting fedid(!?)" % key)
1729            else:
1730                self.state_lock.release()
1731                raise service_error(service_error.internal, 
1732                        "Unexpected state for %s" % key)
1733        self.state_lock.release()
1734        return rv
1735
1736    def check_experiment_access(self, fid, key):
1737        """
1738        Confirm that the fid has access to the experiment.  Though a request
1739        may be made in terms of a local name, the access attribute is always
1740        the experiment's fedid.
1741        """
1742        if not isinstance(key, fedid):
1743            key = self.get_experiment_fedid(key)
1744
1745        if self.auth.check_attribute(fid, key):
1746            return True
1747        else:
1748            raise service_error(service_error.access, "Access Denied")
1749
1750
1751    def get_handler(self, path, fid):
1752        if self.auth.check_attribute(fid, path):
1753            return ("%s/%s" % (self.repodir, path), "application/binary")
1754        else:
1755            return (None, None)
1756
1757    def get_vtopo(self, req, fid):
1758        """
1759        Return the stored virtual topology for this experiment
1760        """
1761        rv = None
1762        state = None
1763
1764        req = req.get('VtopoRequestBody', None)
1765        if not req:
1766            raise service_error(service_error.req,
1767                    "Bad request format (no VtopoRequestBody)")
1768        exp = req.get('experiment', None)
1769        if exp:
1770            if exp.has_key('fedid'):
1771                key = exp['fedid']
1772                keytype = "fedid"
1773            elif exp.has_key('localname'):
1774                key = exp['localname']
1775                keytype = "localname"
1776            else:
1777                raise service_error(service_error.req, "Unknown lookup type")
1778        else:
1779            raise service_error(service_error.req, "No request?")
1780
1781        self.check_experiment_access(fid, key)
1782
1783        self.state_lock.acquire()
1784        if self.state.has_key(key):
1785            if self.state[key].has_key('vtopo'):
1786                rv = { 'experiment' : {keytype: key },\
1787                        'vtopo': self.state[key]['vtopo'],\
1788                    }
1789            else:
1790                state = self.state[key]['experimentStatus']
1791        self.state_lock.release()
1792
1793        if rv: return rv
1794        else: 
1795            if state:
1796                raise service_error(service_error.partial, 
1797                        "Not ready: %s" % state)
1798            else:
1799                raise service_error(service_error.req, "No such experiment")
1800
1801    def get_vis(self, req, fid):
1802        """
1803        Return the stored visualization for this experiment
1804        """
1805        rv = None
1806        state = None
1807
1808        req = req.get('VisRequestBody', None)
1809        if not req:
1810            raise service_error(service_error.req,
1811                    "Bad request format (no VisRequestBody)")
1812        exp = req.get('experiment', None)
1813        if exp:
1814            if exp.has_key('fedid'):
1815                key = exp['fedid']
1816                keytype = "fedid"
1817            elif exp.has_key('localname'):
1818                key = exp['localname']
1819                keytype = "localname"
1820            else:
1821                raise service_error(service_error.req, "Unknown lookup type")
1822        else:
1823            raise service_error(service_error.req, "No request?")
1824
1825        self.check_experiment_access(fid, key)
1826
1827        self.state_lock.acquire()
1828        if self.state.has_key(key):
1829            if self.state[key].has_key('vis'):
1830                rv =  { 'experiment' : {keytype: key },\
1831                        'vis': self.state[key]['vis'],\
1832                        }
1833            else:
1834                state = self.state[key]['experimentStatus']
1835        self.state_lock.release()
1836
1837        if rv: return rv
1838        else:
1839            if state:
1840                raise service_error(service_error.partial, 
1841                        "Not ready: %s" % state)
1842            else:
1843                raise service_error(service_error.req, "No such experiment")
1844
1845    def clean_info_response(self, rv):
1846        """
1847        Remove the information in the experiment's state object that is not in
1848        the info response.
1849        """
1850        # Remove the owner info (should always be there, but...)
1851        if rv.has_key('owner'): del rv['owner']
1852
1853        # Convert the log into the allocationLog parameter and remove the
1854        # log entry (with defensive programming)
1855        if rv.has_key('log'):
1856            rv['allocationLog'] = "".join(rv['log'])
1857            del rv['log']
1858        else:
1859            rv['allocationLog'] = ""
1860
1861        if rv['experimentStatus'] != 'active':
1862            if rv.has_key('federant'): del rv['federant']
1863        else:
1864            # remove the allocationID info from each federant
1865            for f in rv.get('federant', []):
1866                if f.has_key('allocID'): del f['allocID']
1867        return rv
1868
1869    def get_info(self, req, fid):
1870        """
1871        Return all the stored info about this experiment
1872        """
1873        rv = None
1874
1875        req = req.get('InfoRequestBody', None)
1876        if not req:
1877            raise service_error(service_error.req,
1878                    "Bad request format (no InfoRequestBody)")
1879        exp = req.get('experiment', None)
1880        if exp:
1881            if exp.has_key('fedid'):
1882                key = exp['fedid']
1883                keytype = "fedid"
1884            elif exp.has_key('localname'):
1885                key = exp['localname']
1886                keytype = "localname"
1887            else:
1888                raise service_error(service_error.req, "Unknown lookup type")
1889        else:
1890            raise service_error(service_error.req, "No request?")
1891
1892        self.check_experiment_access(fid, key)
1893
1894        # The state may be massaged by the service function that called
1895        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1896        # state.
1897        self.state_lock.acquire()
1898        if self.state.has_key(key):
1899            rv = copy.deepcopy(self.state[key])
1900        self.state_lock.release()
1901
1902        if rv:
1903            return self.clean_info_response(rv)
1904        else:
1905            raise service_error(service_error.req, "No such experiment")
1906
1907    def get_multi_info(self, req, fid):
1908        """
1909        Return all the stored info that this fedid can access
1910        """
1911        rv = { 'info': [ ] }
1912
1913        self.state_lock.acquire()
1914        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
1915            self.check_experiment_access(fid, key)
1916
1917            if self.state.has_key(key):
1918                e = copy.deepcopy(self.state[key])
1919                e = self.clean_info_response(e)
1920                rv['info'].append(e)
1921        self.state_lock.release()
1922        return rv
1923
1924    def terminate_experiment(self, req, fid):
1925        """
1926        Swap this experiment out on the federants and delete the shared
1927        information
1928        """
1929        tbparams = { }
1930        req = req.get('TerminateRequestBody', None)
1931        if not req:
1932            raise service_error(service_error.req,
1933                    "Bad request format (no TerminateRequestBody)")
1934        force = req.get('force', False)
1935        exp = req.get('experiment', None)
1936        if exp:
1937            if exp.has_key('fedid'):
1938                key = exp['fedid']
1939                keytype = "fedid"
1940            elif exp.has_key('localname'):
1941                key = exp['localname']
1942                keytype = "localname"
1943            else:
1944                raise service_error(service_error.req, "Unknown lookup type")
1945        else:
1946            raise service_error(service_error.req, "No request?")
1947
1948        self.check_experiment_access(fid, key)
1949
1950        dealloc_list = [ ]
1951
1952
1953        # Create a logger that logs to the dealloc_list as well as to the main
1954        # log file.
1955        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
1956        h = logging.StreamHandler(self.list_log(dealloc_list))
1957        # XXX: there should be a global one of these rather than repeating the
1958        # code.
1959        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1960                    '%d %b %y %H:%M:%S'))
1961        dealloc_log.addHandler(h)
1962
1963        self.state_lock.acquire()
1964        fed_exp = self.state.get(key, None)
1965
1966        if fed_exp:
1967            # This branch of the conditional holds the lock to generate a
1968            # consistent temporary tbparams variable to deallocate experiments.
1969            # It releases the lock to do the deallocations and reacquires it to
1970            # remove the experiment state when the termination is complete.
1971
1972            # First make sure that the experiment creation is complete.
1973            status = fed_exp.get('experimentStatus', None)
1974
1975            if status:
1976                if status in ('starting', 'terminating'):
1977                    if not force:
1978                        self.state_lock.release()
1979                        raise service_error(service_error.partial, 
1980                                'Experiment still being created or destroyed')
1981                    else:
1982                        self.log.warning('Experiment in %s state ' % status + \
1983                                'being terminated by force.')
1984            else:
1985                # No status??? trouble
1986                self.state_lock.release()
1987                raise service_error(service_error.internal,
1988                        "Experiment has no status!?")
1989
1990            ids = []
1991            #  experimentID is a list of dicts that are self-describing
1992            #  identifiers.  This finds all the fedids and localnames - the
1993            #  keys of self.state - and puts them into ids.
1994            for id in fed_exp.get('experimentID', []):
1995                if id.has_key('fedid'): ids.append(id['fedid'])
1996                if id.has_key('localname'): ids.append(id['localname'])
1997
1998            # Collect the allocation/segment ids
1999            for fed in fed_exp.get('federant', []):
2000                try:
2001                    tb = fed['emulab']['project']['testbed']['localname']
2002                    aid = fed['allocID']
2003                except KeyError, e:
2004                    continue
2005                tbparams[tb] = aid
2006            fed_exp['experimentStatus'] = 'terminating'
2007            if self.state_filename: self.write_state()
2008            self.state_lock.release()
2009
2010            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2011            # then completes, so we can't wait if nothing starts.  So, no
2012            # tbparams, no start.
2013            if len(tbparams) > 0:
2014                thread_pool = self.thread_pool(self.nthreads)
2015                for tb in tbparams.keys():
2016                    # Create and start a thread to stop the segment
2017                    thread_pool.wait_for_slot()
2018                    uri = self.tbmap.get(tb, None)
2019                    t  = self.pooled_thread(\
2020                            target=self.terminate_segment(log=dealloc_log,
2021                                testbed=tb,
2022                                cert_file=self.cert_file, 
2023                                cert_pwd=self.cert_pwd,
2024                                trusted_certs=self.trusted_certs,
2025                                caller=self.call_TerminateSegment),
2026                            args=(uri, tbparams[tb]), name=tb,
2027                            pdata=thread_pool, trace_file=self.trace_file)
2028                    t.start()
2029                # Wait for completions
2030                thread_pool.wait_for_all_done()
2031
2032            # release the allocations (failed experiments have done this
2033            # already, and starting experiments may be in odd states, so we
2034            # ignore errors releasing those allocations
2035            try: 
2036                for tb in tbparams.keys():
2037                    self.release_access(tb, tbparams[tb])
2038            except service_error, e:
2039                if status != 'failed' and not force:
2040                    raise e
2041
2042            # Remove the terminated experiment
2043            self.state_lock.acquire()
2044            for id in ids:
2045                if self.state.has_key(id): del self.state[id]
2046
2047            if self.state_filename: self.write_state()
2048            self.state_lock.release()
2049
2050            return { 
2051                    'experiment': exp , 
2052                    'deallocationLog': "".join(dealloc_list),
2053                    }
2054        else:
2055            # Don't forget to release the lock
2056            self.state_lock.release()
2057            raise service_error(service_error.req, "No saved state")
Note: See TracBrowser for help on using the repository browser.