source: fedd/federation/experiment_control.py @ 73e7f5c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 73e7f5c was 73e7f5c, checked in by Ted Faber <faber@…>, 14 years ago

Split the experiment partition routines out into a separate class

  • Property mode set to 100644
File size: 77.9 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45class experiment_control_local:
46    """
47    Control of experiments that this system can directly access.
48
49    Includes experiment creation, termination and information dissemination.
50    Thred safe.
51    """
52
53    class ssh_cmd_timeout(RuntimeError): pass
54   
55    class thread_pool:
56        """
57        A class to keep track of a set of threads all invoked for the same
58        task.  Manages the mutual exclusion of the states.
59        """
60        def __init__(self, nthreads):
61            """
62            Start a pool.
63            """
64            self.changed = Condition()
65            self.started = 0
66            self.terminated = 0
67            self.nthreads = nthreads
68
69        def acquire(self):
70            """
71            Get the pool's lock.
72            """
73            self.changed.acquire()
74
75        def release(self):
76            """
77            Release the pool's lock.
78            """
79            self.changed.release()
80
81        def wait(self, timeout = None):
82            """
83            Wait for a pool thread to start or stop.
84            """
85            self.changed.wait(timeout)
86
87        def start(self):
88            """
89            Called by a pool thread to report starting.
90            """
91            self.changed.acquire()
92            self.started += 1
93            self.changed.notifyAll()
94            self.changed.release()
95
96        def terminate(self):
97            """
98            Called by a pool thread to report finishing.
99            """
100            self.changed.acquire()
101            self.terminated += 1
102            self.changed.notifyAll()
103            self.changed.release()
104
105        def clear(self):
106            """
107            Clear all pool data.
108            """
109            self.changed.acquire()
110            self.started = 0
111            self.terminated =0
112            self.changed.notifyAll()
113            self.changed.release()
114
115        def wait_for_slot(self):
116            """
117            Wait until we have a free slot to start another pooled thread
118            """
119            self.acquire()
120            while self.started - self.terminated >= self.nthreads:
121                self.wait()
122            self.release()
123
124        def wait_for_all_done(self, timeout=None):
125            """
126            Wait until all active threads finish (and at least one has
127            started).  If a timeout is given, return after waiting that long
128            for termination.  If all threads are done (and one has started in
129            the since the last clear()) return True, otherwise False.
130            """
131            if timeout:
132                deadline = time.time() + timeout
133            self.acquire()
134            while self.started == 0 or self.started > self.terminated:
135                self.wait(timeout)
136                if timeout:
137                    if time.time() > deadline:
138                        break
139                    timeout = deadline - time.time()
140            self.release()
141            return not (self.started == 0 or self.started > self.terminated)
142
143    class pooled_thread(Thread):
144        """
145        One of a set of threads dedicated to a specific task.  Uses the
146        thread_pool class above for coordination.
147        """
148        def __init__(self, group=None, target=None, name=None, args=(), 
149                kwargs={}, pdata=None, trace_file=None):
150            Thread.__init__(self, group, target, name, args, kwargs)
151            self.rv = None          # Return value of the ops in this thread
152            self.exception = None   # Exception that terminated this thread
153            self.target=target      # Target function to run on start()
154            self.args = args        # Args to pass to target
155            self.kwargs = kwargs    # Additional kw args
156            self.pdata = pdata      # thread_pool for this class
157            # Logger for this thread
158            self.log = logging.getLogger("fedd.experiment_control")
159       
160        def run(self):
161            """
162            Emulate Thread.run, except add pool data manipulation and error
163            logging.
164            """
165            if self.pdata:
166                self.pdata.start()
167
168            if self.target:
169                try:
170                    self.rv = self.target(*self.args, **self.kwargs)
171                except service_error, s:
172                    self.exception = s
173                    self.log.error("Thread exception: %s %s" % \
174                            (s.code_string(), s.desc))
175                except:
176                    self.exception = sys.exc_info()[1]
177                    self.log.error(("Unexpected thread exception: %s" +\
178                            "Trace %s") % (self.exception,\
179                                traceback.format_exc()))
180            if self.pdata:
181                self.pdata.terminate()
182
183    call_RequestAccess = service_caller('RequestAccess')
184    call_ReleaseAccess = service_caller('ReleaseAccess')
185    call_StartSegment = service_caller('StartSegment')
186    call_TerminateSegment = service_caller('TerminateSegment')
187    call_Ns2Topdl = service_caller('Ns2Topdl')
188
189    def __init__(self, config=None, auth=None):
190        """
191        Intialize the various attributes, most from the config object
192        """
193
194        def parse_tarfile_list(tf):
195            """
196            Parse a tarfile list from the configuration.  This is a set of
197            paths and tarfiles separated by spaces.
198            """
199            rv = [ ]
200            if tf is not None:
201                tl = tf.split()
202                while len(tl) > 1:
203                    p, t = tl[0:2]
204                    del tl[0:2]
205                    rv.append((p, t))
206            return rv
207
208        self.thread_with_rv = experiment_control_local.pooled_thread
209        self.thread_pool = experiment_control_local.thread_pool
210        self.list_log = list_log.list_log
211
212        self.cert_file = config.get("experiment_control", "cert_file")
213        if self.cert_file:
214            self.cert_pwd = config.get("experiment_control", "cert_pwd")
215        else:
216            self.cert_file = config.get("globals", "cert_file")
217            self.cert_pwd = config.get("globals", "cert_pwd")
218
219        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
220                or config.get("globals", "trusted_certs")
221
222        self.repodir = config.get("experiment_control", "repodir")
223        self.repo_url = config.get("experiment_control", "repo_url", 
224                "https://users.isi.deterlab.net:23235");
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 10
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.cleanup = not config.getboolean("experiment_control", 
240                "leave_tmpfiles")
241        self.state_filename = config.get("experiment_control", 
242                "experiment_state")
243        self.store_filename = config.get("experiment_control", 
244                "synch_store")
245        self.store_url = config.get("experiment_control", "store_url")
246        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
247        self.fedkit = parse_tarfile_list(\
248                config.get("experiment_control", "fedkit"))
249        self.gatewaykit = parse_tarfile_list(\
250                config.get("experiment_control", "gatewaykit"))
251        accessdb_file = config.get("experiment_control", "accessdb")
252
253        self.ssh_pubkey_file = config.get("experiment_control", 
254                "ssh_pubkey_file")
255        self.ssh_privkey_file = config.get("experiment_control",
256                "ssh_privkey_file")
257        # NB for internal master/slave ops, not experiment setup
258        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
259
260        self.overrides = set([])
261        ovr = config.get('experiment_control', 'overrides')
262        if ovr:
263            for o in ovr.split(","):
264                o = o.strip()
265                if o.startswith('fedid:'): o = o[len('fedid:'):]
266                self.overrides.add(fedid(hexstr=o))
267
268        self.state = { }
269        self.state_lock = Lock()
270        self.tclsh = "/usr/local/bin/otclsh"
271        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
272                config.get("experiment_control", "tcl_splitter",
273                        "/usr/testbed/lib/ns2ir/parse.tcl")
274        mapdb_file = config.get("experiment_control", "mapdb")
275        self.trace_file = sys.stderr
276
277        self.def_expstart = \
278                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
279                "/tmp/federate";
280        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
281                "FEDDIR/hosts";
282        self.def_gwstart = \
283                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
284                "/tmp/bridge.log";
285        self.def_mgwstart = \
286                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
287                "/tmp/bridge.log";
288        self.def_gwimage = "FBSD61-TUNNEL2";
289        self.def_gwtype = "pc";
290        self.local_access = { }
291
292        if auth:
293            self.auth = auth
294        else:
295            self.log.error(\
296                    "[access]: No authorizer initialized, creating local one.")
297            auth = authorizer()
298
299
300        if self.ssh_pubkey_file:
301            try:
302                f = open(self.ssh_pubkey_file, 'r')
303                self.ssh_pubkey = f.read()
304                f.close()
305            except IOError:
306                raise service_error(service_error.internal,
307                        "Cannot read sshpubkey")
308        else:
309            raise service_error(service_error.internal, 
310                    "No SSH public key file?")
311
312        if not self.ssh_privkey_file:
313            raise service_error(service_error.internal, 
314                    "No SSH public key file?")
315
316
317        if mapdb_file:
318            self.read_mapdb(mapdb_file)
319        else:
320            self.log.warn("[experiment_control] No testbed map, using defaults")
321            self.tbmap = { 
322                    'deter':'https://users.isi.deterlab.net:23235',
323                    'emulab':'https://users.isi.deterlab.net:23236',
324                    'ucb':'https://users.isi.deterlab.net:23237',
325                    }
326
327        if accessdb_file:
328                self.read_accessdb(accessdb_file)
329        else:
330            raise service_error(service_error.internal,
331                    "No accessdb specified in config")
332
333        # Grab saved state.  OK to do this w/o locking because it's read only
334        # and only one thread should be in existence that can see self.state at
335        # this point.
336        if self.state_filename:
337            self.read_state()
338
339        if self.store_filename:
340            self.read_store()
341        else:
342            self.log.warning("No saved synch store")
343            self.synch_store = synch_store
344
345        # Dispatch tables
346        self.soap_services = {\
347                'New': soap_handler('New', self.new_experiment),
348                'Create': soap_handler('Create', self.create_experiment),
349                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
350                'Vis': soap_handler('Vis', self.get_vis),
351                'Info': soap_handler('Info', self.get_info),
352                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
353                'Terminate': soap_handler('Terminate', 
354                    self.terminate_experiment),
355                'GetValue': soap_handler('GetValue', self.GetValue),
356                'SetValue': soap_handler('SetValue', self.SetValue),
357        }
358
359        self.xmlrpc_services = {\
360                'New': xmlrpc_handler('New', self.new_experiment),
361                'Create': xmlrpc_handler('Create', self.create_experiment),
362                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
363                'Vis': xmlrpc_handler('Vis', self.get_vis),
364                'Info': xmlrpc_handler('Info', self.get_info),
365                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
366                'Terminate': xmlrpc_handler('Terminate',
367                    self.terminate_experiment),
368                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
369                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
370        }
371
372    # Call while holding self.state_lock
373    def write_state(self):
374        """
375        Write a new copy of experiment state after copying the existing state
376        to a backup.
377
378        State format is a simple pickling of the state dictionary.
379        """
380        if os.access(self.state_filename, os.W_OK):
381            copy_file(self.state_filename, \
382                    "%s.bak" % self.state_filename)
383        try:
384            f = open(self.state_filename, 'w')
385            pickle.dump(self.state, f)
386        except IOError, e:
387            self.log.error("Can't write file %s: %s" % \
388                    (self.state_filename, e))
389        except pickle.PicklingError, e:
390            self.log.error("Pickling problem: %s" % e)
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    @staticmethod
395    def get_alloc_ids(state):
396        """
397        Pull the fedids of the identifiers of each allocation from the
398        state.  Again, a dict dive that's best isolated.
399
400        Used by read_store and read state
401        """
402
403        return [ f['allocID']['fedid'] 
404                for f in state.get('federant',[]) \
405                    if f.has_key('allocID') and \
406                        f['allocID'].has_key('fedid')]
407
408    # Call while holding self.state_lock
409    def read_state(self):
410        """
411        Read a new copy of experiment state.  Old state is overwritten.
412
413        State format is a simple pickling of the state dictionary.
414        """
415       
416        def get_experiment_id(state):
417            """
418            Pull the fedid experimentID out of the saved state.  This is kind
419            of a gross walk through the dict.
420            """
421
422            if state.has_key('experimentID'):
423                for e in state['experimentID']:
424                    if e.has_key('fedid'):
425                        return e['fedid']
426                else:
427                    return None
428            else:
429                return None
430
431        try:
432            f = open(self.state_filename, "r")
433            self.state = pickle.load(f)
434            self.log.debug("[read_state]: Read state from %s" % \
435                    self.state_filename)
436        except IOError, e:
437            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
438                    % (self.state_filename, e))
439        except pickle.UnpicklingError, e:
440            self.log.warning(("[read_state]: No saved state: " + \
441                    "Unpickling failed: %s") % e)
442       
443        for s in self.state.values():
444            try:
445
446                eid = get_experiment_id(s)
447                if eid : 
448                    # Give the owner rights to the experiment
449                    self.auth.set_attribute(s['owner'], eid)
450                    # And holders of the eid as well
451                    self.auth.set_attribute(eid, eid)
452                    # allow overrides to control experiments as well
453                    for o in self.overrides:
454                        self.auth.set_attribute(o, eid)
455                    # Set permissions to allow reading of the software repo, if
456                    # any, as well.
457                    for a in self.get_alloc_ids(s):
458                        self.auth.set_attribute(a, 'repo/%s' % eid)
459                else:
460                    raise KeyError("No experiment id")
461            except KeyError, e:
462                self.log.warning("[read_state]: State ownership or identity " +\
463                        "misformatted in %s: %s" % (self.state_filename, e))
464
465
466    def read_accessdb(self, accessdb_file):
467        """
468        Read the mapping from fedids that can create experiments to their name
469        in the 3-level access namespace.  All will be asserted from this
470        testbed and can include the local username and porject that will be
471        asserted on their behalf by this fedd.  Each fedid is also added to the
472        authorization system with the "create" attribute.
473        """
474        self.accessdb = {}
475        # These are the regexps for parsing the db
476        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
477        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
478                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
479        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
480                "\s*->\s*(" + name_expr + ")\s*$")
481        lineno = 0
482
483        # Parse the mappings and store in self.authdb, a dict of
484        # fedid -> (proj, user)
485        try:
486            f = open(accessdb_file, "r")
487            for line in f:
488                lineno += 1
489                line = line.strip()
490                if len(line) == 0 or line.startswith('#'):
491                    continue
492                m = project_line.match(line)
493                if m:
494                    fid = fedid(hexstr=m.group(1))
495                    project, user = m.group(2,3)
496                    if not self.accessdb.has_key(fid):
497                        self.accessdb[fid] = []
498                    self.accessdb[fid].append((project, user))
499                    continue
500
501                m = user_line.match(line)
502                if m:
503                    fid = fedid(hexstr=m.group(1))
504                    project = None
505                    user = m.group(2)
506                    if not self.accessdb.has_key(fid):
507                        self.accessdb[fid] = []
508                    self.accessdb[fid].append((project, user))
509                    continue
510                self.log.warn("[experiment_control] Error parsing access " +\
511                        "db %s at line %d" %  (accessdb_file, lineno))
512        except IOError:
513            raise service_error(service_error.internal,
514                    "Error opening/reading %s as experiment " +\
515                            "control accessdb" %  accessdb_file)
516        f.close()
517
518        # Initialize the authorization attributes
519        for fid in self.accessdb.keys():
520            self.auth.set_attribute(fid, 'create')
521            self.auth.set_attribute(fid, 'new')
522
523    def read_mapdb(self, file):
524        """
525        Read a simple colon separated list of mappings for the
526        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
527        """
528
529        self.tbmap = { }
530        lineno =0
531        try:
532            f = open(file, "r")
533            for line in f:
534                lineno += 1
535                line = line.strip()
536                if line.startswith('#') or len(line) == 0:
537                    continue
538                try:
539                    label, url = line.split(':', 1)
540                    self.tbmap[label] = url
541                except ValueError, e:
542                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
543                            "map db: %s %s" % (lineno, line, e))
544        except IOError, e:
545            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
546                    "open %s: %s" % (file, e))
547        f.close()
548
549    def read_store(self):
550        try:
551            self.synch_store = synch_store()
552            self.synch_store.load(self.store_filename)
553            self.log.debug("[read_store]: Read store from %s" % \
554                    self.store_filename)
555        except IOError, e:
556            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
557                    % (self.state_filename, e))
558            self.synch_store = synch_store()
559
560        # Set the initial permissions on data in the store.  XXX: This ad hoc
561        # authorization attribute initialization is getting out of hand.
562        for k in self.synch_store.all_keys():
563            try:
564                if k.startswith('fedid:'):
565                    fid = fedid(hexstr=k[6:46])
566                    if self.state.has_key(fid):
567                        for a in self.get_alloc_ids(self.state[fid]):
568                            self.auth.set_attribute(a, k)
569            except ValueError, e:
570                self.log.warn("Cannot deduce permissions for %s" % k)
571
572
573    def write_store(self):
574        """
575        Write a new copy of synch_store after writing current state
576        to a backup.  We use the internal synch_store pickle method to avoid
577        incinsistent data.
578
579        State format is a simple pickling of the store.
580        """
581        if os.access(self.store_filename, os.W_OK):
582            copy_file(self.store_filename, \
583                    "%s.bak" % self.store_filename)
584        try:
585            self.synch_store.save(self.store_filename)
586        except IOError, e:
587            self.log.error("Can't write file %s: %s" % \
588                    (self.store_filename, e))
589        except TypeError, e:
590            self.log.error("Pickling problem (TypeError): %s" % e)
591
592       
593    def generate_ssh_keys(self, dest, type="rsa" ):
594        """
595        Generate a set of keys for the gateways to use to talk.
596
597        Keys are of type type and are stored in the required dest file.
598        """
599        valid_types = ("rsa", "dsa")
600        t = type.lower();
601        if t not in valid_types: raise ValueError
602        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
603
604        try:
605            trace = open("/dev/null", "w")
606        except IOError:
607            raise service_error(service_error.internal,
608                    "Cannot open /dev/null??");
609
610        # May raise CalledProcessError
611        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
612        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
613        if rv != 0:
614            raise service_error(service_error.internal, 
615                    "Cannot generate nonce ssh keys.  %s return code %d" \
616                            % (self.ssh_keygen, rv))
617
618    def gentopo(self, str):
619        """
620        Generate the topology dtat structure from the splitter's XML
621        representation of it.
622
623        The topology XML looks like:
624            <experiment>
625                <nodes>
626                    <node><vname></vname><ips>ip1:ip2</ips></node>
627                </nodes>
628                <lans>
629                    <lan>
630                        <vname></vname><vnode></vnode><ip></ip>
631                        <bandwidth></bandwidth><member>node:port</member>
632                    </lan>
633                </lans>
634        """
635        class topo_parse:
636            """
637            Parse the topology XML and create the dats structure.
638            """
639            def __init__(self):
640                # Typing of the subelements for data conversion
641                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
642                self.int_subelements = ( 'bandwidth',)
643                self.float_subelements = ( 'delay',)
644                # The final data structure
645                self.nodes = [ ]
646                self.lans =  [ ]
647                self.topo = { \
648                        'node': self.nodes,\
649                        'lan' : self.lans,\
650                    }
651                self.element = { }  # Current element being created
652                self.chars = ""     # Last text seen
653
654            def end_element(self, name):
655                # After each sub element the contents is added to the current
656                # element or to the appropriate list.
657                if name == 'node':
658                    self.nodes.append(self.element)
659                    self.element = { }
660                elif name == 'lan':
661                    self.lans.append(self.element)
662                    self.element = { }
663                elif name in self.str_subelements:
664                    self.element[name] = self.chars
665                    self.chars = ""
666                elif name in self.int_subelements:
667                    self.element[name] = int(self.chars)
668                    self.chars = ""
669                elif name in self.float_subelements:
670                    self.element[name] = float(self.chars)
671                    self.chars = ""
672
673            def found_chars(self, data):
674                self.chars += data.rstrip()
675
676
677        tp = topo_parse();
678        parser = xml.parsers.expat.ParserCreate()
679        parser.EndElementHandler = tp.end_element
680        parser.CharacterDataHandler = tp.found_chars
681
682        parser.Parse(str)
683
684        return tp.topo
685       
686
687    def genviz(self, topo):
688        """
689        Generate the visualization the virtual topology
690        """
691
692        neato = "/usr/local/bin/neato"
693        # These are used to parse neato output and to create the visualization
694        # file.
695        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
696        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
697                "%s</type></node>"
698
699        try:
700            # Node names
701            nodes = [ n['vname'] for n in topo['node'] ]
702            topo_lans = topo['lan']
703        except KeyError, e:
704            raise service_error(service_error.internal, "Bad topology: %s" %e)
705
706        lans = { }
707        links = { }
708
709        # Walk through the virtual topology, organizing the connections into
710        # 2-node connections (links) and more-than-2-node connections (lans).
711        # When a lan is created, it's added to the list of nodes (there's a
712        # node in the visualization for the lan).
713        for l in topo_lans:
714            if links.has_key(l['vname']):
715                if len(links[l['vname']]) < 2:
716                    links[l['vname']].append(l['vnode'])
717                else:
718                    nodes.append(l['vname'])
719                    lans[l['vname']] = links[l['vname']]
720                    del links[l['vname']]
721                    lans[l['vname']].append(l['vnode'])
722            elif lans.has_key(l['vname']):
723                lans[l['vname']].append(l['vnode'])
724            else:
725                links[l['vname']] = [ l['vnode'] ]
726
727
728        # Open up a temporary file for dot to turn into a visualization
729        try:
730            df, dotname = tempfile.mkstemp()
731            dotfile = os.fdopen(df, 'w')
732        except IOError:
733            raise service_error(service_error.internal,
734                    "Failed to open file in genviz")
735
736        try:
737            dnull = open('/dev/null', 'w')
738        except IOError:
739            service_error(service_error.internal,
740                    "Failed to open /dev/null in genviz")
741
742        # Generate a dot/neato input file from the links, nodes and lans
743        try:
744            print >>dotfile, "graph G {"
745            for n in nodes:
746                print >>dotfile, '\t"%s"' % n
747            for l in links.keys():
748                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
749            for l in lans.keys():
750                for n in lans[l]:
751                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
752            print >>dotfile, "}"
753            dotfile.close()
754        except TypeError:
755            raise service_error(service_error.internal,
756                    "Single endpoint link in vtopo")
757        except IOError:
758            raise service_error(service_error.internal, "Cannot write dot file")
759
760        # Use dot to create a visualization
761        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
762                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
763                close_fds=True)
764        dnull.close()
765
766        # Translate dot to vis format
767        vis_nodes = [ ]
768        vis = { 'node': vis_nodes }
769        for line in dot.stdout:
770            m = vis_re.match(line)
771            if m:
772                vn = m.group(1)
773                vis_node = {'name': vn, \
774                        'x': float(m.group(2)),\
775                        'y' : float(m.group(3)),\
776                    }
777                if vn in links.keys() or vn in lans.keys():
778                    vis_node['type'] = 'lan'
779                else:
780                    vis_node['type'] = 'node'
781                vis_nodes.append(vis_node)
782        rv = dot.wait()
783
784        os.remove(dotname)
785        if rv == 0 : return vis
786        else: return None
787
788    def get_access(self, tb, nodes, tbparam, master, export_project,
789            access_user, services):
790        """
791        Get access to testbed through fedd and set the parameters for that tb
792        """
793        uri = self.tbmap.get(tb, None)
794        if not uri:
795            raise service_error(service_error.server_config, 
796                    "Unknown testbed: %s" % tb)
797
798        # Tweak search order so that if there are entries in access_user that
799        # have a project matching the export project, we try them first
800        if export_project: 
801            access_sequence = [ (p, u) for p, u in access_user \
802                    if p == export_project] 
803            access_sequence.extend([(p, u) for p, u in access_user \
804                    if p != export_project]) 
805        else: 
806            access_sequence = access_user
807
808        for p, u in access_sequence: 
809            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
810                    "to %s") %  ((p or "None"), u, uri))
811
812            if p:
813                # Request with user and project specified
814                req = {\
815                        'destinationTestbed' : { 'uri' : uri },
816                        'credential': [ "project: %s" % p, "user: %s"  % u],
817                        'allocID' : { 'localname': 'test' },
818                    }
819            else:
820                # Request with only user specified
821                req = {\
822                        'destinationTestbed' : { 'uri' : uri },
823                        'credential': [ 'user: %s' % u ],
824                        'user':  [ {'userID': { 'localname': u } } ],
825                        'allocID' : { 'localname': 'test' },
826                    }
827
828            # If there is a master, and this is it, ask it to export services
829            # XXX move this to export pseudo-service
830            if tb == master:
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r: 
889            services.extend(r['service'])
890
891        # Add attributes to parameter space.  We don't allow attributes to
892        # overlay any parameters already installed.
893        for a in r.get('fedAttr', []):
894            try:
895                if a['attribute'] and \
896                        isinstance(a['attribute'], basestring)\
897                        and not tbparam[tb].has_key(a['attribute'].lower()):
898                    tbparam[tb][a['attribute'].lower()] = a['value']
899            except KeyError:
900                self.log.error("Bad attribute in response: %s" % a)
901
902    def release_access(self, tb, aid, uri=None):
903        """
904        Release access to testbed through fedd
905        """
906
907        if not uri:
908            uri = self.tbmap.get(tb, None)
909        if not uri:
910            raise service_error(service_error.server_config, 
911                    "Unknown testbed: %s" % tb)
912
913        if self.local_access.has_key(uri):
914            resp = self.local_access[uri].ReleaseAccess(\
915                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
916                    fedid(file=self.cert_file))
917            resp = { 'ReleaseAccessResponseBody': resp } 
918        else:
919            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
920                    self.cert_file, self.cert_pwd, self.trusted_certs)
921
922        # better error coding
923
924    def remote_ns2topdl(self, uri, desc):
925
926        req = {
927                'description' : { 'ns2description': desc },
928            }
929
930        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
931                self.trusted_certs)
932
933        if r.has_key('Ns2TopdlResponseBody'):
934            r = r['Ns2TopdlResponseBody']
935            ed = r.get('experimentdescription', None)
936            if ed.has_key('topdldescription'):
937                return topdl.Topology(**ed['topdldescription'])
938            else:
939                raise service_error(service_error.protocol, 
940                        "Bad splitter response (no output)")
941        else:
942            raise service_error(service_error.protocol, "Bad splitter response")
943
944    class start_segment:
945        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
946                cert_pwd=None, trusted_certs=None, caller=None,
947                log_collector=None):
948            self.log = log
949            self.debug = debug
950            self.cert_file = cert_file
951            self.cert_pwd = cert_pwd
952            self.trusted_certs = None
953            self.caller = caller
954            self.testbed = testbed
955            self.log_collector = log_collector
956            self.response = None
957
958        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
959                services=None):
960            req = {
961                    'allocID': { 'fedid' : aid }, 
962                    'segmentdescription': { 
963                        'topdldescription': topo.to_dict(),
964                    },
965                    'master': master,
966                }
967
968            if connInfo:
969                req['connection'] = connInfo
970            # Add services to request.  The master exports, everyone else
971            # imports.
972            if services:
973                svcs = [ x.copy() for x in services]
974                for s in svcs:
975                    if master: s['visibility'] = 'export'
976                    else: s['visibility'] = 'import'
977                req['service'] = svcs
978            if attrs:
979                req['fedAttr'] = attrs
980
981            try:
982                self.log.debug("Calling StartSegment at %s " % uri)
983                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
984                        self.trusted_certs)
985                if r.has_key('StartSegmentResponseBody'):
986                    lval = r['StartSegmentResponseBody'].get('allocationLog',
987                            None)
988                    if lval and self.log_collector:
989                        for line in  lval.splitlines(True):
990                            self.log_collector.write(line)
991                    self.response = r
992                else:
993                    raise service_error(service_error.internal, 
994                            "Bad response!?: %s" %r)
995                return True
996            except service_error, e:
997                self.log.error("Start segment failed on %s: %s" % \
998                        (self.testbed, e))
999                return False
1000
1001
1002
1003    class terminate_segment:
1004        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1005                cert_pwd=None, trusted_certs=None, caller=None):
1006            self.log = log
1007            self.debug = debug
1008            self.cert_file = cert_file
1009            self.cert_pwd = cert_pwd
1010            self.trusted_certs = None
1011            self.caller = caller
1012            self.testbed = testbed
1013
1014        def __call__(self, uri, aid ):
1015            req = {
1016                    'allocID': aid , 
1017                }
1018            try:
1019                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1020                        self.trusted_certs)
1021                return True
1022            except service_error, e:
1023                self.log.error("Terminate segment failed on %s: %s" % \
1024                        (self.testbed, e))
1025                return False
1026   
1027
1028    def allocate_resources(self, allocated, master, eid, expid, 
1029            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1030            attrs=None, connInfo={}, services=[]):
1031
1032        started = { }           # Testbeds where a sub-experiment started
1033                                # successfully
1034
1035        # XXX
1036        fail_soft = False
1037
1038        log = alloc_log or self.log
1039
1040        thread_pool = self.thread_pool(self.nthreads)
1041        threads = [ ]
1042
1043        for tb in allocated.keys():
1044            # Create and start a thread to start the segment, and save it
1045            # to get the return value later
1046            thread_pool.wait_for_slot()
1047            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1048            if not uri:
1049                raise service_error(service_error.internal, 
1050                        "Unknown testbed %s !?" % tb)
1051
1052            if tbparams[tb].has_key('allocID') and \
1053                    tbparams[tb]['allocID'].has_key('fedid'):
1054                aid = tbparams[tb]['allocID']['fedid']
1055            else:
1056                raise service_error(service_error.internal, 
1057                        "No alloc id for testbed %s !?" % tb)
1058
1059            t  = self.pooled_thread(\
1060                    target=self.start_segment(log=log, debug=self.debug,
1061                        testbed=tb, cert_file=self.cert_file,
1062                        cert_pwd=self.cert_pwd,
1063                        trusted_certs=self.trusted_certs,
1064                        caller=self.call_StartSegment,
1065                        log_collector=log_collector), 
1066                    args=(uri, aid, topo[tb], tb == master, 
1067                        attrs, connInfo[tb], services),
1068                    name=tb,
1069                    pdata=thread_pool, trace_file=self.trace_file)
1070            threads.append(t)
1071            t.start()
1072
1073        # Wait until all finish (keep pinging the log, though)
1074        mins = 0
1075        revoked = False
1076        while not thread_pool.wait_for_all_done(60.0):
1077            mins += 1
1078            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1079                    % mins)
1080            if not revoked and \
1081                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1082                # a testbed has failed.  Revoke this experiment's
1083                # synchronizarion values so that sub experiments will not
1084                # deadlock waiting for synchronization that will never happen
1085                self.log.info("A subexperiment has failed to swap in, " + \
1086                        "revoking synch keys")
1087                var_key = "fedid:%s" % expid
1088                for k in self.synch_store.all_keys():
1089                    if len(k) > 45 and k[0:46] == var_key:
1090                        self.synch_store.revoke_key(k)
1091                revoked = True
1092
1093        failed = [ t.getName() for t in threads if not t.rv ]
1094        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1095
1096        # If one failed clean up, unless fail_soft is set
1097        if failed:
1098            if not fail_soft:
1099                thread_pool.clear()
1100                for tb in succeeded:
1101                    # Create and start a thread to stop the segment
1102                    thread_pool.wait_for_slot()
1103                    uri = tbparams[tb]['uri']
1104                    t  = self.pooled_thread(\
1105                            target=self.terminate_segment(log=log,
1106                                testbed=tb,
1107                                cert_file=self.cert_file, 
1108                                cert_pwd=self.cert_pwd,
1109                                trusted_certs=self.trusted_certs,
1110                                caller=self.call_TerminateSegment),
1111                            args=(uri, tbparams[tb]['federant']['allocID']),
1112                            name=tb,
1113                            pdata=thread_pool, trace_file=self.trace_file)
1114                    t.start()
1115                # Wait until all finish (if any are being stopped)
1116                if succeeded:
1117                    thread_pool.wait_for_all_done()
1118
1119                # release the allocations
1120                for tb in tbparams.keys():
1121                    self.release_access(tb, tbparams[tb]['allocID'],
1122                            tbparams[tb].get('uri', None))
1123                # Remove the placeholder
1124                self.state_lock.acquire()
1125                self.state[eid]['experimentStatus'] = 'failed'
1126                if self.state_filename: self.write_state()
1127                self.state_lock.release()
1128
1129                log.error("Swap in failed on %s" % ",".join(failed))
1130                return
1131        else:
1132            log.info("[start_segment]: Experiment %s active" % eid)
1133
1134
1135        # Walk up tmpdir, deleting as we go
1136        if self.cleanup:
1137            log.debug("[start_experiment]: removing %s" % tmpdir)
1138            for path, dirs, files in os.walk(tmpdir, topdown=False):
1139                for f in files:
1140                    os.remove(os.path.join(path, f))
1141                for d in dirs:
1142                    os.rmdir(os.path.join(path, d))
1143            os.rmdir(tmpdir)
1144        else:
1145            log.debug("[start_experiment]: not removing %s" % tmpdir)
1146
1147        # Insert the experiment into our state and update the disk copy
1148        self.state_lock.acquire()
1149        self.state[expid]['experimentStatus'] = 'active'
1150        self.state[eid] = self.state[expid]
1151        if self.state_filename: self.write_state()
1152        self.state_lock.release()
1153        return
1154
1155
1156    def add_kit(self, e, kit):
1157        """
1158        Add a Software object created from the list of (install, location)
1159        tuples passed as kit  to the software attribute of an object e.  We
1160        do this enough to break out the code, but it's kind of a hack to
1161        avoid changing the old tuple rep.
1162        """
1163
1164        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1165
1166        if isinstance(e.software, list): e.software.extend(s)
1167        else: e.software = s
1168
1169
1170    def create_experiment_state(self, fid, req, expid, expcert, 
1171            state='starting'):
1172        """
1173        Create the initial entry in the experiment's state.  The expid and
1174        expcert are the experiment's fedid and certifacte that represents that
1175        ID, which are installed in the experiment state.  If the request
1176        includes a suggested local name that is used if possible.  If the local
1177        name is already taken by an experiment owned by this user that has
1178        failed, it is overwritten.  Otherwise new letters are added until a
1179        valid localname is found.  The generated local name is returned.
1180        """
1181
1182        if req.has_key('experimentID') and \
1183                req['experimentID'].has_key('localname'):
1184            overwrite = False
1185            eid = req['experimentID']['localname']
1186            # If there's an old failed experiment here with the same local name
1187            # and accessible by this user, we'll overwrite it, otherwise we'll
1188            # fall through and do the collision avoidance.
1189            old_expid = self.get_experiment_fedid(eid)
1190            if old_expid and self.check_experiment_access(fid, old_expid):
1191                self.state_lock.acquire()
1192                status = self.state[eid].get('experimentStatus', None)
1193                if status and status == 'failed':
1194                    # remove the old access attribute
1195                    self.auth.unset_attribute(fid, old_expid)
1196                    overwrite = True
1197                    del self.state[eid]
1198                    del self.state[old_expid]
1199                self.state_lock.release()
1200            self.state_lock.acquire()
1201            while (self.state.has_key(eid) and not overwrite):
1202                eid += random.choice(string.ascii_letters)
1203            # Initial state
1204            self.state[eid] = {
1205                    'experimentID' : \
1206                            [ { 'localname' : eid }, {'fedid': expid } ],
1207                    'experimentStatus': state,
1208                    'experimentAccess': { 'X509' : expcert },
1209                    'owner': fid,
1210                    'log' : [],
1211                }
1212            self.state[expid] = self.state[eid]
1213            if self.state_filename: self.write_state()
1214            self.state_lock.release()
1215        else:
1216            eid = self.exp_stem
1217            for i in range(0,5):
1218                eid += random.choice(string.ascii_letters)
1219            self.state_lock.acquire()
1220            while (self.state.has_key(eid)):
1221                eid = self.exp_stem
1222                for i in range(0,5):
1223                    eid += random.choice(string.ascii_letters)
1224            # Initial state
1225            self.state[eid] = {
1226                    'experimentID' : \
1227                            [ { 'localname' : eid }, {'fedid': expid } ],
1228                    'experimentStatus': state,
1229                    'experimentAccess': { 'X509' : expcert },
1230                    'owner': fid,
1231                    'log' : [],
1232                }
1233            self.state[expid] = self.state[eid]
1234            if self.state_filename: self.write_state()
1235            self.state_lock.release()
1236
1237        return eid
1238
1239
1240    def allocate_ips_to_topo(self, top):
1241        """
1242        Add an ip4_address attribute to all the hosts in the topology, based on
1243        the shared substrates on which they sit.  An /etc/hosts file is also
1244        created and returned as a list of hostfiles entries.  We also return
1245        the allocator, because we may need to allocate IPs to portals
1246        (specifically DRAGON portals).
1247        """
1248        subs = sorted(top.substrates, 
1249                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1250                reverse=True)
1251        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1252        ifs = { }
1253        hosts = [ ]
1254
1255        for idx, s in enumerate(subs):
1256            net_size = len(s.interfaces)+2
1257
1258            a = ips.allocate(net_size)
1259            if a :
1260                base, num = a
1261                if num < net_size: 
1262                    raise service_error(service_error.internal,
1263                            "Allocator returned wrong number of IPs??")
1264            else:
1265                raise service_error(service_error.req, 
1266                        "Cannot allocate IP addresses")
1267            mask = ips.min_alloc
1268            while mask < net_size:
1269                mask *= 2
1270
1271            netmask = ((2**32-1) ^ (mask-1))
1272
1273            base += 1
1274            for i in s.interfaces:
1275                i.attribute.append(
1276                        topdl.Attribute('ip4_address', 
1277                            "%s" % ip_addr(base)))
1278                i.attribute.append(
1279                        topdl.Attribute('ip4_netmask', 
1280                            "%s" % ip_addr(int(netmask))))
1281
1282                hname = i.element.name[0]
1283                if ifs.has_key(hname):
1284                    hosts.append("%s\t%s-%s %s-%d" % \
1285                            (ip_addr(base), hname, s.name, hname,
1286                                ifs[hname]))
1287                else:
1288                    ifs[hname] = 0
1289                    hosts.append("%s\t%s-%s %s-%d %s" % \
1290                            (ip_addr(base), hname, s.name, hname,
1291                                ifs[hname], hname))
1292
1293                ifs[hname] += 1
1294                base += 1
1295        return hosts, ips
1296
1297    def get_access_to_testbeds(self, testbeds, access_user, 
1298            export_project, master, allocated, tbparams, services):
1299        """
1300        Request access to the various testbeds required for this instantiation
1301        (passed in as testbeds).  User, access_user, expoert_project and master
1302        are used to construct the correct requests.  Per-testbed parameters are
1303        returned in tbparams.
1304        """
1305        for tb in testbeds:
1306            self.get_access(tb, None, tbparams, master,
1307                    export_project, access_user, services)
1308            allocated[tb] = 1
1309
1310    def split_topology(self, top, topo, testbeds):
1311        """
1312        Create the sub-topologies that are needed for experiment instantiation.
1313        """
1314        for tb in testbeds:
1315            topo[tb] = top.clone()
1316            # copy in for loop allows deletions from the original
1317            for e in [ e for e in topo[tb].elements]:
1318                etb = e.get_attribute('testbed')
1319                # NB: elements without a testbed attribute won't appear in any
1320                # sub topologies. 
1321                if not etb or etb != tb:
1322                    for i in e.interface:
1323                        for s in i.subs:
1324                            try:
1325                                s.interfaces.remove(i)
1326                            except ValueError:
1327                                raise service_error(service_error.internal,
1328                                        "Can't remove interface??")
1329                    topo[tb].elements.remove(e)
1330            topo[tb].make_indices()
1331
1332    def wrangle_software(self, expid, top, topo, tbparams):
1333        """
1334        Copy software out to the repository directory, allocate permissions and
1335        rewrite the segment topologies to look for the software in local
1336        places.
1337        """
1338
1339        # Copy the rpms and tarfiles to a distribution directory from
1340        # which the federants can retrieve them
1341        linkpath = "%s/software" %  expid
1342        softdir ="%s/%s" % ( self.repodir, linkpath)
1343        softmap = { }
1344        # These are in a list of tuples format (each kit).  This comprehension
1345        # unwraps them into a single list of tuples that initilaizes the set of
1346        # tuples.
1347        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1348                for p, t in l ])
1349        pkgs.update([x.location for e in top.elements \
1350                for x in e.software])
1351        try:
1352            os.makedirs(softdir)
1353        except IOError, e:
1354            raise service_error(
1355                    "Cannot create software directory: %s" % e)
1356        # The actual copying.  Everything's converted into a url for copying.
1357        for pkg in pkgs:
1358            loc = pkg
1359
1360            scheme, host, path = urlparse(loc)[0:3]
1361            dest = os.path.basename(path)
1362            if not scheme:
1363                if not loc.startswith('/'):
1364                    loc = "/%s" % loc
1365                loc = "file://%s" %loc
1366            try:
1367                u = urlopen(loc)
1368            except Exception, e:
1369                raise service_error(service_error.req, 
1370                        "Cannot open %s: %s" % (loc, e))
1371            try:
1372                f = open("%s/%s" % (softdir, dest) , "w")
1373                self.log.debug("Writing %s/%s" % (softdir,dest) )
1374                data = u.read(4096)
1375                while data:
1376                    f.write(data)
1377                    data = u.read(4096)
1378                f.close()
1379                u.close()
1380            except Exception, e:
1381                raise service_error(service_error.internal,
1382                        "Could not copy %s: %s" % (loc, e))
1383            path = re.sub("/tmp", "", linkpath)
1384            # XXX
1385            softmap[pkg] = \
1386                    "%s/%s/%s" %\
1387                    ( self.repo_url, path, dest)
1388
1389            # Allow the individual segments to access the software.
1390            for tb in tbparams.keys():
1391                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1392                        "/%s/%s" % ( path, dest))
1393
1394        # Convert the software locations in the segments into the local
1395        # copies on this host
1396        for soft in [ s for tb in topo.values() \
1397                for e in tb.elements \
1398                    if getattr(e, 'software', False) \
1399                        for s in e.software ]:
1400            if softmap.has_key(soft.location):
1401                soft.location = softmap[soft.location]
1402
1403
1404    def new_experiment(self, req, fid):
1405        """
1406        The external interface to empty initial experiment creation called from
1407        the dispatcher.
1408
1409        Creates a working directory, splits the incoming description using the
1410        splitter script and parses out the avrious subsections using the
1411        lcasses above.  Once each sub-experiment is created, use pooled threads
1412        to instantiate them and start it all up.
1413        """
1414        if not self.auth.check_attribute(fid, 'new'):
1415            raise service_error(service_error.access, "New access denied")
1416
1417        try:
1418            tmpdir = tempfile.mkdtemp(prefix="split-")
1419        except IOError:
1420            raise service_error(service_error.internal, "Cannot create tmp dir")
1421
1422        try:
1423            access_user = self.accessdb[fid]
1424        except KeyError:
1425            raise service_error(service_error.internal,
1426                    "Access map and authorizer out of sync in " + \
1427                            "new_experiment for fedid %s"  % fid)
1428
1429        pid = "dummy"
1430        gid = "dummy"
1431
1432        req = req.get('NewRequestBody', None)
1433        if not req:
1434            raise service_error(service_error.req,
1435                    "Bad request format (no NewRequestBody)")
1436
1437        # Generate an ID for the experiment (slice) and a certificate that the
1438        # allocator can use to prove they own it.  We'll ship it back through
1439        # the encrypted connection.
1440        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1441
1442        #now we're done with the tmpdir, and it should be empty
1443        if self.cleanup:
1444            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1445            os.rmdir(tmpdir)
1446        else:
1447            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1448
1449        eid = self.create_experiment_state(fid, req, expid, expcert, 
1450                state='empty')
1451
1452        # Let users touch the state
1453        self.auth.set_attribute(fid, expid)
1454        self.auth.set_attribute(expid, expid)
1455        # Override fedids can manipulate state as well
1456        for o in self.overrides:
1457            self.auth.set_attribute(o, expid)
1458
1459        rv = {
1460                'experimentID': [
1461                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1462                ],
1463                'experimentStatus': 'empty',
1464                'experimentAccess': { 'X509' : expcert }
1465            }
1466
1467        return rv
1468
1469    def get_master_project(self, req):
1470        master= None
1471        export_project = None
1472        for e in [ s for s in req.get('service', []) \
1473                if s.get('name') == 'project_export']:
1474            for a in e.get('fedAttr', []):
1475                attr = a.get('attribute', None)
1476                if attr == 'testbed':
1477                    master = a.get('value', None)
1478                elif attr == 'project':
1479                    export_project = a.get('value', None)
1480       
1481        return (master, export_project)
1482
1483
1484
1485    def create_experiment(self, req, fid):
1486        """
1487        The external interface to experiment creation called from the
1488        dispatcher.
1489
1490        Creates a working directory, splits the incoming description using the
1491        splitter script and parses out the avrious subsections using the
1492        lcasses above.  Once each sub-experiment is created, use pooled threads
1493        to instantiate them and start it all up.
1494        """
1495
1496        req = req.get('CreateRequestBody', None)
1497        if not req:
1498            raise service_error(service_error.req,
1499                    "Bad request format (no CreateRequestBody)")
1500
1501        # Get the experiment access
1502        exp = req.get('experimentID', None)
1503        if exp:
1504            if exp.has_key('fedid'):
1505                key = exp['fedid']
1506                expid = key
1507                eid = None
1508            elif exp.has_key('localname'):
1509                key = exp['localname']
1510                eid = key
1511                expid = None
1512            else:
1513                raise service_error(service_error.req, "Unknown lookup type")
1514        else:
1515            raise service_error(service_error.req, "No request?")
1516
1517        self.check_experiment_access(fid, key)
1518
1519        try:
1520            tmpdir = tempfile.mkdtemp(prefix="split-")
1521            os.mkdir(tmpdir+"/keys")
1522        except IOError:
1523            raise service_error(service_error.internal, "Cannot create tmp dir")
1524
1525        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1526        gw_secretkey_base = "fed.%s" % self.ssh_type
1527        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1528        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1529        tclfile = tmpdir + "/experiment.tcl"
1530        tbparams = { }
1531        try:
1532            access_user = self.accessdb[fid]
1533        except KeyError:
1534            raise service_error(service_error.internal,
1535                    "Access map and authorizer out of sync in " + \
1536                            "create_experiment for fedid %s"  % fid)
1537
1538        pid = "dummy"
1539        gid = "dummy"
1540
1541        # The tcl parser needs to read a file so put the content into that file
1542        descr=req.get('experimentdescription', None)
1543        if descr:
1544            file_content=descr.get('ns2description', None)
1545            if file_content:
1546                try:
1547                    f = open(tclfile, 'w')
1548                    f.write(file_content)
1549                    f.close()
1550                except IOError:
1551                    raise service_error(service_error.internal,
1552                            "Cannot write temp experiment description")
1553            else:
1554                raise service_error(service_error.req, 
1555                        "Only ns2descriptions supported")
1556        else:
1557            raise service_error(service_error.req, "No experiment description")
1558
1559        self.state_lock.acquire()
1560        if self.state.has_key(key):
1561            self.state[key]['experimentStatus'] = "starting"
1562            for e in self.state[key].get('experimentID',[]):
1563                if not expid and e.has_key('fedid'):
1564                    expid = e['fedid']
1565                elif not eid and e.has_key('localname'):
1566                    eid = e['localname']
1567        self.state_lock.release()
1568
1569        if not (eid and expid):
1570            raise service_error(service_error.internal, 
1571                    "Cannot find local experiment info!?")
1572
1573        try: 
1574            # This catches exceptions to clear the placeholder if necessary
1575            try:
1576                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1577            except ValueError:
1578                raise service_error(service_error.server_config, 
1579                        "Bad key type (%s)" % self.ssh_type)
1580            master, export_project = self.get_master_project(req)
1581            # XXX get these out when master and project are optional
1582            if not master:
1583                raise service_error(service_error.req,
1584                        "No master testbed label")
1585            if not export_project:
1586                raise service_error(service_error.req, "No export project")
1587            # XXX
1588
1589            # Translate to topdl
1590            if self.splitter_url:
1591                # XXX: need remote topdl translator
1592                self.log.debug("Calling remote splitter at %s" % \
1593                        self.splitter_url)
1594                top = self.remote_ns2topdl(self.splitter_url, file_content)
1595            else:
1596                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1597                    str(self.muxmax), '-m', master]
1598
1599                tclcmd.extend([pid, gid, eid, tclfile])
1600
1601                self.log.debug("running local splitter %s", " ".join(tclcmd))
1602                # This is just fantastic.  As a side effect the parser copies
1603                # tb_compat.tcl into the current directory, so that directory
1604                # must be writable by the fedd user.  Doing this in the
1605                # temporary subdir ensures this is the case.
1606                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1607                        cwd=tmpdir)
1608                split_data = tclparser.stdout
1609
1610                top = topdl.topology_from_xml(file=split_data, top="experiment")
1611
1612            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1613             # Find the testbeds to look up
1614            testbeds = set([ a.value for e in top.elements \
1615                    for a in e.attribute \
1616                    if a.attribute == 'testbed'] )
1617
1618            allocated = { }         # Testbeds we can access
1619            topo ={ }               # Sub topologies
1620            connInfo = { }          # Connection information
1621            services = [ ]
1622            self.get_access_to_testbeds(testbeds, access_user, 
1623                    export_project, master, allocated, tbparams, services)
1624            self.split_topology(top, topo, testbeds)
1625
1626            # Copy configuration files into the remote file store
1627            # The config urlpath
1628            configpath = "/%s/config" % expid
1629            # The config file system location
1630            configdir ="%s%s" % ( self.repodir, configpath)
1631            try:
1632                os.makedirs(configdir)
1633            except IOError, e:
1634                raise service_error(
1635                        "Cannot create config directory: %s" % e)
1636            try:
1637                f = open("%s/hosts" % configdir, "w")
1638                f.write('\n'.join(hosts))
1639                f.close()
1640            except IOError, e:
1641                raise service_error(service_error.internal, 
1642                        "Cannot write hosts file: %s" % e)
1643            try:
1644                copy_file("%s" % gw_pubkey, "%s/%s" % \
1645                        (configdir, gw_pubkey_base))
1646                copy_file("%s" % gw_secretkey, "%s/%s" % \
1647                        (configdir, gw_secretkey_base))
1648            except IOError, e:
1649                raise service_error(service_error.internal, 
1650                        "Cannot copy keyfiles: %s" % e)
1651
1652            # Allow the individual testbeds to access the configuration files.
1653            for tb in tbparams.keys():
1654                asignee = tbparams[tb]['allocID']['fedid']
1655                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1656                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1657
1658            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1659                    self.muxmax)
1660            part.add_portals(top, topo, eid, master, tbparams, ip_allocator,
1661                    connInfo, expid)
1662            # Now get access to the dynamic testbeds
1663            for k, t in topo.items():
1664                if not t.get_attribute('dynamic'):
1665                    continue
1666                tb = t.get_attribute('testbed')
1667                if tb: 
1668                    self.get_access(tb, None, tbparams, master, 
1669                            export_project, access_user, services)
1670                    tbparams[k] = tbparams[tb]
1671                    del tbparams[tb]
1672                    allocated[k] = 1
1673                    store_keys = t.get_attribute('store_keys')
1674                    # Give the testbed access to keys it exports or imports
1675                    if store_keys:
1676                        for sk in store_keys.split(" "):
1677                            self.auth.set_attribute(\
1678                                    tbparams[k]['allocID']['fedid'], sk)
1679                else:
1680                    raise service_error(service_error.internal, 
1681                            "Dynamic allocation from no testbed!?")
1682
1683            self.wrangle_software(expid, top, topo, tbparams)
1684
1685            vtopo = topdl.topology_to_vtopo(top)
1686            vis = self.genviz(vtopo)
1687
1688            # save federant information
1689            for k in allocated.keys():
1690                tbparams[k]['federant'] = {
1691                        'name': [ { 'localname' : eid} ],
1692                        'allocID' : tbparams[k]['allocID'],
1693                        'master' : k == master,
1694                        'uri': tbparams[k]['uri'],
1695                    }
1696                if tbparams[k].has_key('emulab'):
1697                        tbparams[k]['federant']['emulab'] = \
1698                                tbparams[k]['emulab']
1699
1700            self.state_lock.acquire()
1701            self.state[eid]['vtopo'] = vtopo
1702            self.state[eid]['vis'] = vis
1703            self.state[expid]['federant'] = \
1704                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1705                        if tbparams[tb].has_key('federant') ]
1706            if self.state_filename: 
1707                self.write_state()
1708            self.state_lock.release()
1709        except service_error, e:
1710            # If something goes wrong in the parse (usually an access error)
1711            # clear the placeholder state.  From here on out the code delays
1712            # exceptions.  Failing at this point returns a fault to the remote
1713            # caller.
1714
1715            self.state_lock.acquire()
1716            del self.state[eid]
1717            del self.state[expid]
1718            if self.state_filename: self.write_state()
1719            self.state_lock.release()
1720            raise e
1721
1722
1723        # Start the background swapper and return the starting state.  From
1724        # here on out, the state will stick around a while.
1725
1726        # Let users touch the state
1727        self.auth.set_attribute(fid, expid)
1728        self.auth.set_attribute(expid, expid)
1729        # Override fedids can manipulate state as well
1730        for o in self.overrides:
1731            self.auth.set_attribute(o, expid)
1732
1733        # Create a logger that logs to the experiment's state object as well as
1734        # to the main log file.
1735        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1736        alloc_collector = self.list_log(self.state[eid]['log'])
1737        h = logging.StreamHandler(alloc_collector)
1738        # XXX: there should be a global one of these rather than repeating the
1739        # code.
1740        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1741                    '%d %b %y %H:%M:%S'))
1742        alloc_log.addHandler(h)
1743       
1744        attrs = [ 
1745                {
1746                    'attribute': 'ssh_pubkey', 
1747                    'value': '%s/%s/config/%s' % \
1748                            (self.repo_url, expid, gw_pubkey_base)
1749                },
1750                {
1751                    'attribute': 'ssh_secretkey', 
1752                    'value': '%s/%s/config/%s' % \
1753                            (self.repo_url, expid, gw_secretkey_base)
1754                },
1755                {
1756                    'attribute': 'hosts', 
1757                    'value': '%s/%s/config/hosts' % \
1758                            (self.repo_url, expid)
1759                },
1760                {
1761                    'attribute': 'experiment_name',
1762                    'value': eid,
1763                },
1764            ]
1765
1766        # transit and disconnected testbeds may not have a connInfo entry.
1767        # Fill in the blanks.
1768        for t in allocated.keys():
1769            if not connInfo.has_key(t):
1770                connInfo[t] = { }
1771
1772        # Start a thread to do the resource allocation
1773        t  = Thread(target=self.allocate_resources,
1774                args=(allocated, master, eid, expid, tbparams, 
1775                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
1776                    services),
1777                name=eid)
1778        t.start()
1779
1780        rv = {
1781                'experimentID': [
1782                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1783                ],
1784                'experimentStatus': 'starting',
1785            }
1786
1787        return rv
1788   
1789    def get_experiment_fedid(self, key):
1790        """
1791        find the fedid associated with the localname key in the state database.
1792        """
1793
1794        rv = None
1795        self.state_lock.acquire()
1796        if self.state.has_key(key):
1797            if isinstance(self.state[key], dict):
1798                try:
1799                    kl = [ f['fedid'] for f in \
1800                            self.state[key]['experimentID']\
1801                                if f.has_key('fedid') ]
1802                except KeyError:
1803                    self.state_lock.release()
1804                    raise service_error(service_error.internal, 
1805                            "No fedid for experiment %s when getting "+\
1806                                    "fedid(!?)" % key)
1807                if len(kl) == 1:
1808                    rv = kl[0]
1809                else:
1810                    self.state_lock.release()
1811                    raise service_error(service_error.internal, 
1812                            "multiple fedids for experiment %s when " +\
1813                                    "getting fedid(!?)" % key)
1814            else:
1815                self.state_lock.release()
1816                raise service_error(service_error.internal, 
1817                        "Unexpected state for %s" % key)
1818        self.state_lock.release()
1819        return rv
1820
1821    def check_experiment_access(self, fid, key):
1822        """
1823        Confirm that the fid has access to the experiment.  Though a request
1824        may be made in terms of a local name, the access attribute is always
1825        the experiment's fedid.
1826        """
1827        if not isinstance(key, fedid):
1828            key = self.get_experiment_fedid(key)
1829
1830        if self.auth.check_attribute(fid, key):
1831            return True
1832        else:
1833            raise service_error(service_error.access, "Access Denied")
1834
1835
1836    def get_handler(self, path, fid):
1837        self.log.info("Get handler %s %s" % (path, fid))
1838        if self.auth.check_attribute(fid, path):
1839            return ("%s/%s" % (self.repodir, path), "application/binary")
1840        else:
1841            return (None, None)
1842
1843    def get_vtopo(self, req, fid):
1844        """
1845        Return the stored virtual topology for this experiment
1846        """
1847        rv = None
1848        state = None
1849
1850        req = req.get('VtopoRequestBody', None)
1851        if not req:
1852            raise service_error(service_error.req,
1853                    "Bad request format (no VtopoRequestBody)")
1854        exp = req.get('experiment', None)
1855        if exp:
1856            if exp.has_key('fedid'):
1857                key = exp['fedid']
1858                keytype = "fedid"
1859            elif exp.has_key('localname'):
1860                key = exp['localname']
1861                keytype = "localname"
1862            else:
1863                raise service_error(service_error.req, "Unknown lookup type")
1864        else:
1865            raise service_error(service_error.req, "No request?")
1866
1867        self.check_experiment_access(fid, key)
1868
1869        self.state_lock.acquire()
1870        if self.state.has_key(key):
1871            if self.state[key].has_key('vtopo'):
1872                rv = { 'experiment' : {keytype: key },\
1873                        'vtopo': self.state[key]['vtopo'],\
1874                    }
1875            else:
1876                state = self.state[key]['experimentStatus']
1877        self.state_lock.release()
1878
1879        if rv: return rv
1880        else: 
1881            if state:
1882                raise service_error(service_error.partial, 
1883                        "Not ready: %s" % state)
1884            else:
1885                raise service_error(service_error.req, "No such experiment")
1886
1887    def get_vis(self, req, fid):
1888        """
1889        Return the stored visualization for this experiment
1890        """
1891        rv = None
1892        state = None
1893
1894        req = req.get('VisRequestBody', None)
1895        if not req:
1896            raise service_error(service_error.req,
1897                    "Bad request format (no VisRequestBody)")
1898        exp = req.get('experiment', None)
1899        if exp:
1900            if exp.has_key('fedid'):
1901                key = exp['fedid']
1902                keytype = "fedid"
1903            elif exp.has_key('localname'):
1904                key = exp['localname']
1905                keytype = "localname"
1906            else:
1907                raise service_error(service_error.req, "Unknown lookup type")
1908        else:
1909            raise service_error(service_error.req, "No request?")
1910
1911        self.check_experiment_access(fid, key)
1912
1913        self.state_lock.acquire()
1914        if self.state.has_key(key):
1915            if self.state[key].has_key('vis'):
1916                rv =  { 'experiment' : {keytype: key },\
1917                        'vis': self.state[key]['vis'],\
1918                        }
1919            else:
1920                state = self.state[key]['experimentStatus']
1921        self.state_lock.release()
1922
1923        if rv: return rv
1924        else:
1925            if state:
1926                raise service_error(service_error.partial, 
1927                        "Not ready: %s" % state)
1928            else:
1929                raise service_error(service_error.req, "No such experiment")
1930
1931    def clean_info_response(self, rv):
1932        """
1933        Remove the information in the experiment's state object that is not in
1934        the info response.
1935        """
1936        # Remove the owner info (should always be there, but...)
1937        if rv.has_key('owner'): del rv['owner']
1938
1939        # Convert the log into the allocationLog parameter and remove the
1940        # log entry (with defensive programming)
1941        if rv.has_key('log'):
1942            rv['allocationLog'] = "".join(rv['log'])
1943            del rv['log']
1944        else:
1945            rv['allocationLog'] = ""
1946
1947        if rv['experimentStatus'] != 'active':
1948            if rv.has_key('federant'): del rv['federant']
1949        else:
1950            # remove the allocationID and uri info from each federant
1951            for f in rv.get('federant', []):
1952                if f.has_key('allocID'): del f['allocID']
1953                if f.has_key('uri'): del f['uri']
1954        return rv
1955
1956    def get_info(self, req, fid):
1957        """
1958        Return all the stored info about this experiment
1959        """
1960        rv = None
1961
1962        req = req.get('InfoRequestBody', None)
1963        if not req:
1964            raise service_error(service_error.req,
1965                    "Bad request format (no InfoRequestBody)")
1966        exp = req.get('experiment', None)
1967        if exp:
1968            if exp.has_key('fedid'):
1969                key = exp['fedid']
1970                keytype = "fedid"
1971            elif exp.has_key('localname'):
1972                key = exp['localname']
1973                keytype = "localname"
1974            else:
1975                raise service_error(service_error.req, "Unknown lookup type")
1976        else:
1977            raise service_error(service_error.req, "No request?")
1978
1979        self.check_experiment_access(fid, key)
1980
1981        # The state may be massaged by the service function that called
1982        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
1983        # state.
1984        self.state_lock.acquire()
1985        if self.state.has_key(key):
1986            rv = copy.deepcopy(self.state[key])
1987        self.state_lock.release()
1988
1989        if rv:
1990            return self.clean_info_response(rv)
1991        else:
1992            raise service_error(service_error.req, "No such experiment")
1993
1994    def get_multi_info(self, req, fid):
1995        """
1996        Return all the stored info that this fedid can access
1997        """
1998        rv = { 'info': [ ] }
1999
2000        self.state_lock.acquire()
2001        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2002            try:
2003                self.check_experiment_access(fid, key)
2004            except service_error, e:
2005                if e.code == service_error.access:
2006                    continue
2007                else:
2008                    self.state_lock.release()
2009                    raise e
2010
2011            if self.state.has_key(key):
2012                e = copy.deepcopy(self.state[key])
2013                e = self.clean_info_response(e)
2014                rv['info'].append(e)
2015        self.state_lock.release()
2016        return rv
2017
2018    def terminate_experiment(self, req, fid):
2019        """
2020        Swap this experiment out on the federants and delete the shared
2021        information
2022        """
2023        tbparams = { }
2024        req = req.get('TerminateRequestBody', None)
2025        if not req:
2026            raise service_error(service_error.req,
2027                    "Bad request format (no TerminateRequestBody)")
2028        force = req.get('force', False)
2029        exp = req.get('experiment', None)
2030        if exp:
2031            if exp.has_key('fedid'):
2032                key = exp['fedid']
2033                keytype = "fedid"
2034            elif exp.has_key('localname'):
2035                key = exp['localname']
2036                keytype = "localname"
2037            else:
2038                raise service_error(service_error.req, "Unknown lookup type")
2039        else:
2040            raise service_error(service_error.req, "No request?")
2041
2042        self.check_experiment_access(fid, key)
2043
2044        dealloc_list = [ ]
2045
2046
2047        # Create a logger that logs to the dealloc_list as well as to the main
2048        # log file.
2049        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2050        h = logging.StreamHandler(self.list_log(dealloc_list))
2051        # XXX: there should be a global one of these rather than repeating the
2052        # code.
2053        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2054                    '%d %b %y %H:%M:%S'))
2055        dealloc_log.addHandler(h)
2056
2057        self.state_lock.acquire()
2058        fed_exp = self.state.get(key, None)
2059
2060        if fed_exp:
2061            # This branch of the conditional holds the lock to generate a
2062            # consistent temporary tbparams variable to deallocate experiments.
2063            # It releases the lock to do the deallocations and reacquires it to
2064            # remove the experiment state when the termination is complete.
2065
2066            # First make sure that the experiment creation is complete.
2067            status = fed_exp.get('experimentStatus', None)
2068
2069            if status:
2070                if status in ('starting', 'terminating'):
2071                    if not force:
2072                        self.state_lock.release()
2073                        raise service_error(service_error.partial, 
2074                                'Experiment still being created or destroyed')
2075                    else:
2076                        self.log.warning('Experiment in %s state ' % status + \
2077                                'being terminated by force.')
2078            else:
2079                # No status??? trouble
2080                self.state_lock.release()
2081                raise service_error(service_error.internal,
2082                        "Experiment has no status!?")
2083
2084            ids = []
2085            #  experimentID is a list of dicts that are self-describing
2086            #  identifiers.  This finds all the fedids and localnames - the
2087            #  keys of self.state - and puts them into ids.
2088            for id in fed_exp.get('experimentID', []):
2089                if id.has_key('fedid'): ids.append(id['fedid'])
2090                if id.has_key('localname'): ids.append(id['localname'])
2091
2092            # Collect the allocation/segment ids into a dict keyed by the fedid
2093            # of the allocation (or a monotonically increasing integer) that
2094            # contains a tuple of uri, aid (which is a dict...)
2095            for i, fed in enumerate(fed_exp.get('federant', [])):
2096                try:
2097                    uri = fed['uri']
2098                    aid = fed['allocID']
2099                    k = fed['allocID'].get('fedid', i)
2100                except KeyError, e:
2101                    continue
2102                tbparams[k] = (uri, aid)
2103            fed_exp['experimentStatus'] = 'terminating'
2104            if self.state_filename: self.write_state()
2105            self.state_lock.release()
2106
2107            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2108            # then completes, so we can't wait if nothing starts.  So, no
2109            # tbparams, no start.
2110            if len(tbparams) > 0:
2111                thread_pool = self.thread_pool(self.nthreads)
2112                for k in tbparams.keys():
2113                    # Create and start a thread to stop the segment
2114                    thread_pool.wait_for_slot()
2115                    uri, aid = tbparams[k]
2116                    t  = self.pooled_thread(\
2117                            target=self.terminate_segment(log=dealloc_log,
2118                                testbed=uri,
2119                                cert_file=self.cert_file, 
2120                                cert_pwd=self.cert_pwd,
2121                                trusted_certs=self.trusted_certs,
2122                                caller=self.call_TerminateSegment),
2123                            args=(uri, aid), name=k,
2124                            pdata=thread_pool, trace_file=self.trace_file)
2125                    t.start()
2126                # Wait for completions
2127                thread_pool.wait_for_all_done()
2128
2129            # release the allocations (failed experiments have done this
2130            # already, and starting experiments may be in odd states, so we
2131            # ignore errors releasing those allocations
2132            try: 
2133                for k in tbparams.keys():
2134                    # This releases access by uri
2135                    uri, aid = tbparams[k]
2136                    self.release_access(None, aid, uri=uri)
2137            except service_error, e:
2138                if status != 'failed' and not force:
2139                    raise e
2140
2141            # Remove the terminated experiment
2142            self.state_lock.acquire()
2143            for id in ids:
2144                if self.state.has_key(id): del self.state[id]
2145
2146            if self.state_filename: self.write_state()
2147            self.state_lock.release()
2148
2149            # Delete any synch points associated with this experiment.  All
2150            # synch points begin with the fedid of the experiment.
2151            fedid_keys = set(["fedid:%s" % f for f in ids \
2152                    if isinstance(f, fedid)])
2153            for k in self.synch_store.all_keys():
2154                try:
2155                    if len(k) > 45 and k[0:46] in fedid_keys:
2156                        self.synch_store.del_value(k)
2157                except synch_store.BadDeletionError:
2158                    pass
2159            self.write_store()
2160               
2161            return { 
2162                    'experiment': exp , 
2163                    'deallocationLog': "".join(dealloc_list),
2164                    }
2165        else:
2166            # Don't forget to release the lock
2167            self.state_lock.release()
2168            raise service_error(service_error.req, "No saved state")
2169
2170
2171    def GetValue(self, req, fid):
2172        """
2173        Get a value from the synchronized store
2174        """
2175        req = req.get('GetValueRequestBody', None)
2176        if not req:
2177            raise service_error(service_error.req,
2178                    "Bad request format (no GetValueRequestBody)")
2179       
2180        name = req['name']
2181        wait = req['wait']
2182        rv = { 'name': name }
2183
2184        if self.auth.check_attribute(fid, name):
2185            try:
2186                v = self.synch_store.get_value(name, wait)
2187            except synch_store.RevokedKeyError:
2188                # No more synch on this key
2189                raise service_error(service_error.federant, 
2190                        "Synch key %s revoked" % name)
2191            if v is not None:
2192                rv['value'] = v
2193            self.log.debug("[GetValue] got %s from %s" % (v, name))
2194            return rv
2195        else:
2196            raise service_error(service_error.access, "Access Denied")
2197       
2198
2199    def SetValue(self, req, fid):
2200        """
2201        Set a value in the synchronized store
2202        """
2203        req = req.get('SetValueRequestBody', None)
2204        if not req:
2205            raise service_error(service_error.req,
2206                    "Bad request format (no SetValueRequestBody)")
2207       
2208        name = req['name']
2209        v = req['value']
2210
2211        if self.auth.check_attribute(fid, name):
2212            try:
2213                self.synch_store.set_value(name, v)
2214                self.write_store()
2215                self.log.debug("[SetValue] set %s to %s" % (name, v))
2216            except synch_store.CollisionError:
2217                # Translate into a service_error
2218                raise service_error(service_error.req,
2219                        "Value already set: %s" %name)
2220            except synch_store.RevokedKeyError:
2221                # No more synch on this key
2222                raise service_error(service_error.federant, 
2223                        "Synch key %s revoked" % name)
2224            return { 'name': name, 'value': v }
2225        else:
2226            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.