source: fedd/federation/experiment_control.py @ 0c4b12c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 0c4b12c was 0c4b12c, checked in by Ted Faber <faber@…>, 14 years ago

more de mastering

  • Property mode set to 100644
File size: 78.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45class experiment_control_local:
46    """
47    Control of experiments that this system can directly access.
48
49    Includes experiment creation, termination and information dissemination.
50    Thred safe.
51    """
52
53    class ssh_cmd_timeout(RuntimeError): pass
54   
55    class thread_pool:
56        """
57        A class to keep track of a set of threads all invoked for the same
58        task.  Manages the mutual exclusion of the states.
59        """
60        def __init__(self, nthreads):
61            """
62            Start a pool.
63            """
64            self.changed = Condition()
65            self.started = 0
66            self.terminated = 0
67            self.nthreads = nthreads
68
69        def acquire(self):
70            """
71            Get the pool's lock.
72            """
73            self.changed.acquire()
74
75        def release(self):
76            """
77            Release the pool's lock.
78            """
79            self.changed.release()
80
81        def wait(self, timeout = None):
82            """
83            Wait for a pool thread to start or stop.
84            """
85            self.changed.wait(timeout)
86
87        def start(self):
88            """
89            Called by a pool thread to report starting.
90            """
91            self.changed.acquire()
92            self.started += 1
93            self.changed.notifyAll()
94            self.changed.release()
95
96        def terminate(self):
97            """
98            Called by a pool thread to report finishing.
99            """
100            self.changed.acquire()
101            self.terminated += 1
102            self.changed.notifyAll()
103            self.changed.release()
104
105        def clear(self):
106            """
107            Clear all pool data.
108            """
109            self.changed.acquire()
110            self.started = 0
111            self.terminated =0
112            self.changed.notifyAll()
113            self.changed.release()
114
115        def wait_for_slot(self):
116            """
117            Wait until we have a free slot to start another pooled thread
118            """
119            self.acquire()
120            while self.started - self.terminated >= self.nthreads:
121                self.wait()
122            self.release()
123
124        def wait_for_all_done(self, timeout=None):
125            """
126            Wait until all active threads finish (and at least one has
127            started).  If a timeout is given, return after waiting that long
128            for termination.  If all threads are done (and one has started in
129            the since the last clear()) return True, otherwise False.
130            """
131            if timeout:
132                deadline = time.time() + timeout
133            self.acquire()
134            while self.started == 0 or self.started > self.terminated:
135                self.wait(timeout)
136                if timeout:
137                    if time.time() > deadline:
138                        break
139                    timeout = deadline - time.time()
140            self.release()
141            return not (self.started == 0 or self.started > self.terminated)
142
143    class pooled_thread(Thread):
144        """
145        One of a set of threads dedicated to a specific task.  Uses the
146        thread_pool class above for coordination.
147        """
148        def __init__(self, group=None, target=None, name=None, args=(), 
149                kwargs={}, pdata=None, trace_file=None):
150            Thread.__init__(self, group, target, name, args, kwargs)
151            self.rv = None          # Return value of the ops in this thread
152            self.exception = None   # Exception that terminated this thread
153            self.target=target      # Target function to run on start()
154            self.args = args        # Args to pass to target
155            self.kwargs = kwargs    # Additional kw args
156            self.pdata = pdata      # thread_pool for this class
157            # Logger for this thread
158            self.log = logging.getLogger("fedd.experiment_control")
159       
160        def run(self):
161            """
162            Emulate Thread.run, except add pool data manipulation and error
163            logging.
164            """
165            if self.pdata:
166                self.pdata.start()
167
168            if self.target:
169                try:
170                    self.rv = self.target(*self.args, **self.kwargs)
171                except service_error, s:
172                    self.exception = s
173                    self.log.error("Thread exception: %s %s" % \
174                            (s.code_string(), s.desc))
175                except:
176                    self.exception = sys.exc_info()[1]
177                    self.log.error(("Unexpected thread exception: %s" +\
178                            "Trace %s") % (self.exception,\
179                                traceback.format_exc()))
180            if self.pdata:
181                self.pdata.terminate()
182
183    call_RequestAccess = service_caller('RequestAccess')
184    call_ReleaseAccess = service_caller('ReleaseAccess')
185    call_StartSegment = service_caller('StartSegment')
186    call_TerminateSegment = service_caller('TerminateSegment')
187    call_Ns2Topdl = service_caller('Ns2Topdl')
188
189    def __init__(self, config=None, auth=None):
190        """
191        Intialize the various attributes, most from the config object
192        """
193
194        def parse_tarfile_list(tf):
195            """
196            Parse a tarfile list from the configuration.  This is a set of
197            paths and tarfiles separated by spaces.
198            """
199            rv = [ ]
200            if tf is not None:
201                tl = tf.split()
202                while len(tl) > 1:
203                    p, t = tl[0:2]
204                    del tl[0:2]
205                    rv.append((p, t))
206            return rv
207
208        self.thread_with_rv = experiment_control_local.pooled_thread
209        self.thread_pool = experiment_control_local.thread_pool
210        self.list_log = list_log.list_log
211
212        self.cert_file = config.get("experiment_control", "cert_file")
213        if self.cert_file:
214            self.cert_pwd = config.get("experiment_control", "cert_pwd")
215        else:
216            self.cert_file = config.get("globals", "cert_file")
217            self.cert_pwd = config.get("globals", "cert_pwd")
218
219        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
220                or config.get("globals", "trusted_certs")
221
222        self.repodir = config.get("experiment_control", "repodir")
223        self.repo_url = config.get("experiment_control", "repo_url", 
224                "https://users.isi.deterlab.net:23235");
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 10
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.cleanup = not config.getboolean("experiment_control", 
240                "leave_tmpfiles")
241        self.state_filename = config.get("experiment_control", 
242                "experiment_state")
243        self.store_filename = config.get("experiment_control", 
244                "synch_store")
245        self.store_url = config.get("experiment_control", "store_url")
246        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
247        self.fedkit = parse_tarfile_list(\
248                config.get("experiment_control", "fedkit"))
249        self.gatewaykit = parse_tarfile_list(\
250                config.get("experiment_control", "gatewaykit"))
251        accessdb_file = config.get("experiment_control", "accessdb")
252
253        self.ssh_pubkey_file = config.get("experiment_control", 
254                "ssh_pubkey_file")
255        self.ssh_privkey_file = config.get("experiment_control",
256                "ssh_privkey_file")
257        # NB for internal master/slave ops, not experiment setup
258        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
259
260        self.overrides = set([])
261        ovr = config.get('experiment_control', 'overrides')
262        if ovr:
263            for o in ovr.split(","):
264                o = o.strip()
265                if o.startswith('fedid:'): o = o[len('fedid:'):]
266                self.overrides.add(fedid(hexstr=o))
267
268        self.state = { }
269        self.state_lock = Lock()
270        self.tclsh = "/usr/local/bin/otclsh"
271        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
272                config.get("experiment_control", "tcl_splitter",
273                        "/usr/testbed/lib/ns2ir/parse.tcl")
274        mapdb_file = config.get("experiment_control", "mapdb")
275        self.trace_file = sys.stderr
276
277        self.def_expstart = \
278                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
279                "/tmp/federate";
280        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
281                "FEDDIR/hosts";
282        self.def_gwstart = \
283                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
284                "/tmp/bridge.log";
285        self.def_mgwstart = \
286                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
287                "/tmp/bridge.log";
288        self.def_gwimage = "FBSD61-TUNNEL2";
289        self.def_gwtype = "pc";
290        self.local_access = { }
291
292        if auth:
293            self.auth = auth
294        else:
295            self.log.error(\
296                    "[access]: No authorizer initialized, creating local one.")
297            auth = authorizer()
298
299
300        if self.ssh_pubkey_file:
301            try:
302                f = open(self.ssh_pubkey_file, 'r')
303                self.ssh_pubkey = f.read()
304                f.close()
305            except IOError:
306                raise service_error(service_error.internal,
307                        "Cannot read sshpubkey")
308        else:
309            raise service_error(service_error.internal, 
310                    "No SSH public key file?")
311
312        if not self.ssh_privkey_file:
313            raise service_error(service_error.internal, 
314                    "No SSH public key file?")
315
316
317        if mapdb_file:
318            self.read_mapdb(mapdb_file)
319        else:
320            self.log.warn("[experiment_control] No testbed map, using defaults")
321            self.tbmap = { 
322                    'deter':'https://users.isi.deterlab.net:23235',
323                    'emulab':'https://users.isi.deterlab.net:23236',
324                    'ucb':'https://users.isi.deterlab.net:23237',
325                    }
326
327        if accessdb_file:
328                self.read_accessdb(accessdb_file)
329        else:
330            raise service_error(service_error.internal,
331                    "No accessdb specified in config")
332
333        # Grab saved state.  OK to do this w/o locking because it's read only
334        # and only one thread should be in existence that can see self.state at
335        # this point.
336        if self.state_filename:
337            self.read_state()
338
339        if self.store_filename:
340            self.read_store()
341        else:
342            self.log.warning("No saved synch store")
343            self.synch_store = synch_store
344
345        # Dispatch tables
346        self.soap_services = {\
347                'New': soap_handler('New', self.new_experiment),
348                'Create': soap_handler('Create', self.create_experiment),
349                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
350                'Vis': soap_handler('Vis', self.get_vis),
351                'Info': soap_handler('Info', self.get_info),
352                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
353                'Terminate': soap_handler('Terminate', 
354                    self.terminate_experiment),
355                'GetValue': soap_handler('GetValue', self.GetValue),
356                'SetValue': soap_handler('SetValue', self.SetValue),
357        }
358
359        self.xmlrpc_services = {\
360                'New': xmlrpc_handler('New', self.new_experiment),
361                'Create': xmlrpc_handler('Create', self.create_experiment),
362                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
363                'Vis': xmlrpc_handler('Vis', self.get_vis),
364                'Info': xmlrpc_handler('Info', self.get_info),
365                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
366                'Terminate': xmlrpc_handler('Terminate',
367                    self.terminate_experiment),
368                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
369                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
370        }
371
372    # Call while holding self.state_lock
373    def write_state(self):
374        """
375        Write a new copy of experiment state after copying the existing state
376        to a backup.
377
378        State format is a simple pickling of the state dictionary.
379        """
380        if os.access(self.state_filename, os.W_OK):
381            copy_file(self.state_filename, \
382                    "%s.bak" % self.state_filename)
383        try:
384            f = open(self.state_filename, 'w')
385            pickle.dump(self.state, f)
386        except IOError, e:
387            self.log.error("Can't write file %s: %s" % \
388                    (self.state_filename, e))
389        except pickle.PicklingError, e:
390            self.log.error("Pickling problem: %s" % e)
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    @staticmethod
395    def get_alloc_ids(state):
396        """
397        Pull the fedids of the identifiers of each allocation from the
398        state.  Again, a dict dive that's best isolated.
399
400        Used by read_store and read state
401        """
402
403        return [ f['allocID']['fedid'] 
404                for f in state.get('federant',[]) \
405                    if f.has_key('allocID') and \
406                        f['allocID'].has_key('fedid')]
407
408    # Call while holding self.state_lock
409    def read_state(self):
410        """
411        Read a new copy of experiment state.  Old state is overwritten.
412
413        State format is a simple pickling of the state dictionary.
414        """
415       
416        def get_experiment_id(state):
417            """
418            Pull the fedid experimentID out of the saved state.  This is kind
419            of a gross walk through the dict.
420            """
421
422            if state.has_key('experimentID'):
423                for e in state['experimentID']:
424                    if e.has_key('fedid'):
425                        return e['fedid']
426                else:
427                    return None
428            else:
429                return None
430
431        try:
432            f = open(self.state_filename, "r")
433            self.state = pickle.load(f)
434            self.log.debug("[read_state]: Read state from %s" % \
435                    self.state_filename)
436        except IOError, e:
437            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
438                    % (self.state_filename, e))
439        except pickle.UnpicklingError, e:
440            self.log.warning(("[read_state]: No saved state: " + \
441                    "Unpickling failed: %s") % e)
442       
443        for s in self.state.values():
444            try:
445
446                eid = get_experiment_id(s)
447                if eid : 
448                    # Give the owner rights to the experiment
449                    self.auth.set_attribute(s['owner'], eid)
450                    # And holders of the eid as well
451                    self.auth.set_attribute(eid, eid)
452                    # allow overrides to control experiments as well
453                    for o in self.overrides:
454                        self.auth.set_attribute(o, eid)
455                    # Set permissions to allow reading of the software repo, if
456                    # any, as well.
457                    for a in self.get_alloc_ids(s):
458                        self.auth.set_attribute(a, 'repo/%s' % eid)
459                else:
460                    raise KeyError("No experiment id")
461            except KeyError, e:
462                self.log.warning("[read_state]: State ownership or identity " +\
463                        "misformatted in %s: %s" % (self.state_filename, e))
464
465
466    def read_accessdb(self, accessdb_file):
467        """
468        Read the mapping from fedids that can create experiments to their name
469        in the 3-level access namespace.  All will be asserted from this
470        testbed and can include the local username and porject that will be
471        asserted on their behalf by this fedd.  Each fedid is also added to the
472        authorization system with the "create" attribute.
473        """
474        self.accessdb = {}
475        # These are the regexps for parsing the db
476        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
477        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
478                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
479        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
480                "\s*->\s*(" + name_expr + ")\s*$")
481        lineno = 0
482
483        # Parse the mappings and store in self.authdb, a dict of
484        # fedid -> (proj, user)
485        try:
486            f = open(accessdb_file, "r")
487            for line in f:
488                lineno += 1
489                line = line.strip()
490                if len(line) == 0 or line.startswith('#'):
491                    continue
492                m = project_line.match(line)
493                if m:
494                    fid = fedid(hexstr=m.group(1))
495                    project, user = m.group(2,3)
496                    if not self.accessdb.has_key(fid):
497                        self.accessdb[fid] = []
498                    self.accessdb[fid].append((project, user))
499                    continue
500
501                m = user_line.match(line)
502                if m:
503                    fid = fedid(hexstr=m.group(1))
504                    project = None
505                    user = m.group(2)
506                    if not self.accessdb.has_key(fid):
507                        self.accessdb[fid] = []
508                    self.accessdb[fid].append((project, user))
509                    continue
510                self.log.warn("[experiment_control] Error parsing access " +\
511                        "db %s at line %d" %  (accessdb_file, lineno))
512        except IOError:
513            raise service_error(service_error.internal,
514                    "Error opening/reading %s as experiment " +\
515                            "control accessdb" %  accessdb_file)
516        f.close()
517
518        # Initialize the authorization attributes
519        for fid in self.accessdb.keys():
520            self.auth.set_attribute(fid, 'create')
521            self.auth.set_attribute(fid, 'new')
522
523    def read_mapdb(self, file):
524        """
525        Read a simple colon separated list of mappings for the
526        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
527        """
528
529        self.tbmap = { }
530        lineno =0
531        try:
532            f = open(file, "r")
533            for line in f:
534                lineno += 1
535                line = line.strip()
536                if line.startswith('#') or len(line) == 0:
537                    continue
538                try:
539                    label, url = line.split(':', 1)
540                    self.tbmap[label] = url
541                except ValueError, e:
542                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
543                            "map db: %s %s" % (lineno, line, e))
544        except IOError, e:
545            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
546                    "open %s: %s" % (file, e))
547        f.close()
548
549    def read_store(self):
550        try:
551            self.synch_store = synch_store()
552            self.synch_store.load(self.store_filename)
553            self.log.debug("[read_store]: Read store from %s" % \
554                    self.store_filename)
555        except IOError, e:
556            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
557                    % (self.state_filename, e))
558            self.synch_store = synch_store()
559
560        # Set the initial permissions on data in the store.  XXX: This ad hoc
561        # authorization attribute initialization is getting out of hand.
562        for k in self.synch_store.all_keys():
563            try:
564                if k.startswith('fedid:'):
565                    fid = fedid(hexstr=k[6:46])
566                    if self.state.has_key(fid):
567                        for a in self.get_alloc_ids(self.state[fid]):
568                            self.auth.set_attribute(a, k)
569            except ValueError, e:
570                self.log.warn("Cannot deduce permissions for %s" % k)
571
572
573    def write_store(self):
574        """
575        Write a new copy of synch_store after writing current state
576        to a backup.  We use the internal synch_store pickle method to avoid
577        incinsistent data.
578
579        State format is a simple pickling of the store.
580        """
581        if os.access(self.store_filename, os.W_OK):
582            copy_file(self.store_filename, \
583                    "%s.bak" % self.store_filename)
584        try:
585            self.synch_store.save(self.store_filename)
586        except IOError, e:
587            self.log.error("Can't write file %s: %s" % \
588                    (self.store_filename, e))
589        except TypeError, e:
590            self.log.error("Pickling problem (TypeError): %s" % e)
591
592       
593    def generate_ssh_keys(self, dest, type="rsa" ):
594        """
595        Generate a set of keys for the gateways to use to talk.
596
597        Keys are of type type and are stored in the required dest file.
598        """
599        valid_types = ("rsa", "dsa")
600        t = type.lower();
601        if t not in valid_types: raise ValueError
602        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
603
604        try:
605            trace = open("/dev/null", "w")
606        except IOError:
607            raise service_error(service_error.internal,
608                    "Cannot open /dev/null??");
609
610        # May raise CalledProcessError
611        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
612        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
613        if rv != 0:
614            raise service_error(service_error.internal, 
615                    "Cannot generate nonce ssh keys.  %s return code %d" \
616                            % (self.ssh_keygen, rv))
617
618    def gentopo(self, str):
619        """
620        Generate the topology dtat structure from the splitter's XML
621        representation of it.
622
623        The topology XML looks like:
624            <experiment>
625                <nodes>
626                    <node><vname></vname><ips>ip1:ip2</ips></node>
627                </nodes>
628                <lans>
629                    <lan>
630                        <vname></vname><vnode></vnode><ip></ip>
631                        <bandwidth></bandwidth><member>node:port</member>
632                    </lan>
633                </lans>
634        """
635        class topo_parse:
636            """
637            Parse the topology XML and create the dats structure.
638            """
639            def __init__(self):
640                # Typing of the subelements for data conversion
641                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
642                self.int_subelements = ( 'bandwidth',)
643                self.float_subelements = ( 'delay',)
644                # The final data structure
645                self.nodes = [ ]
646                self.lans =  [ ]
647                self.topo = { \
648                        'node': self.nodes,\
649                        'lan' : self.lans,\
650                    }
651                self.element = { }  # Current element being created
652                self.chars = ""     # Last text seen
653
654            def end_element(self, name):
655                # After each sub element the contents is added to the current
656                # element or to the appropriate list.
657                if name == 'node':
658                    self.nodes.append(self.element)
659                    self.element = { }
660                elif name == 'lan':
661                    self.lans.append(self.element)
662                    self.element = { }
663                elif name in self.str_subelements:
664                    self.element[name] = self.chars
665                    self.chars = ""
666                elif name in self.int_subelements:
667                    self.element[name] = int(self.chars)
668                    self.chars = ""
669                elif name in self.float_subelements:
670                    self.element[name] = float(self.chars)
671                    self.chars = ""
672
673            def found_chars(self, data):
674                self.chars += data.rstrip()
675
676
677        tp = topo_parse();
678        parser = xml.parsers.expat.ParserCreate()
679        parser.EndElementHandler = tp.end_element
680        parser.CharacterDataHandler = tp.found_chars
681
682        parser.Parse(str)
683
684        return tp.topo
685       
686
687    def genviz(self, topo):
688        """
689        Generate the visualization the virtual topology
690        """
691
692        neato = "/usr/local/bin/neato"
693        # These are used to parse neato output and to create the visualization
694        # file.
695        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
696        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
697                "%s</type></node>"
698
699        try:
700            # Node names
701            nodes = [ n['vname'] for n in topo['node'] ]
702            topo_lans = topo['lan']
703        except KeyError, e:
704            raise service_error(service_error.internal, "Bad topology: %s" %e)
705
706        lans = { }
707        links = { }
708
709        # Walk through the virtual topology, organizing the connections into
710        # 2-node connections (links) and more-than-2-node connections (lans).
711        # When a lan is created, it's added to the list of nodes (there's a
712        # node in the visualization for the lan).
713        for l in topo_lans:
714            if links.has_key(l['vname']):
715                if len(links[l['vname']]) < 2:
716                    links[l['vname']].append(l['vnode'])
717                else:
718                    nodes.append(l['vname'])
719                    lans[l['vname']] = links[l['vname']]
720                    del links[l['vname']]
721                    lans[l['vname']].append(l['vnode'])
722            elif lans.has_key(l['vname']):
723                lans[l['vname']].append(l['vnode'])
724            else:
725                links[l['vname']] = [ l['vnode'] ]
726
727
728        # Open up a temporary file for dot to turn into a visualization
729        try:
730            df, dotname = tempfile.mkstemp()
731            dotfile = os.fdopen(df, 'w')
732        except IOError:
733            raise service_error(service_error.internal,
734                    "Failed to open file in genviz")
735
736        try:
737            dnull = open('/dev/null', 'w')
738        except IOError:
739            service_error(service_error.internal,
740                    "Failed to open /dev/null in genviz")
741
742        # Generate a dot/neato input file from the links, nodes and lans
743        try:
744            print >>dotfile, "graph G {"
745            for n in nodes:
746                print >>dotfile, '\t"%s"' % n
747            for l in links.keys():
748                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
749            for l in lans.keys():
750                for n in lans[l]:
751                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
752            print >>dotfile, "}"
753            dotfile.close()
754        except TypeError:
755            raise service_error(service_error.internal,
756                    "Single endpoint link in vtopo")
757        except IOError:
758            raise service_error(service_error.internal, "Cannot write dot file")
759
760        # Use dot to create a visualization
761        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
762                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
763                close_fds=True)
764        dnull.close()
765
766        # Translate dot to vis format
767        vis_nodes = [ ]
768        vis = { 'node': vis_nodes }
769        for line in dot.stdout:
770            m = vis_re.match(line)
771            if m:
772                vn = m.group(1)
773                vis_node = {'name': vn, \
774                        'x': float(m.group(2)),\
775                        'y' : float(m.group(3)),\
776                    }
777                if vn in links.keys() or vn in lans.keys():
778                    vis_node['type'] = 'lan'
779                else:
780                    vis_node['type'] = 'node'
781                vis_nodes.append(vis_node)
782        rv = dot.wait()
783
784        os.remove(dotname)
785        if rv == 0 : return vis
786        else: return None
787
788    def get_access(self, tb, nodes, tbparam, master, export_project,
789            access_user, services):
790        """
791        Get access to testbed through fedd and set the parameters for that tb
792        """
793        uri = self.tbmap.get(tb, None)
794        if not uri:
795            raise service_error(service_error.server_config, 
796                    "Unknown testbed: %s" % tb)
797
798        # Tweak search order so that if there are entries in access_user that
799        # have a project matching the export project, we try them first
800        if export_project: 
801            access_sequence = [ (p, u) for p, u in access_user \
802                    if p == export_project] 
803            access_sequence.extend([(p, u) for p, u in access_user \
804                    if p != export_project]) 
805        else: 
806            access_sequence = access_user
807
808        for p, u in access_sequence: 
809            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
810                    "to %s") %  ((p or "None"), u, uri))
811
812            if p:
813                # Request with user and project specified
814                req = {\
815                        'destinationTestbed' : { 'uri' : uri },
816                        'credential': [ "project: %s" % p, "user: %s"  % u],
817                        'allocID' : { 'localname': 'test' },
818                    }
819            else:
820                # Request with only user specified
821                req = {\
822                        'destinationTestbed' : { 'uri' : uri },
823                        'credential': [ 'user: %s' % u ],
824                        'user':  [ {'userID': { 'localname': u } } ],
825                        'allocID' : { 'localname': 'test' },
826                    }
827
828            # If there is a master, and this is it, ask it to export services
829            # XXX move this to export pseudo-service
830            if tb == master:
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r:
889            for s in r['service']:
890                # Tag each service with the origin testbed
891                if s.has_key('fedAttr'):
892                    # The else connects with the for
893                    for a in s['fedAttr']:
894                        if a.get('attribute', None) == 'testbed':
895                            break
896                    else:
897                        s['fedAttr'].append({'attribute': 'testbed',
898                            'value': tb})
899                else:
900                    s['fedAttr'] = [ {'attribute': 'testbed', 'value': tb} ]
901            services.extend(r['service'])
902
903        # Add attributes to parameter space.  We don't allow attributes to
904        # overlay any parameters already installed.
905        for a in r.get('fedAttr', []):
906            try:
907                if a['attribute'] and \
908                        isinstance(a['attribute'], basestring)\
909                        and not tbparam[tb].has_key(a['attribute'].lower()):
910                    tbparam[tb][a['attribute'].lower()] = a['value']
911            except KeyError:
912                self.log.error("Bad attribute in response: %s" % a)
913
914    def release_access(self, tb, aid, uri=None):
915        """
916        Release access to testbed through fedd
917        """
918
919        if not uri:
920            uri = self.tbmap.get(tb, None)
921        if not uri:
922            raise service_error(service_error.server_config, 
923                    "Unknown testbed: %s" % tb)
924
925        if self.local_access.has_key(uri):
926            resp = self.local_access[uri].ReleaseAccess(\
927                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
928                    fedid(file=self.cert_file))
929            resp = { 'ReleaseAccessResponseBody': resp } 
930        else:
931            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
932                    self.cert_file, self.cert_pwd, self.trusted_certs)
933
934        # better error coding
935
936    def remote_ns2topdl(self, uri, desc):
937
938        req = {
939                'description' : { 'ns2description': desc },
940            }
941
942        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
943                self.trusted_certs)
944
945        if r.has_key('Ns2TopdlResponseBody'):
946            r = r['Ns2TopdlResponseBody']
947            ed = r.get('experimentdescription', None)
948            if ed.has_key('topdldescription'):
949                return topdl.Topology(**ed['topdldescription'])
950            else:
951                raise service_error(service_error.protocol, 
952                        "Bad splitter response (no output)")
953        else:
954            raise service_error(service_error.protocol, "Bad splitter response")
955
956    class start_segment:
957        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
958                cert_pwd=None, trusted_certs=None, caller=None,
959                log_collector=None):
960            self.log = log
961            self.debug = debug
962            self.cert_file = cert_file
963            self.cert_pwd = cert_pwd
964            self.trusted_certs = None
965            self.caller = caller
966            self.testbed = testbed
967            self.log_collector = log_collector
968            self.response = None
969
970        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
971                services=None):
972            req = {
973                    'allocID': { 'fedid' : aid }, 
974                    'segmentdescription': { 
975                        'topdldescription': topo.to_dict(),
976                    },
977                    'master': master,
978                }
979
980            if connInfo:
981                req['connection'] = connInfo
982            # Add services to request.  The master exports, everyone else
983            # imports.
984            if services:
985                svcs = [ x.copy() for x in services]
986                for s in svcs:
987                    if master: s['visibility'] = 'export'
988                    else: s['visibility'] = 'import'
989                req['service'] = svcs
990            if attrs:
991                req['fedAttr'] = attrs
992
993            try:
994                self.log.debug("Calling StartSegment at %s " % uri)
995                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
996                        self.trusted_certs)
997                if r.has_key('StartSegmentResponseBody'):
998                    lval = r['StartSegmentResponseBody'].get('allocationLog',
999                            None)
1000                    if lval and self.log_collector:
1001                        for line in  lval.splitlines(True):
1002                            self.log_collector.write(line)
1003                    self.response = r
1004                else:
1005                    raise service_error(service_error.internal, 
1006                            "Bad response!?: %s" %r)
1007                return True
1008            except service_error, e:
1009                self.log.error("Start segment failed on %s: %s" % \
1010                        (self.testbed, e))
1011                return False
1012
1013
1014
1015    class terminate_segment:
1016        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1017                cert_pwd=None, trusted_certs=None, caller=None):
1018            self.log = log
1019            self.debug = debug
1020            self.cert_file = cert_file
1021            self.cert_pwd = cert_pwd
1022            self.trusted_certs = None
1023            self.caller = caller
1024            self.testbed = testbed
1025
1026        def __call__(self, uri, aid ):
1027            req = {
1028                    'allocID': aid , 
1029                }
1030            try:
1031                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1032                        self.trusted_certs)
1033                return True
1034            except service_error, e:
1035                self.log.error("Terminate segment failed on %s: %s" % \
1036                        (self.testbed, e))
1037                return False
1038   
1039
1040    def allocate_resources(self, allocated, master, eid, expid, 
1041            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1042            attrs=None, connInfo={}, services=[]):
1043
1044        started = { }           # Testbeds where a sub-experiment started
1045                                # successfully
1046
1047        # XXX
1048        fail_soft = False
1049
1050        log = alloc_log or self.log
1051
1052        thread_pool = self.thread_pool(self.nthreads)
1053        threads = [ ]
1054
1055        for tb in allocated.keys():
1056            # Create and start a thread to start the segment, and save it
1057            # to get the return value later
1058            thread_pool.wait_for_slot()
1059            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1060            if not uri:
1061                raise service_error(service_error.internal, 
1062                        "Unknown testbed %s !?" % tb)
1063
1064            if tbparams[tb].has_key('allocID') and \
1065                    tbparams[tb]['allocID'].has_key('fedid'):
1066                aid = tbparams[tb]['allocID']['fedid']
1067            else:
1068                raise service_error(service_error.internal, 
1069                        "No alloc id for testbed %s !?" % tb)
1070
1071            t  = self.pooled_thread(\
1072                    target=self.start_segment(log=log, debug=self.debug,
1073                        testbed=tb, cert_file=self.cert_file,
1074                        cert_pwd=self.cert_pwd,
1075                        trusted_certs=self.trusted_certs,
1076                        caller=self.call_StartSegment,
1077                        log_collector=log_collector), 
1078                    args=(uri, aid, topo[tb], tb == master, 
1079                        attrs, connInfo[tb], services),
1080                    name=tb,
1081                    pdata=thread_pool, trace_file=self.trace_file)
1082            threads.append(t)
1083            t.start()
1084
1085        # Wait until all finish (keep pinging the log, though)
1086        mins = 0
1087        revoked = False
1088        while not thread_pool.wait_for_all_done(60.0):
1089            mins += 1
1090            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1091                    % mins)
1092            if not revoked and \
1093                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1094                # a testbed has failed.  Revoke this experiment's
1095                # synchronizarion values so that sub experiments will not
1096                # deadlock waiting for synchronization that will never happen
1097                self.log.info("A subexperiment has failed to swap in, " + \
1098                        "revoking synch keys")
1099                var_key = "fedid:%s" % expid
1100                for k in self.synch_store.all_keys():
1101                    if len(k) > 45 and k[0:46] == var_key:
1102                        self.synch_store.revoke_key(k)
1103                revoked = True
1104
1105        failed = [ t.getName() for t in threads if not t.rv ]
1106        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1107
1108        # If one failed clean up, unless fail_soft is set
1109        if failed:
1110            if not fail_soft:
1111                thread_pool.clear()
1112                for tb in succeeded:
1113                    # Create and start a thread to stop the segment
1114                    thread_pool.wait_for_slot()
1115                    uri = tbparams[tb]['uri']
1116                    t  = self.pooled_thread(\
1117                            target=self.terminate_segment(log=log,
1118                                testbed=tb,
1119                                cert_file=self.cert_file, 
1120                                cert_pwd=self.cert_pwd,
1121                                trusted_certs=self.trusted_certs,
1122                                caller=self.call_TerminateSegment),
1123                            args=(uri, tbparams[tb]['federant']['allocID']),
1124                            name=tb,
1125                            pdata=thread_pool, trace_file=self.trace_file)
1126                    t.start()
1127                # Wait until all finish (if any are being stopped)
1128                if succeeded:
1129                    thread_pool.wait_for_all_done()
1130
1131                # release the allocations
1132                for tb in tbparams.keys():
1133                    self.release_access(tb, tbparams[tb]['allocID'],
1134                            tbparams[tb].get('uri', None))
1135                # Remove the placeholder
1136                self.state_lock.acquire()
1137                self.state[eid]['experimentStatus'] = 'failed'
1138                if self.state_filename: self.write_state()
1139                self.state_lock.release()
1140
1141                log.error("Swap in failed on %s" % ",".join(failed))
1142                return
1143        else:
1144            log.info("[start_segment]: Experiment %s active" % eid)
1145
1146
1147        # Walk up tmpdir, deleting as we go
1148        if self.cleanup:
1149            log.debug("[start_experiment]: removing %s" % tmpdir)
1150            for path, dirs, files in os.walk(tmpdir, topdown=False):
1151                for f in files:
1152                    os.remove(os.path.join(path, f))
1153                for d in dirs:
1154                    os.rmdir(os.path.join(path, d))
1155            os.rmdir(tmpdir)
1156        else:
1157            log.debug("[start_experiment]: not removing %s" % tmpdir)
1158
1159        # Insert the experiment into our state and update the disk copy
1160        self.state_lock.acquire()
1161        self.state[expid]['experimentStatus'] = 'active'
1162        self.state[eid] = self.state[expid]
1163        if self.state_filename: self.write_state()
1164        self.state_lock.release()
1165        return
1166
1167
1168    def add_kit(self, e, kit):
1169        """
1170        Add a Software object created from the list of (install, location)
1171        tuples passed as kit  to the software attribute of an object e.  We
1172        do this enough to break out the code, but it's kind of a hack to
1173        avoid changing the old tuple rep.
1174        """
1175
1176        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1177
1178        if isinstance(e.software, list): e.software.extend(s)
1179        else: e.software = s
1180
1181
1182    def create_experiment_state(self, fid, req, expid, expcert, 
1183            state='starting'):
1184        """
1185        Create the initial entry in the experiment's state.  The expid and
1186        expcert are the experiment's fedid and certifacte that represents that
1187        ID, which are installed in the experiment state.  If the request
1188        includes a suggested local name that is used if possible.  If the local
1189        name is already taken by an experiment owned by this user that has
1190        failed, it is overwritten.  Otherwise new letters are added until a
1191        valid localname is found.  The generated local name is returned.
1192        """
1193
1194        if req.has_key('experimentID') and \
1195                req['experimentID'].has_key('localname'):
1196            overwrite = False
1197            eid = req['experimentID']['localname']
1198            # If there's an old failed experiment here with the same local name
1199            # and accessible by this user, we'll overwrite it, otherwise we'll
1200            # fall through and do the collision avoidance.
1201            old_expid = self.get_experiment_fedid(eid)
1202            if old_expid and self.check_experiment_access(fid, old_expid):
1203                self.state_lock.acquire()
1204                status = self.state[eid].get('experimentStatus', None)
1205                if status and status == 'failed':
1206                    # remove the old access attribute
1207                    self.auth.unset_attribute(fid, old_expid)
1208                    overwrite = True
1209                    del self.state[eid]
1210                    del self.state[old_expid]
1211                self.state_lock.release()
1212            self.state_lock.acquire()
1213            while (self.state.has_key(eid) and not overwrite):
1214                eid += random.choice(string.ascii_letters)
1215            # Initial state
1216            self.state[eid] = {
1217                    'experimentID' : \
1218                            [ { 'localname' : eid }, {'fedid': expid } ],
1219                    'experimentStatus': state,
1220                    'experimentAccess': { 'X509' : expcert },
1221                    'owner': fid,
1222                    'log' : [],
1223                }
1224            self.state[expid] = self.state[eid]
1225            if self.state_filename: self.write_state()
1226            self.state_lock.release()
1227        else:
1228            eid = self.exp_stem
1229            for i in range(0,5):
1230                eid += random.choice(string.ascii_letters)
1231            self.state_lock.acquire()
1232            while (self.state.has_key(eid)):
1233                eid = self.exp_stem
1234                for i in range(0,5):
1235                    eid += random.choice(string.ascii_letters)
1236            # Initial state
1237            self.state[eid] = {
1238                    'experimentID' : \
1239                            [ { 'localname' : eid }, {'fedid': expid } ],
1240                    'experimentStatus': state,
1241                    'experimentAccess': { 'X509' : expcert },
1242                    'owner': fid,
1243                    'log' : [],
1244                }
1245            self.state[expid] = self.state[eid]
1246            if self.state_filename: self.write_state()
1247            self.state_lock.release()
1248
1249        return eid
1250
1251
1252    def allocate_ips_to_topo(self, top):
1253        """
1254        Add an ip4_address attribute to all the hosts in the topology, based on
1255        the shared substrates on which they sit.  An /etc/hosts file is also
1256        created and returned as a list of hostfiles entries.  We also return
1257        the allocator, because we may need to allocate IPs to portals
1258        (specifically DRAGON portals).
1259        """
1260        subs = sorted(top.substrates, 
1261                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1262                reverse=True)
1263        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1264        ifs = { }
1265        hosts = [ ]
1266
1267        for idx, s in enumerate(subs):
1268            net_size = len(s.interfaces)+2
1269
1270            a = ips.allocate(net_size)
1271            if a :
1272                base, num = a
1273                if num < net_size: 
1274                    raise service_error(service_error.internal,
1275                            "Allocator returned wrong number of IPs??")
1276            else:
1277                raise service_error(service_error.req, 
1278                        "Cannot allocate IP addresses")
1279            mask = ips.min_alloc
1280            while mask < net_size:
1281                mask *= 2
1282
1283            netmask = ((2**32-1) ^ (mask-1))
1284
1285            base += 1
1286            for i in s.interfaces:
1287                i.attribute.append(
1288                        topdl.Attribute('ip4_address', 
1289                            "%s" % ip_addr(base)))
1290                i.attribute.append(
1291                        topdl.Attribute('ip4_netmask', 
1292                            "%s" % ip_addr(int(netmask))))
1293
1294                hname = i.element.name[0]
1295                if ifs.has_key(hname):
1296                    hosts.append("%s\t%s-%s %s-%d" % \
1297                            (ip_addr(base), hname, s.name, hname,
1298                                ifs[hname]))
1299                else:
1300                    ifs[hname] = 0
1301                    hosts.append("%s\t%s-%s %s-%d %s" % \
1302                            (ip_addr(base), hname, s.name, hname,
1303                                ifs[hname], hname))
1304
1305                ifs[hname] += 1
1306                base += 1
1307        return hosts, ips
1308
1309    def get_access_to_testbeds(self, testbeds, access_user, 
1310            export_project, master, allocated, tbparams, services):
1311        """
1312        Request access to the various testbeds required for this instantiation
1313        (passed in as testbeds).  User, access_user, expoert_project and master
1314        are used to construct the correct requests.  Per-testbed parameters are
1315        returned in tbparams.
1316        """
1317        for tb in testbeds:
1318            self.get_access(tb, None, tbparams, master,
1319                    export_project, access_user, services)
1320            allocated[tb] = 1
1321
1322    def split_topology(self, top, topo, testbeds):
1323        """
1324        Create the sub-topologies that are needed for experiment instantiation.
1325        """
1326        for tb in testbeds:
1327            topo[tb] = top.clone()
1328            # copy in for loop allows deletions from the original
1329            for e in [ e for e in topo[tb].elements]:
1330                etb = e.get_attribute('testbed')
1331                # NB: elements without a testbed attribute won't appear in any
1332                # sub topologies. 
1333                if not etb or etb != tb:
1334                    for i in e.interface:
1335                        for s in i.subs:
1336                            try:
1337                                s.interfaces.remove(i)
1338                            except ValueError:
1339                                raise service_error(service_error.internal,
1340                                        "Can't remove interface??")
1341                    topo[tb].elements.remove(e)
1342            topo[tb].make_indices()
1343
1344    def wrangle_software(self, expid, top, topo, tbparams):
1345        """
1346        Copy software out to the repository directory, allocate permissions and
1347        rewrite the segment topologies to look for the software in local
1348        places.
1349        """
1350
1351        # Copy the rpms and tarfiles to a distribution directory from
1352        # which the federants can retrieve them
1353        linkpath = "%s/software" %  expid
1354        softdir ="%s/%s" % ( self.repodir, linkpath)
1355        softmap = { }
1356        # These are in a list of tuples format (each kit).  This comprehension
1357        # unwraps them into a single list of tuples that initilaizes the set of
1358        # tuples.
1359        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1360                for p, t in l ])
1361        pkgs.update([x.location for e in top.elements \
1362                for x in e.software])
1363        try:
1364            os.makedirs(softdir)
1365        except IOError, e:
1366            raise service_error(
1367                    "Cannot create software directory: %s" % e)
1368        # The actual copying.  Everything's converted into a url for copying.
1369        for pkg in pkgs:
1370            loc = pkg
1371
1372            scheme, host, path = urlparse(loc)[0:3]
1373            dest = os.path.basename(path)
1374            if not scheme:
1375                if not loc.startswith('/'):
1376                    loc = "/%s" % loc
1377                loc = "file://%s" %loc
1378            try:
1379                u = urlopen(loc)
1380            except Exception, e:
1381                raise service_error(service_error.req, 
1382                        "Cannot open %s: %s" % (loc, e))
1383            try:
1384                f = open("%s/%s" % (softdir, dest) , "w")
1385                self.log.debug("Writing %s/%s" % (softdir,dest) )
1386                data = u.read(4096)
1387                while data:
1388                    f.write(data)
1389                    data = u.read(4096)
1390                f.close()
1391                u.close()
1392            except Exception, e:
1393                raise service_error(service_error.internal,
1394                        "Could not copy %s: %s" % (loc, e))
1395            path = re.sub("/tmp", "", linkpath)
1396            # XXX
1397            softmap[pkg] = \
1398                    "%s/%s/%s" %\
1399                    ( self.repo_url, path, dest)
1400
1401            # Allow the individual segments to access the software.
1402            for tb in tbparams.keys():
1403                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1404                        "/%s/%s" % ( path, dest))
1405
1406        # Convert the software locations in the segments into the local
1407        # copies on this host
1408        for soft in [ s for tb in topo.values() \
1409                for e in tb.elements \
1410                    if getattr(e, 'software', False) \
1411                        for s in e.software ]:
1412            if softmap.has_key(soft.location):
1413                soft.location = softmap[soft.location]
1414
1415
1416    def new_experiment(self, req, fid):
1417        """
1418        The external interface to empty initial experiment creation called from
1419        the dispatcher.
1420
1421        Creates a working directory, splits the incoming description using the
1422        splitter script and parses out the avrious subsections using the
1423        lcasses above.  Once each sub-experiment is created, use pooled threads
1424        to instantiate them and start it all up.
1425        """
1426        if not self.auth.check_attribute(fid, 'new'):
1427            raise service_error(service_error.access, "New access denied")
1428
1429        try:
1430            tmpdir = tempfile.mkdtemp(prefix="split-")
1431        except IOError:
1432            raise service_error(service_error.internal, "Cannot create tmp dir")
1433
1434        try:
1435            access_user = self.accessdb[fid]
1436        except KeyError:
1437            raise service_error(service_error.internal,
1438                    "Access map and authorizer out of sync in " + \
1439                            "new_experiment for fedid %s"  % fid)
1440
1441        pid = "dummy"
1442        gid = "dummy"
1443
1444        req = req.get('NewRequestBody', None)
1445        if not req:
1446            raise service_error(service_error.req,
1447                    "Bad request format (no NewRequestBody)")
1448
1449        # Generate an ID for the experiment (slice) and a certificate that the
1450        # allocator can use to prove they own it.  We'll ship it back through
1451        # the encrypted connection.
1452        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1453
1454        #now we're done with the tmpdir, and it should be empty
1455        if self.cleanup:
1456            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1457            os.rmdir(tmpdir)
1458        else:
1459            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1460
1461        eid = self.create_experiment_state(fid, req, expid, expcert, 
1462                state='empty')
1463
1464        # Let users touch the state
1465        self.auth.set_attribute(fid, expid)
1466        self.auth.set_attribute(expid, expid)
1467        # Override fedids can manipulate state as well
1468        for o in self.overrides:
1469            self.auth.set_attribute(o, expid)
1470
1471        rv = {
1472                'experimentID': [
1473                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1474                ],
1475                'experimentStatus': 'empty',
1476                'experimentAccess': { 'X509' : expcert }
1477            }
1478
1479        return rv
1480
1481    def get_master_project(self, req):
1482        master= None
1483        export_project = None
1484        for e in [ s for s in req.get('service', []) \
1485                if s.get('name') == 'project_export']:
1486            for a in e.get('fedAttr', []):
1487                attr = a.get('attribute', None)
1488                if attr == 'testbed':
1489                    master = a.get('value', None)
1490                elif attr == 'project':
1491                    export_project = a.get('value', None)
1492       
1493        return (master, export_project)
1494
1495
1496
1497    def create_experiment(self, req, fid):
1498        """
1499        The external interface to experiment creation called from the
1500        dispatcher.
1501
1502        Creates a working directory, splits the incoming description using the
1503        splitter script and parses out the avrious subsections using the
1504        lcasses above.  Once each sub-experiment is created, use pooled threads
1505        to instantiate them and start it all up.
1506        """
1507
1508        req = req.get('CreateRequestBody', None)
1509        if not req:
1510            raise service_error(service_error.req,
1511                    "Bad request format (no CreateRequestBody)")
1512
1513        # Get the experiment access
1514        exp = req.get('experimentID', None)
1515        if exp:
1516            if exp.has_key('fedid'):
1517                key = exp['fedid']
1518                expid = key
1519                eid = None
1520            elif exp.has_key('localname'):
1521                key = exp['localname']
1522                eid = key
1523                expid = None
1524            else:
1525                raise service_error(service_error.req, "Unknown lookup type")
1526        else:
1527            raise service_error(service_error.req, "No request?")
1528
1529        self.check_experiment_access(fid, key)
1530
1531        try:
1532            tmpdir = tempfile.mkdtemp(prefix="split-")
1533            os.mkdir(tmpdir+"/keys")
1534        except IOError:
1535            raise service_error(service_error.internal, "Cannot create tmp dir")
1536
1537        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1538        gw_secretkey_base = "fed.%s" % self.ssh_type
1539        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1540        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1541        tclfile = tmpdir + "/experiment.tcl"
1542        tbparams = { }
1543        try:
1544            access_user = self.accessdb[fid]
1545        except KeyError:
1546            raise service_error(service_error.internal,
1547                    "Access map and authorizer out of sync in " + \
1548                            "create_experiment for fedid %s"  % fid)
1549
1550        pid = "dummy"
1551        gid = "dummy"
1552
1553        # The tcl parser needs to read a file so put the content into that file
1554        descr=req.get('experimentdescription', None)
1555        if descr:
1556            file_content=descr.get('ns2description', None)
1557            if file_content:
1558                try:
1559                    f = open(tclfile, 'w')
1560                    f.write(file_content)
1561                    f.close()
1562                except IOError:
1563                    raise service_error(service_error.internal,
1564                            "Cannot write temp experiment description")
1565            else:
1566                raise service_error(service_error.req, 
1567                        "Only ns2descriptions supported")
1568        else:
1569            raise service_error(service_error.req, "No experiment description")
1570
1571        self.state_lock.acquire()
1572        if self.state.has_key(key):
1573            self.state[key]['experimentStatus'] = "starting"
1574            for e in self.state[key].get('experimentID',[]):
1575                if not expid and e.has_key('fedid'):
1576                    expid = e['fedid']
1577                elif not eid and e.has_key('localname'):
1578                    eid = e['localname']
1579        self.state_lock.release()
1580
1581        if not (eid and expid):
1582            raise service_error(service_error.internal, 
1583                    "Cannot find local experiment info!?")
1584
1585        try: 
1586            # This catches exceptions to clear the placeholder if necessary
1587            try:
1588                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1589            except ValueError:
1590                raise service_error(service_error.server_config, 
1591                        "Bad key type (%s)" % self.ssh_type)
1592            master, export_project = self.get_master_project(req)
1593            # XXX get these out when master and project are optional
1594            if not master:
1595                raise service_error(service_error.req,
1596                        "No master testbed label")
1597            if not export_project:
1598                raise service_error(service_error.req, "No export project")
1599            # XXX
1600
1601            # Translate to topdl
1602            if self.splitter_url:
1603                # XXX: need remote topdl translator
1604                self.log.debug("Calling remote splitter at %s" % \
1605                        self.splitter_url)
1606                top = self.remote_ns2topdl(self.splitter_url, file_content)
1607            else:
1608                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1609                    str(self.muxmax), '-m', master]
1610
1611                tclcmd.extend([pid, gid, eid, tclfile])
1612
1613                self.log.debug("running local splitter %s", " ".join(tclcmd))
1614                # This is just fantastic.  As a side effect the parser copies
1615                # tb_compat.tcl into the current directory, so that directory
1616                # must be writable by the fedd user.  Doing this in the
1617                # temporary subdir ensures this is the case.
1618                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1619                        cwd=tmpdir)
1620                split_data = tclparser.stdout
1621
1622                top = topdl.topology_from_xml(file=split_data, top="experiment")
1623
1624            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1625             # Find the testbeds to look up
1626            testbeds = set([ a.value for e in top.elements \
1627                    for a in e.attribute \
1628                        if a.attribute == 'testbed'])
1629
1630            allocated = { }         # Testbeds we can access
1631            topo ={ }               # Sub topologies
1632            connInfo = { }          # Connection information
1633            services = [ ]
1634            masters = { }           # testbeds exporting services
1635            self.get_access_to_testbeds(testbeds, access_user, 
1636                    export_project, master, allocated, tbparams, services)
1637
1638            # After this masters will hold a set of services exported by each
1639            # testbed
1640            for s in services:
1641                i = s.get('visibility', 'import')
1642                if i == 'export':
1643                    for a in s.get('fedAttr', []):
1644                        if a.get('attribute', '') == 'testbed':
1645                            tb = a.get('value', None)
1646                            if tb:
1647                                if masters.has_key(tb): 
1648                                    masters[tb].add(s.get('name',''))
1649                                else:
1650                                    masters[tb] = set([s.get('name','')])
1651                            else:
1652                                log.error('Testbed attribute with no value?')
1653
1654
1655            self.split_topology(top, topo, testbeds)
1656
1657            # Copy configuration files into the remote file store
1658            # The config urlpath
1659            configpath = "/%s/config" % expid
1660            # The config file system location
1661            configdir ="%s%s" % ( self.repodir, configpath)
1662            try:
1663                os.makedirs(configdir)
1664            except IOError, e:
1665                raise service_error(
1666                        "Cannot create config directory: %s" % e)
1667            try:
1668                f = open("%s/hosts" % configdir, "w")
1669                f.write('\n'.join(hosts))
1670                f.close()
1671            except IOError, e:
1672                raise service_error(service_error.internal, 
1673                        "Cannot write hosts file: %s" % e)
1674            try:
1675                copy_file("%s" % gw_pubkey, "%s/%s" % \
1676                        (configdir, gw_pubkey_base))
1677                copy_file("%s" % gw_secretkey, "%s/%s" % \
1678                        (configdir, gw_secretkey_base))
1679            except IOError, e:
1680                raise service_error(service_error.internal, 
1681                        "Cannot copy keyfiles: %s" % e)
1682
1683            # Allow the individual testbeds to access the configuration files.
1684            for tb in tbparams.keys():
1685                asignee = tbparams[tb]['allocID']['fedid']
1686                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1687                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1688
1689            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1690                    self.muxmax)
1691            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1692                    connInfo, expid)
1693            # Now get access to the dynamic testbeds
1694            for k, t in topo.items():
1695                if not t.get_attribute('dynamic'):
1696                    continue
1697                tb = t.get_attribute('testbed')
1698                if tb: 
1699                    self.get_access(tb, None, tbparams, master, 
1700                            export_project, access_user, services)
1701                    tbparams[k] = tbparams[tb]
1702                    del tbparams[tb]
1703                    allocated[k] = 1
1704                    store_keys = t.get_attribute('store_keys')
1705                    # Give the testbed access to keys it exports or imports
1706                    if store_keys:
1707                        for sk in store_keys.split(" "):
1708                            self.auth.set_attribute(\
1709                                    tbparams[k]['allocID']['fedid'], sk)
1710                else:
1711                    raise service_error(service_error.internal, 
1712                            "Dynamic allocation from no testbed!?")
1713
1714            self.wrangle_software(expid, top, topo, tbparams)
1715
1716            vtopo = topdl.topology_to_vtopo(top)
1717            vis = self.genviz(vtopo)
1718
1719            # save federant information
1720            for k in allocated.keys():
1721                tbparams[k]['federant'] = {
1722                        'name': [ { 'localname' : eid} ],
1723                        'allocID' : tbparams[k]['allocID'],
1724                        'master' : k == master,
1725                        'uri': tbparams[k]['uri'],
1726                    }
1727                if tbparams[k].has_key('emulab'):
1728                        tbparams[k]['federant']['emulab'] = \
1729                                tbparams[k]['emulab']
1730
1731            self.state_lock.acquire()
1732            self.state[eid]['vtopo'] = vtopo
1733            self.state[eid]['vis'] = vis
1734            self.state[expid]['federant'] = \
1735                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1736                        if tbparams[tb].has_key('federant') ]
1737            if self.state_filename: 
1738                self.write_state()
1739            self.state_lock.release()
1740        except service_error, e:
1741            # If something goes wrong in the parse (usually an access error)
1742            # clear the placeholder state.  From here on out the code delays
1743            # exceptions.  Failing at this point returns a fault to the remote
1744            # caller.
1745
1746            self.state_lock.acquire()
1747            del self.state[eid]
1748            del self.state[expid]
1749            if self.state_filename: self.write_state()
1750            self.state_lock.release()
1751            raise e
1752
1753
1754        # Start the background swapper and return the starting state.  From
1755        # here on out, the state will stick around a while.
1756
1757        # Let users touch the state
1758        self.auth.set_attribute(fid, expid)
1759        self.auth.set_attribute(expid, expid)
1760        # Override fedids can manipulate state as well
1761        for o in self.overrides:
1762            self.auth.set_attribute(o, expid)
1763
1764        # Create a logger that logs to the experiment's state object as well as
1765        # to the main log file.
1766        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1767        alloc_collector = self.list_log(self.state[eid]['log'])
1768        h = logging.StreamHandler(alloc_collector)
1769        # XXX: there should be a global one of these rather than repeating the
1770        # code.
1771        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1772                    '%d %b %y %H:%M:%S'))
1773        alloc_log.addHandler(h)
1774       
1775        attrs = [ 
1776                {
1777                    'attribute': 'ssh_pubkey', 
1778                    'value': '%s/%s/config/%s' % \
1779                            (self.repo_url, expid, gw_pubkey_base)
1780                },
1781                {
1782                    'attribute': 'ssh_secretkey', 
1783                    'value': '%s/%s/config/%s' % \
1784                            (self.repo_url, expid, gw_secretkey_base)
1785                },
1786                {
1787                    'attribute': 'hosts', 
1788                    'value': '%s/%s/config/hosts' % \
1789                            (self.repo_url, expid)
1790                },
1791                {
1792                    'attribute': 'experiment_name',
1793                    'value': eid,
1794                },
1795            ]
1796
1797        # transit and disconnected testbeds may not have a connInfo entry.
1798        # Fill in the blanks.
1799        for t in allocated.keys():
1800            if not connInfo.has_key(t):
1801                connInfo[t] = { }
1802
1803        # Start a thread to do the resource allocation
1804        t  = Thread(target=self.allocate_resources,
1805                args=(allocated, master, eid, expid, tbparams, 
1806                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
1807                    services),
1808                name=eid)
1809        t.start()
1810
1811        rv = {
1812                'experimentID': [
1813                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1814                ],
1815                'experimentStatus': 'starting',
1816            }
1817
1818        return rv
1819   
1820    def get_experiment_fedid(self, key):
1821        """
1822        find the fedid associated with the localname key in the state database.
1823        """
1824
1825        rv = None
1826        self.state_lock.acquire()
1827        if self.state.has_key(key):
1828            if isinstance(self.state[key], dict):
1829                try:
1830                    kl = [ f['fedid'] for f in \
1831                            self.state[key]['experimentID']\
1832                                if f.has_key('fedid') ]
1833                except KeyError:
1834                    self.state_lock.release()
1835                    raise service_error(service_error.internal, 
1836                            "No fedid for experiment %s when getting "+\
1837                                    "fedid(!?)" % key)
1838                if len(kl) == 1:
1839                    rv = kl[0]
1840                else:
1841                    self.state_lock.release()
1842                    raise service_error(service_error.internal, 
1843                            "multiple fedids for experiment %s when " +\
1844                                    "getting fedid(!?)" % key)
1845            else:
1846                self.state_lock.release()
1847                raise service_error(service_error.internal, 
1848                        "Unexpected state for %s" % key)
1849        self.state_lock.release()
1850        return rv
1851
1852    def check_experiment_access(self, fid, key):
1853        """
1854        Confirm that the fid has access to the experiment.  Though a request
1855        may be made in terms of a local name, the access attribute is always
1856        the experiment's fedid.
1857        """
1858        if not isinstance(key, fedid):
1859            key = self.get_experiment_fedid(key)
1860
1861        if self.auth.check_attribute(fid, key):
1862            return True
1863        else:
1864            raise service_error(service_error.access, "Access Denied")
1865
1866
1867    def get_handler(self, path, fid):
1868        self.log.info("Get handler %s %s" % (path, fid))
1869        if self.auth.check_attribute(fid, path):
1870            return ("%s/%s" % (self.repodir, path), "application/binary")
1871        else:
1872            return (None, None)
1873
1874    def get_vtopo(self, req, fid):
1875        """
1876        Return the stored virtual topology for this experiment
1877        """
1878        rv = None
1879        state = None
1880
1881        req = req.get('VtopoRequestBody', None)
1882        if not req:
1883            raise service_error(service_error.req,
1884                    "Bad request format (no VtopoRequestBody)")
1885        exp = req.get('experiment', None)
1886        if exp:
1887            if exp.has_key('fedid'):
1888                key = exp['fedid']
1889                keytype = "fedid"
1890            elif exp.has_key('localname'):
1891                key = exp['localname']
1892                keytype = "localname"
1893            else:
1894                raise service_error(service_error.req, "Unknown lookup type")
1895        else:
1896            raise service_error(service_error.req, "No request?")
1897
1898        self.check_experiment_access(fid, key)
1899
1900        self.state_lock.acquire()
1901        if self.state.has_key(key):
1902            if self.state[key].has_key('vtopo'):
1903                rv = { 'experiment' : {keytype: key },\
1904                        'vtopo': self.state[key]['vtopo'],\
1905                    }
1906            else:
1907                state = self.state[key]['experimentStatus']
1908        self.state_lock.release()
1909
1910        if rv: return rv
1911        else: 
1912            if state:
1913                raise service_error(service_error.partial, 
1914                        "Not ready: %s" % state)
1915            else:
1916                raise service_error(service_error.req, "No such experiment")
1917
1918    def get_vis(self, req, fid):
1919        """
1920        Return the stored visualization for this experiment
1921        """
1922        rv = None
1923        state = None
1924
1925        req = req.get('VisRequestBody', None)
1926        if not req:
1927            raise service_error(service_error.req,
1928                    "Bad request format (no VisRequestBody)")
1929        exp = req.get('experiment', None)
1930        if exp:
1931            if exp.has_key('fedid'):
1932                key = exp['fedid']
1933                keytype = "fedid"
1934            elif exp.has_key('localname'):
1935                key = exp['localname']
1936                keytype = "localname"
1937            else:
1938                raise service_error(service_error.req, "Unknown lookup type")
1939        else:
1940            raise service_error(service_error.req, "No request?")
1941
1942        self.check_experiment_access(fid, key)
1943
1944        self.state_lock.acquire()
1945        if self.state.has_key(key):
1946            if self.state[key].has_key('vis'):
1947                rv =  { 'experiment' : {keytype: key },\
1948                        'vis': self.state[key]['vis'],\
1949                        }
1950            else:
1951                state = self.state[key]['experimentStatus']
1952        self.state_lock.release()
1953
1954        if rv: return rv
1955        else:
1956            if state:
1957                raise service_error(service_error.partial, 
1958                        "Not ready: %s" % state)
1959            else:
1960                raise service_error(service_error.req, "No such experiment")
1961
1962    def clean_info_response(self, rv):
1963        """
1964        Remove the information in the experiment's state object that is not in
1965        the info response.
1966        """
1967        # Remove the owner info (should always be there, but...)
1968        if rv.has_key('owner'): del rv['owner']
1969
1970        # Convert the log into the allocationLog parameter and remove the
1971        # log entry (with defensive programming)
1972        if rv.has_key('log'):
1973            rv['allocationLog'] = "".join(rv['log'])
1974            del rv['log']
1975        else:
1976            rv['allocationLog'] = ""
1977
1978        if rv['experimentStatus'] != 'active':
1979            if rv.has_key('federant'): del rv['federant']
1980        else:
1981            # remove the allocationID and uri info from each federant
1982            for f in rv.get('federant', []):
1983                if f.has_key('allocID'): del f['allocID']
1984                if f.has_key('uri'): del f['uri']
1985        return rv
1986
1987    def get_info(self, req, fid):
1988        """
1989        Return all the stored info about this experiment
1990        """
1991        rv = None
1992
1993        req = req.get('InfoRequestBody', None)
1994        if not req:
1995            raise service_error(service_error.req,
1996                    "Bad request format (no InfoRequestBody)")
1997        exp = req.get('experiment', None)
1998        if exp:
1999            if exp.has_key('fedid'):
2000                key = exp['fedid']
2001                keytype = "fedid"
2002            elif exp.has_key('localname'):
2003                key = exp['localname']
2004                keytype = "localname"
2005            else:
2006                raise service_error(service_error.req, "Unknown lookup type")
2007        else:
2008            raise service_error(service_error.req, "No request?")
2009
2010        self.check_experiment_access(fid, key)
2011
2012        # The state may be massaged by the service function that called
2013        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2014        # state.
2015        self.state_lock.acquire()
2016        if self.state.has_key(key):
2017            rv = copy.deepcopy(self.state[key])
2018        self.state_lock.release()
2019
2020        if rv:
2021            return self.clean_info_response(rv)
2022        else:
2023            raise service_error(service_error.req, "No such experiment")
2024
2025    def get_multi_info(self, req, fid):
2026        """
2027        Return all the stored info that this fedid can access
2028        """
2029        rv = { 'info': [ ] }
2030
2031        self.state_lock.acquire()
2032        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2033            try:
2034                self.check_experiment_access(fid, key)
2035            except service_error, e:
2036                if e.code == service_error.access:
2037                    continue
2038                else:
2039                    self.state_lock.release()
2040                    raise e
2041
2042            if self.state.has_key(key):
2043                e = copy.deepcopy(self.state[key])
2044                e = self.clean_info_response(e)
2045                rv['info'].append(e)
2046        self.state_lock.release()
2047        return rv
2048
2049    def terminate_experiment(self, req, fid):
2050        """
2051        Swap this experiment out on the federants and delete the shared
2052        information
2053        """
2054        tbparams = { }
2055        req = req.get('TerminateRequestBody', None)
2056        if not req:
2057            raise service_error(service_error.req,
2058                    "Bad request format (no TerminateRequestBody)")
2059        force = req.get('force', False)
2060        exp = req.get('experiment', None)
2061        if exp:
2062            if exp.has_key('fedid'):
2063                key = exp['fedid']
2064                keytype = "fedid"
2065            elif exp.has_key('localname'):
2066                key = exp['localname']
2067                keytype = "localname"
2068            else:
2069                raise service_error(service_error.req, "Unknown lookup type")
2070        else:
2071            raise service_error(service_error.req, "No request?")
2072
2073        self.check_experiment_access(fid, key)
2074
2075        dealloc_list = [ ]
2076
2077
2078        # Create a logger that logs to the dealloc_list as well as to the main
2079        # log file.
2080        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2081        h = logging.StreamHandler(self.list_log(dealloc_list))
2082        # XXX: there should be a global one of these rather than repeating the
2083        # code.
2084        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2085                    '%d %b %y %H:%M:%S'))
2086        dealloc_log.addHandler(h)
2087
2088        self.state_lock.acquire()
2089        fed_exp = self.state.get(key, None)
2090
2091        if fed_exp:
2092            # This branch of the conditional holds the lock to generate a
2093            # consistent temporary tbparams variable to deallocate experiments.
2094            # It releases the lock to do the deallocations and reacquires it to
2095            # remove the experiment state when the termination is complete.
2096
2097            # First make sure that the experiment creation is complete.
2098            status = fed_exp.get('experimentStatus', None)
2099
2100            if status:
2101                if status in ('starting', 'terminating'):
2102                    if not force:
2103                        self.state_lock.release()
2104                        raise service_error(service_error.partial, 
2105                                'Experiment still being created or destroyed')
2106                    else:
2107                        self.log.warning('Experiment in %s state ' % status + \
2108                                'being terminated by force.')
2109            else:
2110                # No status??? trouble
2111                self.state_lock.release()
2112                raise service_error(service_error.internal,
2113                        "Experiment has no status!?")
2114
2115            ids = []
2116            #  experimentID is a list of dicts that are self-describing
2117            #  identifiers.  This finds all the fedids and localnames - the
2118            #  keys of self.state - and puts them into ids.
2119            for id in fed_exp.get('experimentID', []):
2120                if id.has_key('fedid'): ids.append(id['fedid'])
2121                if id.has_key('localname'): ids.append(id['localname'])
2122
2123            # Collect the allocation/segment ids into a dict keyed by the fedid
2124            # of the allocation (or a monotonically increasing integer) that
2125            # contains a tuple of uri, aid (which is a dict...)
2126            for i, fed in enumerate(fed_exp.get('federant', [])):
2127                try:
2128                    uri = fed['uri']
2129                    aid = fed['allocID']
2130                    k = fed['allocID'].get('fedid', i)
2131                except KeyError, e:
2132                    continue
2133                tbparams[k] = (uri, aid)
2134            fed_exp['experimentStatus'] = 'terminating'
2135            if self.state_filename: self.write_state()
2136            self.state_lock.release()
2137
2138            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2139            # then completes, so we can't wait if nothing starts.  So, no
2140            # tbparams, no start.
2141            if len(tbparams) > 0:
2142                thread_pool = self.thread_pool(self.nthreads)
2143                for k in tbparams.keys():
2144                    # Create and start a thread to stop the segment
2145                    thread_pool.wait_for_slot()
2146                    uri, aid = tbparams[k]
2147                    t  = self.pooled_thread(\
2148                            target=self.terminate_segment(log=dealloc_log,
2149                                testbed=uri,
2150                                cert_file=self.cert_file, 
2151                                cert_pwd=self.cert_pwd,
2152                                trusted_certs=self.trusted_certs,
2153                                caller=self.call_TerminateSegment),
2154                            args=(uri, aid), name=k,
2155                            pdata=thread_pool, trace_file=self.trace_file)
2156                    t.start()
2157                # Wait for completions
2158                thread_pool.wait_for_all_done()
2159
2160            # release the allocations (failed experiments have done this
2161            # already, and starting experiments may be in odd states, so we
2162            # ignore errors releasing those allocations
2163            try: 
2164                for k in tbparams.keys():
2165                    # This releases access by uri
2166                    uri, aid = tbparams[k]
2167                    self.release_access(None, aid, uri=uri)
2168            except service_error, e:
2169                if status != 'failed' and not force:
2170                    raise e
2171
2172            # Remove the terminated experiment
2173            self.state_lock.acquire()
2174            for id in ids:
2175                if self.state.has_key(id): del self.state[id]
2176
2177            if self.state_filename: self.write_state()
2178            self.state_lock.release()
2179
2180            # Delete any synch points associated with this experiment.  All
2181            # synch points begin with the fedid of the experiment.
2182            fedid_keys = set(["fedid:%s" % f for f in ids \
2183                    if isinstance(f, fedid)])
2184            for k in self.synch_store.all_keys():
2185                try:
2186                    if len(k) > 45 and k[0:46] in fedid_keys:
2187                        self.synch_store.del_value(k)
2188                except synch_store.BadDeletionError:
2189                    pass
2190            self.write_store()
2191               
2192            return { 
2193                    'experiment': exp , 
2194                    'deallocationLog': "".join(dealloc_list),
2195                    }
2196        else:
2197            # Don't forget to release the lock
2198            self.state_lock.release()
2199            raise service_error(service_error.req, "No saved state")
2200
2201
2202    def GetValue(self, req, fid):
2203        """
2204        Get a value from the synchronized store
2205        """
2206        req = req.get('GetValueRequestBody', None)
2207        if not req:
2208            raise service_error(service_error.req,
2209                    "Bad request format (no GetValueRequestBody)")
2210       
2211        name = req['name']
2212        wait = req['wait']
2213        rv = { 'name': name }
2214
2215        if self.auth.check_attribute(fid, name):
2216            try:
2217                v = self.synch_store.get_value(name, wait)
2218            except synch_store.RevokedKeyError:
2219                # No more synch on this key
2220                raise service_error(service_error.federant, 
2221                        "Synch key %s revoked" % name)
2222            if v is not None:
2223                rv['value'] = v
2224            self.log.debug("[GetValue] got %s from %s" % (v, name))
2225            return rv
2226        else:
2227            raise service_error(service_error.access, "Access Denied")
2228       
2229
2230    def SetValue(self, req, fid):
2231        """
2232        Set a value in the synchronized store
2233        """
2234        req = req.get('SetValueRequestBody', None)
2235        if not req:
2236            raise service_error(service_error.req,
2237                    "Bad request format (no SetValueRequestBody)")
2238       
2239        name = req['name']
2240        v = req['value']
2241
2242        if self.auth.check_attribute(fid, name):
2243            try:
2244                self.synch_store.set_value(name, v)
2245                self.write_store()
2246                self.log.debug("[SetValue] set %s to %s" % (name, v))
2247            except synch_store.CollisionError:
2248                # Translate into a service_error
2249                raise service_error(service_error.req,
2250                        "Value already set: %s" %name)
2251            except synch_store.RevokedKeyError:
2252                # No more synch on this key
2253                raise service_error(service_error.federant, 
2254                        "Synch key %s revoked" % name)
2255            return { 'name': name, 'value': v }
2256        else:
2257            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.