source: fedd/federation/experiment_control.py @ 5f96438

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 5f96438 was 5f96438, checked in by Ted Faber <faber@…>, 15 years ago

checkpoint for download - not tested

  • Property mode set to 100644
File size: 78.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45class experiment_control_local:
46    """
47    Control of experiments that this system can directly access.
48
49    Includes experiment creation, termination and information dissemination.
50    Thred safe.
51    """
52
53    class ssh_cmd_timeout(RuntimeError): pass
54   
55    class thread_pool:
56        """
57        A class to keep track of a set of threads all invoked for the same
58        task.  Manages the mutual exclusion of the states.
59        """
60        def __init__(self, nthreads):
61            """
62            Start a pool.
63            """
64            self.changed = Condition()
65            self.started = 0
66            self.terminated = 0
67            self.nthreads = nthreads
68
69        def acquire(self):
70            """
71            Get the pool's lock.
72            """
73            self.changed.acquire()
74
75        def release(self):
76            """
77            Release the pool's lock.
78            """
79            self.changed.release()
80
81        def wait(self, timeout = None):
82            """
83            Wait for a pool thread to start or stop.
84            """
85            self.changed.wait(timeout)
86
87        def start(self):
88            """
89            Called by a pool thread to report starting.
90            """
91            self.changed.acquire()
92            self.started += 1
93            self.changed.notifyAll()
94            self.changed.release()
95
96        def terminate(self):
97            """
98            Called by a pool thread to report finishing.
99            """
100            self.changed.acquire()
101            self.terminated += 1
102            self.changed.notifyAll()
103            self.changed.release()
104
105        def clear(self):
106            """
107            Clear all pool data.
108            """
109            self.changed.acquire()
110            self.started = 0
111            self.terminated =0
112            self.changed.notifyAll()
113            self.changed.release()
114
115        def wait_for_slot(self):
116            """
117            Wait until we have a free slot to start another pooled thread
118            """
119            self.acquire()
120            while self.started - self.terminated >= self.nthreads:
121                self.wait()
122            self.release()
123
124        def wait_for_all_done(self, timeout=None):
125            """
126            Wait until all active threads finish (and at least one has
127            started).  If a timeout is given, return after waiting that long
128            for termination.  If all threads are done (and one has started in
129            the since the last clear()) return True, otherwise False.
130            """
131            if timeout:
132                deadline = time.time() + timeout
133            self.acquire()
134            while self.started == 0 or self.started > self.terminated:
135                self.wait(timeout)
136                if timeout:
137                    if time.time() > deadline:
138                        break
139                    timeout = deadline - time.time()
140            self.release()
141            return not (self.started == 0 or self.started > self.terminated)
142
143    class pooled_thread(Thread):
144        """
145        One of a set of threads dedicated to a specific task.  Uses the
146        thread_pool class above for coordination.
147        """
148        def __init__(self, group=None, target=None, name=None, args=(), 
149                kwargs={}, pdata=None, trace_file=None):
150            Thread.__init__(self, group, target, name, args, kwargs)
151            self.rv = None          # Return value of the ops in this thread
152            self.exception = None   # Exception that terminated this thread
153            self.target=target      # Target function to run on start()
154            self.args = args        # Args to pass to target
155            self.kwargs = kwargs    # Additional kw args
156            self.pdata = pdata      # thread_pool for this class
157            # Logger for this thread
158            self.log = logging.getLogger("fedd.experiment_control")
159       
160        def run(self):
161            """
162            Emulate Thread.run, except add pool data manipulation and error
163            logging.
164            """
165            if self.pdata:
166                self.pdata.start()
167
168            if self.target:
169                try:
170                    self.rv = self.target(*self.args, **self.kwargs)
171                except service_error, s:
172                    self.exception = s
173                    self.log.error("Thread exception: %s %s" % \
174                            (s.code_string(), s.desc))
175                except:
176                    self.exception = sys.exc_info()[1]
177                    self.log.error(("Unexpected thread exception: %s" +\
178                            "Trace %s") % (self.exception,\
179                                traceback.format_exc()))
180            if self.pdata:
181                self.pdata.terminate()
182
183    call_RequestAccess = service_caller('RequestAccess')
184    call_ReleaseAccess = service_caller('ReleaseAccess')
185    call_StartSegment = service_caller('StartSegment')
186    call_TerminateSegment = service_caller('TerminateSegment')
187    call_Ns2Topdl = service_caller('Ns2Topdl')
188
189    def __init__(self, config=None, auth=None):
190        """
191        Intialize the various attributes, most from the config object
192        """
193
194        def parse_tarfile_list(tf):
195            """
196            Parse a tarfile list from the configuration.  This is a set of
197            paths and tarfiles separated by spaces.
198            """
199            rv = [ ]
200            if tf is not None:
201                tl = tf.split()
202                while len(tl) > 1:
203                    p, t = tl[0:2]
204                    del tl[0:2]
205                    rv.append((p, t))
206            return rv
207
208        self.thread_with_rv = experiment_control_local.pooled_thread
209        self.thread_pool = experiment_control_local.thread_pool
210        self.list_log = list_log.list_log
211
212        self.cert_file = config.get("experiment_control", "cert_file")
213        if self.cert_file:
214            self.cert_pwd = config.get("experiment_control", "cert_pwd")
215        else:
216            self.cert_file = config.get("globals", "cert_file")
217            self.cert_pwd = config.get("globals", "cert_pwd")
218
219        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
220                or config.get("globals", "trusted_certs")
221
222        self.repodir = config.get("experiment_control", "repodir")
223        self.repo_url = config.get("experiment_control", "repo_url", 
224                "https://users.isi.deterlab.net:23235");
225
226        self.exp_stem = "fed-stem"
227        self.log = logging.getLogger("fedd.experiment_control")
228        set_log_level(config, "experiment_control", self.log)
229        self.muxmax = 2
230        self.nthreads = 10
231        self.randomize_experiments = False
232
233        self.splitter = None
234        self.ssh_keygen = "/usr/bin/ssh-keygen"
235        self.ssh_identity_file = None
236
237
238        self.debug = config.getboolean("experiment_control", "create_debug")
239        self.cleanup = not config.getboolean("experiment_control", 
240                "leave_tmpfiles")
241        self.state_filename = config.get("experiment_control", 
242                "experiment_state")
243        self.store_filename = config.get("experiment_control", 
244                "synch_store")
245        self.store_url = config.get("experiment_control", "store_url")
246        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
247        self.fedkit = parse_tarfile_list(\
248                config.get("experiment_control", "fedkit"))
249        self.gatewaykit = parse_tarfile_list(\
250                config.get("experiment_control", "gatewaykit"))
251        accessdb_file = config.get("experiment_control", "accessdb")
252
253        self.ssh_pubkey_file = config.get("experiment_control", 
254                "ssh_pubkey_file")
255        self.ssh_privkey_file = config.get("experiment_control",
256                "ssh_privkey_file")
257        # NB for internal master/slave ops, not experiment setup
258        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
259
260        self.overrides = set([])
261        ovr = config.get('experiment_control', 'overrides')
262        if ovr:
263            for o in ovr.split(","):
264                o = o.strip()
265                if o.startswith('fedid:'): o = o[len('fedid:'):]
266                self.overrides.add(fedid(hexstr=o))
267
268        self.state = { }
269        self.state_lock = Lock()
270        self.tclsh = "/usr/local/bin/otclsh"
271        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
272                config.get("experiment_control", "tcl_splitter",
273                        "/usr/testbed/lib/ns2ir/parse.tcl")
274        mapdb_file = config.get("experiment_control", "mapdb")
275        self.trace_file = sys.stderr
276
277        self.def_expstart = \
278                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
279                "/tmp/federate";
280        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
281                "FEDDIR/hosts";
282        self.def_gwstart = \
283                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
284                "/tmp/bridge.log";
285        self.def_mgwstart = \
286                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
287                "/tmp/bridge.log";
288        self.def_gwimage = "FBSD61-TUNNEL2";
289        self.def_gwtype = "pc";
290        self.local_access = { }
291
292        if auth:
293            self.auth = auth
294        else:
295            self.log.error(\
296                    "[access]: No authorizer initialized, creating local one.")
297            auth = authorizer()
298
299
300        if self.ssh_pubkey_file:
301            try:
302                f = open(self.ssh_pubkey_file, 'r')
303                self.ssh_pubkey = f.read()
304                f.close()
305            except IOError:
306                raise service_error(service_error.internal,
307                        "Cannot read sshpubkey")
308        else:
309            raise service_error(service_error.internal, 
310                    "No SSH public key file?")
311
312        if not self.ssh_privkey_file:
313            raise service_error(service_error.internal, 
314                    "No SSH public key file?")
315
316
317        if mapdb_file:
318            self.read_mapdb(mapdb_file)
319        else:
320            self.log.warn("[experiment_control] No testbed map, using defaults")
321            self.tbmap = { 
322                    'deter':'https://users.isi.deterlab.net:23235',
323                    'emulab':'https://users.isi.deterlab.net:23236',
324                    'ucb':'https://users.isi.deterlab.net:23237',
325                    }
326
327        if accessdb_file:
328                self.read_accessdb(accessdb_file)
329        else:
330            raise service_error(service_error.internal,
331                    "No accessdb specified in config")
332
333        # Grab saved state.  OK to do this w/o locking because it's read only
334        # and only one thread should be in existence that can see self.state at
335        # this point.
336        if self.state_filename:
337            self.read_state()
338
339        if self.store_filename:
340            self.read_store()
341        else:
342            self.log.warning("No saved synch store")
343            self.synch_store = synch_store
344
345        # Dispatch tables
346        self.soap_services = {\
347                'New': soap_handler('New', self.new_experiment),
348                'Create': soap_handler('Create', self.create_experiment),
349                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
350                'Vis': soap_handler('Vis', self.get_vis),
351                'Info': soap_handler('Info', self.get_info),
352                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
353                'Terminate': soap_handler('Terminate', 
354                    self.terminate_experiment),
355                'GetValue': soap_handler('GetValue', self.GetValue),
356                'SetValue': soap_handler('SetValue', self.SetValue),
357        }
358
359        self.xmlrpc_services = {\
360                'New': xmlrpc_handler('New', self.new_experiment),
361                'Create': xmlrpc_handler('Create', self.create_experiment),
362                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
363                'Vis': xmlrpc_handler('Vis', self.get_vis),
364                'Info': xmlrpc_handler('Info', self.get_info),
365                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
366                'Terminate': xmlrpc_handler('Terminate',
367                    self.terminate_experiment),
368                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
369                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
370        }
371
372    # Call while holding self.state_lock
373    def write_state(self):
374        """
375        Write a new copy of experiment state after copying the existing state
376        to a backup.
377
378        State format is a simple pickling of the state dictionary.
379        """
380        if os.access(self.state_filename, os.W_OK):
381            copy_file(self.state_filename, \
382                    "%s.bak" % self.state_filename)
383        try:
384            f = open(self.state_filename, 'w')
385            pickle.dump(self.state, f)
386        except IOError, e:
387            self.log.error("Can't write file %s: %s" % \
388                    (self.state_filename, e))
389        except pickle.PicklingError, e:
390            self.log.error("Pickling problem: %s" % e)
391        except TypeError, e:
392            self.log.error("Pickling problem (TypeError): %s" % e)
393
394    @staticmethod
395    def get_alloc_ids(state):
396        """
397        Pull the fedids of the identifiers of each allocation from the
398        state.  Again, a dict dive that's best isolated.
399
400        Used by read_store and read state
401        """
402
403        return [ f['allocID']['fedid'] 
404                for f in state.get('federant',[]) \
405                    if f.has_key('allocID') and \
406                        f['allocID'].has_key('fedid')]
407
408    # Call while holding self.state_lock
409    def read_state(self):
410        """
411        Read a new copy of experiment state.  Old state is overwritten.
412
413        State format is a simple pickling of the state dictionary.
414        """
415       
416        def get_experiment_id(state):
417            """
418            Pull the fedid experimentID out of the saved state.  This is kind
419            of a gross walk through the dict.
420            """
421
422            if state.has_key('experimentID'):
423                for e in state['experimentID']:
424                    if e.has_key('fedid'):
425                        return e['fedid']
426                else:
427                    return None
428            else:
429                return None
430
431        try:
432            f = open(self.state_filename, "r")
433            self.state = pickle.load(f)
434            self.log.debug("[read_state]: Read state from %s" % \
435                    self.state_filename)
436        except IOError, e:
437            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
438                    % (self.state_filename, e))
439        except pickle.UnpicklingError, e:
440            self.log.warning(("[read_state]: No saved state: " + \
441                    "Unpickling failed: %s") % e)
442       
443        for s in self.state.values():
444            try:
445
446                eid = get_experiment_id(s)
447                if eid : 
448                    # Give the owner rights to the experiment
449                    self.auth.set_attribute(s['owner'], eid)
450                    # And holders of the eid as well
451                    self.auth.set_attribute(eid, eid)
452                    # allow overrides to control experiments as well
453                    for o in self.overrides:
454                        self.auth.set_attribute(o, eid)
455                    # Set permissions to allow reading of the software repo, if
456                    # any, as well.
457                    for a in self.get_alloc_ids(s):
458                        self.auth.set_attribute(a, 'repo/%s' % eid)
459                else:
460                    raise KeyError("No experiment id")
461            except KeyError, e:
462                self.log.warning("[read_state]: State ownership or identity " +\
463                        "misformatted in %s: %s" % (self.state_filename, e))
464
465
466    def read_accessdb(self, accessdb_file):
467        """
468        Read the mapping from fedids that can create experiments to their name
469        in the 3-level access namespace.  All will be asserted from this
470        testbed and can include the local username and porject that will be
471        asserted on their behalf by this fedd.  Each fedid is also added to the
472        authorization system with the "create" attribute.
473        """
474        self.accessdb = {}
475        # These are the regexps for parsing the db
476        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
477        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
478                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
479        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
480                "\s*->\s*(" + name_expr + ")\s*$")
481        lineno = 0
482
483        # Parse the mappings and store in self.authdb, a dict of
484        # fedid -> (proj, user)
485        try:
486            f = open(accessdb_file, "r")
487            for line in f:
488                lineno += 1
489                line = line.strip()
490                if len(line) == 0 or line.startswith('#'):
491                    continue
492                m = project_line.match(line)
493                if m:
494                    fid = fedid(hexstr=m.group(1))
495                    project, user = m.group(2,3)
496                    if not self.accessdb.has_key(fid):
497                        self.accessdb[fid] = []
498                    self.accessdb[fid].append((project, user))
499                    continue
500
501                m = user_line.match(line)
502                if m:
503                    fid = fedid(hexstr=m.group(1))
504                    project = None
505                    user = m.group(2)
506                    if not self.accessdb.has_key(fid):
507                        self.accessdb[fid] = []
508                    self.accessdb[fid].append((project, user))
509                    continue
510                self.log.warn("[experiment_control] Error parsing access " +\
511                        "db %s at line %d" %  (accessdb_file, lineno))
512        except IOError:
513            raise service_error(service_error.internal,
514                    "Error opening/reading %s as experiment " +\
515                            "control accessdb" %  accessdb_file)
516        f.close()
517
518        # Initialize the authorization attributes
519        for fid in self.accessdb.keys():
520            self.auth.set_attribute(fid, 'create')
521            self.auth.set_attribute(fid, 'new')
522
523    def read_mapdb(self, file):
524        """
525        Read a simple colon separated list of mappings for the
526        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
527        """
528
529        self.tbmap = { }
530        lineno =0
531        try:
532            f = open(file, "r")
533            for line in f:
534                lineno += 1
535                line = line.strip()
536                if line.startswith('#') or len(line) == 0:
537                    continue
538                try:
539                    label, url = line.split(':', 1)
540                    self.tbmap[label] = url
541                except ValueError, e:
542                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
543                            "map db: %s %s" % (lineno, line, e))
544        except IOError, e:
545            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
546                    "open %s: %s" % (file, e))
547        f.close()
548
549    def read_store(self):
550        try:
551            self.synch_store = synch_store()
552            self.synch_store.load(self.store_filename)
553            self.log.debug("[read_store]: Read store from %s" % \
554                    self.store_filename)
555        except IOError, e:
556            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
557                    % (self.state_filename, e))
558            self.synch_store = synch_store()
559
560        # Set the initial permissions on data in the store.  XXX: This ad hoc
561        # authorization attribute initialization is getting out of hand.
562        for k in self.synch_store.all_keys():
563            try:
564                if k.startswith('fedid:'):
565                    fid = fedid(hexstr=k[6:46])
566                    if self.state.has_key(fid):
567                        for a in self.get_alloc_ids(self.state[fid]):
568                            self.auth.set_attribute(a, k)
569            except ValueError, e:
570                self.log.warn("Cannot deduce permissions for %s" % k)
571
572
573    def write_store(self):
574        """
575        Write a new copy of synch_store after writing current state
576        to a backup.  We use the internal synch_store pickle method to avoid
577        incinsistent data.
578
579        State format is a simple pickling of the store.
580        """
581        if os.access(self.store_filename, os.W_OK):
582            copy_file(self.store_filename, \
583                    "%s.bak" % self.store_filename)
584        try:
585            self.synch_store.save(self.store_filename)
586        except IOError, e:
587            self.log.error("Can't write file %s: %s" % \
588                    (self.store_filename, e))
589        except TypeError, e:
590            self.log.error("Pickling problem (TypeError): %s" % e)
591
592       
593    def generate_ssh_keys(self, dest, type="rsa" ):
594        """
595        Generate a set of keys for the gateways to use to talk.
596
597        Keys are of type type and are stored in the required dest file.
598        """
599        valid_types = ("rsa", "dsa")
600        t = type.lower();
601        if t not in valid_types: raise ValueError
602        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
603
604        try:
605            trace = open("/dev/null", "w")
606        except IOError:
607            raise service_error(service_error.internal,
608                    "Cannot open /dev/null??");
609
610        # May raise CalledProcessError
611        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
612        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
613        if rv != 0:
614            raise service_error(service_error.internal, 
615                    "Cannot generate nonce ssh keys.  %s return code %d" \
616                            % (self.ssh_keygen, rv))
617
618    def gentopo(self, str):
619        """
620        Generate the topology dtat structure from the splitter's XML
621        representation of it.
622
623        The topology XML looks like:
624            <experiment>
625                <nodes>
626                    <node><vname></vname><ips>ip1:ip2</ips></node>
627                </nodes>
628                <lans>
629                    <lan>
630                        <vname></vname><vnode></vnode><ip></ip>
631                        <bandwidth></bandwidth><member>node:port</member>
632                    </lan>
633                </lans>
634        """
635        class topo_parse:
636            """
637            Parse the topology XML and create the dats structure.
638            """
639            def __init__(self):
640                # Typing of the subelements for data conversion
641                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
642                self.int_subelements = ( 'bandwidth',)
643                self.float_subelements = ( 'delay',)
644                # The final data structure
645                self.nodes = [ ]
646                self.lans =  [ ]
647                self.topo = { \
648                        'node': self.nodes,\
649                        'lan' : self.lans,\
650                    }
651                self.element = { }  # Current element being created
652                self.chars = ""     # Last text seen
653
654            def end_element(self, name):
655                # After each sub element the contents is added to the current
656                # element or to the appropriate list.
657                if name == 'node':
658                    self.nodes.append(self.element)
659                    self.element = { }
660                elif name == 'lan':
661                    self.lans.append(self.element)
662                    self.element = { }
663                elif name in self.str_subelements:
664                    self.element[name] = self.chars
665                    self.chars = ""
666                elif name in self.int_subelements:
667                    self.element[name] = int(self.chars)
668                    self.chars = ""
669                elif name in self.float_subelements:
670                    self.element[name] = float(self.chars)
671                    self.chars = ""
672
673            def found_chars(self, data):
674                self.chars += data.rstrip()
675
676
677        tp = topo_parse();
678        parser = xml.parsers.expat.ParserCreate()
679        parser.EndElementHandler = tp.end_element
680        parser.CharacterDataHandler = tp.found_chars
681
682        parser.Parse(str)
683
684        return tp.topo
685       
686
687    def genviz(self, topo):
688        """
689        Generate the visualization the virtual topology
690        """
691
692        neato = "/usr/local/bin/neato"
693        # These are used to parse neato output and to create the visualization
694        # file.
695        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
696        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
697                "%s</type></node>"
698
699        try:
700            # Node names
701            nodes = [ n['vname'] for n in topo['node'] ]
702            topo_lans = topo['lan']
703        except KeyError, e:
704            raise service_error(service_error.internal, "Bad topology: %s" %e)
705
706        lans = { }
707        links = { }
708
709        # Walk through the virtual topology, organizing the connections into
710        # 2-node connections (links) and more-than-2-node connections (lans).
711        # When a lan is created, it's added to the list of nodes (there's a
712        # node in the visualization for the lan).
713        for l in topo_lans:
714            if links.has_key(l['vname']):
715                if len(links[l['vname']]) < 2:
716                    links[l['vname']].append(l['vnode'])
717                else:
718                    nodes.append(l['vname'])
719                    lans[l['vname']] = links[l['vname']]
720                    del links[l['vname']]
721                    lans[l['vname']].append(l['vnode'])
722            elif lans.has_key(l['vname']):
723                lans[l['vname']].append(l['vnode'])
724            else:
725                links[l['vname']] = [ l['vnode'] ]
726
727
728        # Open up a temporary file for dot to turn into a visualization
729        try:
730            df, dotname = tempfile.mkstemp()
731            dotfile = os.fdopen(df, 'w')
732        except IOError:
733            raise service_error(service_error.internal,
734                    "Failed to open file in genviz")
735
736        try:
737            dnull = open('/dev/null', 'w')
738        except IOError:
739            service_error(service_error.internal,
740                    "Failed to open /dev/null in genviz")
741
742        # Generate a dot/neato input file from the links, nodes and lans
743        try:
744            print >>dotfile, "graph G {"
745            for n in nodes:
746                print >>dotfile, '\t"%s"' % n
747            for l in links.keys():
748                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
749            for l in lans.keys():
750                for n in lans[l]:
751                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
752            print >>dotfile, "}"
753            dotfile.close()
754        except TypeError:
755            raise service_error(service_error.internal,
756                    "Single endpoint link in vtopo")
757        except IOError:
758            raise service_error(service_error.internal, "Cannot write dot file")
759
760        # Use dot to create a visualization
761        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
762                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
763                close_fds=True)
764        dnull.close()
765
766        # Translate dot to vis format
767        vis_nodes = [ ]
768        vis = { 'node': vis_nodes }
769        for line in dot.stdout:
770            m = vis_re.match(line)
771            if m:
772                vn = m.group(1)
773                vis_node = {'name': vn, \
774                        'x': float(m.group(2)),\
775                        'y' : float(m.group(3)),\
776                    }
777                if vn in links.keys() or vn in lans.keys():
778                    vis_node['type'] = 'lan'
779                else:
780                    vis_node['type'] = 'node'
781                vis_nodes.append(vis_node)
782        rv = dot.wait()
783
784        os.remove(dotname)
785        if rv == 0 : return vis
786        else: return None
787
788    def get_access(self, tb, nodes, tbparam, master, export_project,
789            access_user, services):
790        """
791        Get access to testbed through fedd and set the parameters for that tb
792        """
793        uri = self.tbmap.get(tb, None)
794        if not uri:
795            raise service_error(service_error.server_config, 
796                    "Unknown testbed: %s" % tb)
797
798        # Tweak search order so that if there are entries in access_user that
799        # have a project matching the export project, we try them first
800        if export_project: 
801            access_sequence = [ (p, u) for p, u in access_user \
802                    if p == export_project] 
803            access_sequence.extend([(p, u) for p, u in access_user \
804                    if p != export_project]) 
805        else: 
806            access_sequence = access_user
807
808        for p, u in access_sequence: 
809            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
810                    "to %s") %  ((p or "None"), u, uri))
811
812            if p:
813                # Request with user and project specified
814                req = {\
815                        'destinationTestbed' : { 'uri' : uri },
816                        'credential': [ "project: %s" % p, "user: %s"  % u],
817                        'allocID' : { 'localname': 'test' },
818                    }
819            else:
820                # Request with only user specified
821                req = {\
822                        'destinationTestbed' : { 'uri' : uri },
823                        'credential': [ 'user: %s' % u ],
824                        'user':  [ {'userID': { 'localname': u } } ],
825                        'allocID' : { 'localname': 'test' },
826                    }
827
828            # If there is a master, and this is it, ask it to export services
829            # XXX move this to export pseudo-service
830            if tb == master:
831                req['service'] = [
832                        { 'name': 'userconfig', 'visibility': 'export'},
833                        { 'name': 'SMB', 'visibility': 'export'},
834                        { 'name': 'seer', 'visibility': 'export'},
835                        { 'name': 'tmcd', 'visibility': 'export'},
836                    ]
837
838            # node resources if any
839            if nodes != None and len(nodes) > 0:
840                rnodes = [ ]
841                for n in nodes:
842                    rn = { }
843                    image, hw, count = n.split(":")
844                    if image: rn['image'] = [ image ]
845                    if hw: rn['hardware'] = [ hw ]
846                    if count and int(count) >0 : rn['count'] = int(count)
847                    rnodes.append(rn)
848                req['resources']= { }
849                req['resources']['node'] = rnodes
850
851            try:
852                if self.local_access.has_key(uri):
853                    # Local access call
854                    req = { 'RequestAccessRequestBody' : req }
855                    r = self.local_access[uri].RequestAccess(req, 
856                            fedid(file=self.cert_file))
857                    r = { 'RequestAccessResponseBody' : r }
858                else:
859                    r = self.call_RequestAccess(uri, req, 
860                            self.cert_file, self.cert_pwd, self.trusted_certs)
861            except service_error, e:
862                if e.code == service_error.access:
863                    self.log.debug("[get_access] Access denied")
864                    r = None
865                    continue
866                else:
867                    raise e
868
869            if r.has_key('RequestAccessResponseBody'):
870                # Through to here we have a valid response, not a fault.
871                # Access denied is a fault, so something better or worse than
872                # access denied has happened.
873                r = r['RequestAccessResponseBody']
874                self.log.debug("[get_access] Access granted")
875                break
876            else:
877                raise service_error(service_error.protocol,
878                        "Bad proxy response")
879       
880        if not r:
881            raise service_error(service_error.access, 
882                    "Access denied by %s (%s)" % (tb, uri))
883
884        tbparam[tb] = { 
885                "allocID" : r['allocID'],
886                "uri": uri,
887                }
888        if 'service' in r:
889            for s in r['service']:
890                # Tag each service with the origin testbed
891                if s.has_key('fedAttr'):
892                    # The else connects with the for
893                    for a in s['fedAttr']:
894                        if a.get('attribute', None) == 'testbed'):
895                            break
896                    else:
897                        s['fedAttr'].append({'attribute': 'testbed',
898                            'value': tb})
899                else:
900                    s['fedAttr'] = [ {'attribute': 'testbed', 'value': tb} ]
901            services.extend(r['service'])
902
903        # Add attributes to parameter space.  We don't allow attributes to
904        # overlay any parameters already installed.
905        for a in r.get('fedAttr', []):
906            try:
907                if a['attribute'] and \
908                        isinstance(a['attribute'], basestring)\
909                        and not tbparam[tb].has_key(a['attribute'].lower()):
910                    tbparam[tb][a['attribute'].lower()] = a['value']
911            except KeyError:
912                self.log.error("Bad attribute in response: %s" % a)
913
914    def release_access(self, tb, aid, uri=None):
915        """
916        Release access to testbed through fedd
917        """
918
919        if not uri:
920            uri = self.tbmap.get(tb, None)
921        if not uri:
922            raise service_error(service_error.server_config, 
923                    "Unknown testbed: %s" % tb)
924
925        if self.local_access.has_key(uri):
926            resp = self.local_access[uri].ReleaseAccess(\
927                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
928                    fedid(file=self.cert_file))
929            resp = { 'ReleaseAccessResponseBody': resp } 
930        else:
931            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
932                    self.cert_file, self.cert_pwd, self.trusted_certs)
933
934        # better error coding
935
936    def remote_ns2topdl(self, uri, desc):
937
938        req = {
939                'description' : { 'ns2description': desc },
940            }
941
942        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
943                self.trusted_certs)
944
945        if r.has_key('Ns2TopdlResponseBody'):
946            r = r['Ns2TopdlResponseBody']
947            ed = r.get('experimentdescription', None)
948            if ed.has_key('topdldescription'):
949                return topdl.Topology(**ed['topdldescription'])
950            else:
951                raise service_error(service_error.protocol, 
952                        "Bad splitter response (no output)")
953        else:
954            raise service_error(service_error.protocol, "Bad splitter response")
955
956    class start_segment:
957        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
958                cert_pwd=None, trusted_certs=None, caller=None,
959                log_collector=None):
960            self.log = log
961            self.debug = debug
962            self.cert_file = cert_file
963            self.cert_pwd = cert_pwd
964            self.trusted_certs = None
965            self.caller = caller
966            self.testbed = testbed
967            self.log_collector = log_collector
968            self.response = None
969
970        def __call__(self, uri, aid, topo, master, attrs=None, connInfo=None,
971                services=None):
972            req = {
973                    'allocID': { 'fedid' : aid }, 
974                    'segmentdescription': { 
975                        'topdldescription': topo.to_dict(),
976                    },
977                    'master': master,
978                }
979
980            if connInfo:
981                req['connection'] = connInfo
982            # Add services to request.  The master exports, everyone else
983            # imports.
984            if services:
985                svcs = [ x.copy() for x in services]
986                for s in svcs:
987                    if master: s['visibility'] = 'export'
988                    else: s['visibility'] = 'import'
989                req['service'] = svcs
990            if attrs:
991                req['fedAttr'] = attrs
992
993            try:
994                self.log.debug("Calling StartSegment at %s " % uri)
995                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
996                        self.trusted_certs)
997                if r.has_key('StartSegmentResponseBody'):
998                    lval = r['StartSegmentResponseBody'].get('allocationLog',
999                            None)
1000                    if lval and self.log_collector:
1001                        for line in  lval.splitlines(True):
1002                            self.log_collector.write(line)
1003                    self.response = r
1004                else:
1005                    raise service_error(service_error.internal, 
1006                            "Bad response!?: %s" %r)
1007                return True
1008            except service_error, e:
1009                self.log.error("Start segment failed on %s: %s" % \
1010                        (self.testbed, e))
1011                return False
1012
1013
1014
1015    class terminate_segment:
1016        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1017                cert_pwd=None, trusted_certs=None, caller=None):
1018            self.log = log
1019            self.debug = debug
1020            self.cert_file = cert_file
1021            self.cert_pwd = cert_pwd
1022            self.trusted_certs = None
1023            self.caller = caller
1024            self.testbed = testbed
1025
1026        def __call__(self, uri, aid ):
1027            req = {
1028                    'allocID': aid , 
1029                }
1030            try:
1031                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1032                        self.trusted_certs)
1033                return True
1034            except service_error, e:
1035                self.log.error("Terminate segment failed on %s: %s" % \
1036                        (self.testbed, e))
1037                return False
1038   
1039
1040    def allocate_resources(self, allocated, master, eid, expid, 
1041            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1042            attrs=None, connInfo={}, services=[]):
1043
1044        started = { }           # Testbeds where a sub-experiment started
1045                                # successfully
1046
1047        # XXX
1048        fail_soft = False
1049
1050        log = alloc_log or self.log
1051
1052        thread_pool = self.thread_pool(self.nthreads)
1053        threads = [ ]
1054
1055        for tb in allocated.keys():
1056            # Create and start a thread to start the segment, and save it
1057            # to get the return value later
1058            thread_pool.wait_for_slot()
1059            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1060            if not uri:
1061                raise service_error(service_error.internal, 
1062                        "Unknown testbed %s !?" % tb)
1063
1064            if tbparams[tb].has_key('allocID') and \
1065                    tbparams[tb]['allocID'].has_key('fedid'):
1066                aid = tbparams[tb]['allocID']['fedid']
1067            else:
1068                raise service_error(service_error.internal, 
1069                        "No alloc id for testbed %s !?" % tb)
1070
1071            t  = self.pooled_thread(\
1072                    target=self.start_segment(log=log, debug=self.debug,
1073                        testbed=tb, cert_file=self.cert_file,
1074                        cert_pwd=self.cert_pwd,
1075                        trusted_certs=self.trusted_certs,
1076                        caller=self.call_StartSegment,
1077                        log_collector=log_collector), 
1078                    args=(uri, aid, topo[tb], tb == master, 
1079                        attrs, connInfo[tb], services),
1080                    name=tb,
1081                    pdata=thread_pool, trace_file=self.trace_file)
1082            threads.append(t)
1083            t.start()
1084
1085        # Wait until all finish (keep pinging the log, though)
1086        mins = 0
1087        revoked = False
1088        while not thread_pool.wait_for_all_done(60.0):
1089            mins += 1
1090            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1091                    % mins)
1092            if not revoked and \
1093                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1094                # a testbed has failed.  Revoke this experiment's
1095                # synchronizarion values so that sub experiments will not
1096                # deadlock waiting for synchronization that will never happen
1097                self.log.info("A subexperiment has failed to swap in, " + \
1098                        "revoking synch keys")
1099                var_key = "fedid:%s" % expid
1100                for k in self.synch_store.all_keys():
1101                    if len(k) > 45 and k[0:46] == var_key:
1102                        self.synch_store.revoke_key(k)
1103                revoked = True
1104
1105        failed = [ t.getName() for t in threads if not t.rv ]
1106        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1107
1108        # If one failed clean up, unless fail_soft is set
1109        if failed:
1110            if not fail_soft:
1111                thread_pool.clear()
1112                for tb in succeeded:
1113                    # Create and start a thread to stop the segment
1114                    thread_pool.wait_for_slot()
1115                    uri = tbparams[tb]['uri']
1116                    t  = self.pooled_thread(\
1117                            target=self.terminate_segment(log=log,
1118                                testbed=tb,
1119                                cert_file=self.cert_file, 
1120                                cert_pwd=self.cert_pwd,
1121                                trusted_certs=self.trusted_certs,
1122                                caller=self.call_TerminateSegment),
1123                            args=(uri, tbparams[tb]['federant']['allocID']),
1124                            name=tb,
1125                            pdata=thread_pool, trace_file=self.trace_file)
1126                    t.start()
1127                # Wait until all finish (if any are being stopped)
1128                if succeeded:
1129                    thread_pool.wait_for_all_done()
1130
1131                # release the allocations
1132                for tb in tbparams.keys():
1133                    self.release_access(tb, tbparams[tb]['allocID'],
1134                            tbparams[tb].get('uri', None))
1135                # Remove the placeholder
1136                self.state_lock.acquire()
1137                self.state[eid]['experimentStatus'] = 'failed'
1138                if self.state_filename: self.write_state()
1139                self.state_lock.release()
1140
1141                log.error("Swap in failed on %s" % ",".join(failed))
1142                return
1143        else:
1144            log.info("[start_segment]: Experiment %s active" % eid)
1145
1146
1147        # Walk up tmpdir, deleting as we go
1148        if self.cleanup:
1149            log.debug("[start_experiment]: removing %s" % tmpdir)
1150            for path, dirs, files in os.walk(tmpdir, topdown=False):
1151                for f in files:
1152                    os.remove(os.path.join(path, f))
1153                for d in dirs:
1154                    os.rmdir(os.path.join(path, d))
1155            os.rmdir(tmpdir)
1156        else:
1157            log.debug("[start_experiment]: not removing %s" % tmpdir)
1158
1159        # Insert the experiment into our state and update the disk copy
1160        self.state_lock.acquire()
1161        self.state[expid]['experimentStatus'] = 'active'
1162        self.state[eid] = self.state[expid]
1163        if self.state_filename: self.write_state()
1164        self.state_lock.release()
1165        return
1166
1167
1168    def add_kit(self, e, kit):
1169        """
1170        Add a Software object created from the list of (install, location)
1171        tuples passed as kit  to the software attribute of an object e.  We
1172        do this enough to break out the code, but it's kind of a hack to
1173        avoid changing the old tuple rep.
1174        """
1175
1176        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1177
1178        if isinstance(e.software, list): e.software.extend(s)
1179        else: e.software = s
1180
1181
1182    def create_experiment_state(self, fid, req, expid, expcert, 
1183            state='starting'):
1184        """
1185        Create the initial entry in the experiment's state.  The expid and
1186        expcert are the experiment's fedid and certifacte that represents that
1187        ID, which are installed in the experiment state.  If the request
1188        includes a suggested local name that is used if possible.  If the local
1189        name is already taken by an experiment owned by this user that has
1190        failed, it is overwritten.  Otherwise new letters are added until a
1191        valid localname is found.  The generated local name is returned.
1192        """
1193
1194        if req.has_key('experimentID') and \
1195                req['experimentID'].has_key('localname'):
1196            overwrite = False
1197            eid = req['experimentID']['localname']
1198            # If there's an old failed experiment here with the same local name
1199            # and accessible by this user, we'll overwrite it, otherwise we'll
1200            # fall through and do the collision avoidance.
1201            old_expid = self.get_experiment_fedid(eid)
1202            if old_expid and self.check_experiment_access(fid, old_expid):
1203                self.state_lock.acquire()
1204                status = self.state[eid].get('experimentStatus', None)
1205                if status and status == 'failed':
1206                    # remove the old access attribute
1207                    self.auth.unset_attribute(fid, old_expid)
1208                    overwrite = True
1209                    del self.state[eid]
1210                    del self.state[old_expid]
1211                self.state_lock.release()
1212            self.state_lock.acquire()
1213            while (self.state.has_key(eid) and not overwrite):
1214                eid += random.choice(string.ascii_letters)
1215            # Initial state
1216            self.state[eid] = {
1217                    'experimentID' : \
1218                            [ { 'localname' : eid }, {'fedid': expid } ],
1219                    'experimentStatus': state,
1220                    'experimentAccess': { 'X509' : expcert },
1221                    'owner': fid,
1222                    'log' : [],
1223                }
1224            self.state[expid] = self.state[eid]
1225            if self.state_filename: self.write_state()
1226            self.state_lock.release()
1227        else:
1228            eid = self.exp_stem
1229            for i in range(0,5):
1230                eid += random.choice(string.ascii_letters)
1231            self.state_lock.acquire()
1232            while (self.state.has_key(eid)):
1233                eid = self.exp_stem
1234                for i in range(0,5):
1235                    eid += random.choice(string.ascii_letters)
1236            # Initial state
1237            self.state[eid] = {
1238                    'experimentID' : \
1239                            [ { 'localname' : eid }, {'fedid': expid } ],
1240                    'experimentStatus': state,
1241                    'experimentAccess': { 'X509' : expcert },
1242                    'owner': fid,
1243                    'log' : [],
1244                }
1245            self.state[expid] = self.state[eid]
1246            if self.state_filename: self.write_state()
1247            self.state_lock.release()
1248
1249        return eid
1250
1251
1252    def allocate_ips_to_topo(self, top):
1253        """
1254        Add an ip4_address attribute to all the hosts in the topology, based on
1255        the shared substrates on which they sit.  An /etc/hosts file is also
1256        created and returned as a list of hostfiles entries.  We also return
1257        the allocator, because we may need to allocate IPs to portals
1258        (specifically DRAGON portals).
1259        """
1260        subs = sorted(top.substrates, 
1261                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1262                reverse=True)
1263        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1264        ifs = { }
1265        hosts = [ ]
1266
1267        for idx, s in enumerate(subs):
1268            net_size = len(s.interfaces)+2
1269
1270            a = ips.allocate(net_size)
1271            if a :
1272                base, num = a
1273                if num < net_size: 
1274                    raise service_error(service_error.internal,
1275                            "Allocator returned wrong number of IPs??")
1276            else:
1277                raise service_error(service_error.req, 
1278                        "Cannot allocate IP addresses")
1279            mask = ips.min_alloc
1280            while mask < net_size:
1281                mask *= 2
1282
1283            netmask = ((2**32-1) ^ (mask-1))
1284
1285            base += 1
1286            for i in s.interfaces:
1287                i.attribute.append(
1288                        topdl.Attribute('ip4_address', 
1289                            "%s" % ip_addr(base)))
1290                i.attribute.append(
1291                        topdl.Attribute('ip4_netmask', 
1292                            "%s" % ip_addr(int(netmask))))
1293
1294                hname = i.element.name[0]
1295                if ifs.has_key(hname):
1296                    hosts.append("%s\t%s-%s %s-%d" % \
1297                            (ip_addr(base), hname, s.name, hname,
1298                                ifs[hname]))
1299                else:
1300                    ifs[hname] = 0
1301                    hosts.append("%s\t%s-%s %s-%d %s" % \
1302                            (ip_addr(base), hname, s.name, hname,
1303                                ifs[hname], hname))
1304
1305                ifs[hname] += 1
1306                base += 1
1307        return hosts, ips
1308
1309    def get_access_to_testbeds(self, testbeds, access_user, 
1310            export_project, master, allocated, tbparams, services):
1311        """
1312        Request access to the various testbeds required for this instantiation
1313        (passed in as testbeds).  User, access_user, expoert_project and master
1314        are used to construct the correct requests.  Per-testbed parameters are
1315        returned in tbparams.
1316        """
1317        for tb in testbeds:
1318            self.get_access(tb, None, tbparams, master,
1319                    export_project, access_user, services)
1320            allocated[tb] = 1
1321
1322    def split_topology(self, top, topo, testbeds):
1323        """
1324        Create the sub-topologies that are needed for experiment instantiation.
1325        """
1326        for tb in testbeds:
1327            topo[tb] = top.clone()
1328            # copy in for loop allows deletions from the original
1329            for e in [ e for e in topo[tb].elements]:
1330                etb = e.get_attribute('testbed')
1331                # NB: elements without a testbed attribute won't appear in any
1332                # sub topologies. 
1333                if not etb or etb != tb:
1334                    for i in e.interface:
1335                        for s in i.subs:
1336                            try:
1337                                s.interfaces.remove(i)
1338                            except ValueError:
1339                                raise service_error(service_error.internal,
1340                                        "Can't remove interface??")
1341                    topo[tb].elements.remove(e)
1342            topo[tb].make_indices()
1343
1344    def wrangle_software(self, expid, top, topo, tbparams):
1345        """
1346        Copy software out to the repository directory, allocate permissions and
1347        rewrite the segment topologies to look for the software in local
1348        places.
1349        """
1350
1351        # Copy the rpms and tarfiles to a distribution directory from
1352        # which the federants can retrieve them
1353        linkpath = "%s/software" %  expid
1354        softdir ="%s/%s" % ( self.repodir, linkpath)
1355        softmap = { }
1356        # These are in a list of tuples format (each kit).  This comprehension
1357        # unwraps them into a single list of tuples that initilaizes the set of
1358        # tuples.
1359        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1360                for p, t in l ])
1361        pkgs.update([x.location for e in top.elements \
1362                for x in e.software])
1363        try:
1364            os.makedirs(softdir)
1365        except IOError, e:
1366            raise service_error(
1367                    "Cannot create software directory: %s" % e)
1368        # The actual copying.  Everything's converted into a url for copying.
1369        for pkg in pkgs:
1370            loc = pkg
1371
1372            scheme, host, path = urlparse(loc)[0:3]
1373            dest = os.path.basename(path)
1374            if not scheme:
1375                if not loc.startswith('/'):
1376                    loc = "/%s" % loc
1377                loc = "file://%s" %loc
1378            try:
1379                u = urlopen(loc)
1380            except Exception, e:
1381                raise service_error(service_error.req, 
1382                        "Cannot open %s: %s" % (loc, e))
1383            try:
1384                f = open("%s/%s" % (softdir, dest) , "w")
1385                self.log.debug("Writing %s/%s" % (softdir,dest) )
1386                data = u.read(4096)
1387                while data:
1388                    f.write(data)
1389                    data = u.read(4096)
1390                f.close()
1391                u.close()
1392            except Exception, e:
1393                raise service_error(service_error.internal,
1394                        "Could not copy %s: %s" % (loc, e))
1395            path = re.sub("/tmp", "", linkpath)
1396            # XXX
1397            softmap[pkg] = \
1398                    "%s/%s/%s" %\
1399                    ( self.repo_url, path, dest)
1400
1401            # Allow the individual segments to access the software.
1402            for tb in tbparams.keys():
1403                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1404                        "/%s/%s" % ( path, dest))
1405
1406        # Convert the software locations in the segments into the local
1407        # copies on this host
1408        for soft in [ s for tb in topo.values() \
1409                for e in tb.elements \
1410                    if getattr(e, 'software', False) \
1411                        for s in e.software ]:
1412            if softmap.has_key(soft.location):
1413                soft.location = softmap[soft.location]
1414
1415
1416    def new_experiment(self, req, fid):
1417        """
1418        The external interface to empty initial experiment creation called from
1419        the dispatcher.
1420
1421        Creates a working directory, splits the incoming description using the
1422        splitter script and parses out the avrious subsections using the
1423        lcasses above.  Once each sub-experiment is created, use pooled threads
1424        to instantiate them and start it all up.
1425        """
1426        if not self.auth.check_attribute(fid, 'new'):
1427            raise service_error(service_error.access, "New access denied")
1428
1429        try:
1430            tmpdir = tempfile.mkdtemp(prefix="split-")
1431        except IOError:
1432            raise service_error(service_error.internal, "Cannot create tmp dir")
1433
1434        try:
1435            access_user = self.accessdb[fid]
1436        except KeyError:
1437            raise service_error(service_error.internal,
1438                    "Access map and authorizer out of sync in " + \
1439                            "new_experiment for fedid %s"  % fid)
1440
1441        pid = "dummy"
1442        gid = "dummy"
1443
1444        req = req.get('NewRequestBody', None)
1445        if not req:
1446            raise service_error(service_error.req,
1447                    "Bad request format (no NewRequestBody)")
1448
1449        # Generate an ID for the experiment (slice) and a certificate that the
1450        # allocator can use to prove they own it.  We'll ship it back through
1451        # the encrypted connection.
1452        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1453
1454        #now we're done with the tmpdir, and it should be empty
1455        if self.cleanup:
1456            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1457            os.rmdir(tmpdir)
1458        else:
1459            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1460
1461        eid = self.create_experiment_state(fid, req, expid, expcert, 
1462                state='empty')
1463
1464        # Let users touch the state
1465        self.auth.set_attribute(fid, expid)
1466        self.auth.set_attribute(expid, expid)
1467        # Override fedids can manipulate state as well
1468        for o in self.overrides:
1469            self.auth.set_attribute(o, expid)
1470
1471        rv = {
1472                'experimentID': [
1473                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1474                ],
1475                'experimentStatus': 'empty',
1476                'experimentAccess': { 'X509' : expcert }
1477            }
1478
1479        return rv
1480
1481    def get_master_project(self, req):
1482        master= None
1483        export_project = None
1484        for e in [ s for s in req.get('service', []) \
1485                if s.get('name') == 'project_export']:
1486            for a in e.get('fedAttr', []):
1487                attr = a.get('attribute', None)
1488                if attr == 'testbed':
1489                    master = a.get('value', None)
1490                elif attr == 'project':
1491                    export_project = a.get('value', None)
1492       
1493        return (master, export_project)
1494
1495
1496
1497    def create_experiment(self, req, fid):
1498        """
1499        The external interface to experiment creation called from the
1500        dispatcher.
1501
1502        Creates a working directory, splits the incoming description using the
1503        splitter script and parses out the avrious subsections using the
1504        lcasses above.  Once each sub-experiment is created, use pooled threads
1505        to instantiate them and start it all up.
1506        """
1507
1508        req = req.get('CreateRequestBody', None)
1509        if not req:
1510            raise service_error(service_error.req,
1511                    "Bad request format (no CreateRequestBody)")
1512
1513        # Get the experiment access
1514        exp = req.get('experimentID', None)
1515        if exp:
1516            if exp.has_key('fedid'):
1517                key = exp['fedid']
1518                expid = key
1519                eid = None
1520            elif exp.has_key('localname'):
1521                key = exp['localname']
1522                eid = key
1523                expid = None
1524            else:
1525                raise service_error(service_error.req, "Unknown lookup type")
1526        else:
1527            raise service_error(service_error.req, "No request?")
1528
1529        self.check_experiment_access(fid, key)
1530
1531        try:
1532            tmpdir = tempfile.mkdtemp(prefix="split-")
1533            os.mkdir(tmpdir+"/keys")
1534        except IOError:
1535            raise service_error(service_error.internal, "Cannot create tmp dir")
1536
1537        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1538        gw_secretkey_base = "fed.%s" % self.ssh_type
1539        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1540        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1541        tclfile = tmpdir + "/experiment.tcl"
1542        tbparams = { }
1543        try:
1544            access_user = self.accessdb[fid]
1545        except KeyError:
1546            raise service_error(service_error.internal,
1547                    "Access map and authorizer out of sync in " + \
1548                            "create_experiment for fedid %s"  % fid)
1549
1550        pid = "dummy"
1551        gid = "dummy"
1552
1553        # The tcl parser needs to read a file so put the content into that file
1554        descr=req.get('experimentdescription', None)
1555        if descr:
1556            file_content=descr.get('ns2description', None)
1557            if file_content:
1558                try:
1559                    f = open(tclfile, 'w')
1560                    f.write(file_content)
1561                    f.close()
1562                except IOError:
1563                    raise service_error(service_error.internal,
1564                            "Cannot write temp experiment description")
1565            else:
1566                raise service_error(service_error.req, 
1567                        "Only ns2descriptions supported")
1568        else:
1569            raise service_error(service_error.req, "No experiment description")
1570
1571        self.state_lock.acquire()
1572        if self.state.has_key(key):
1573            self.state[key]['experimentStatus'] = "starting"
1574            for e in self.state[key].get('experimentID',[]):
1575                if not expid and e.has_key('fedid'):
1576                    expid = e['fedid']
1577                elif not eid and e.has_key('localname'):
1578                    eid = e['localname']
1579        self.state_lock.release()
1580
1581        if not (eid and expid):
1582            raise service_error(service_error.internal, 
1583                    "Cannot find local experiment info!?")
1584
1585        try: 
1586            # This catches exceptions to clear the placeholder if necessary
1587            try:
1588                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1589            except ValueError:
1590                raise service_error(service_error.server_config, 
1591                        "Bad key type (%s)" % self.ssh_type)
1592            master, export_project = self.get_master_project(req)
1593            # XXX get these out when master and project are optional
1594            if not master:
1595                raise service_error(service_error.req,
1596                        "No master testbed label")
1597            if not export_project:
1598                raise service_error(service_error.req, "No export project")
1599            # XXX
1600
1601            # Translate to topdl
1602            if self.splitter_url:
1603                # XXX: need remote topdl translator
1604                self.log.debug("Calling remote splitter at %s" % \
1605                        self.splitter_url)
1606                top = self.remote_ns2topdl(self.splitter_url, file_content)
1607            else:
1608                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1609                    str(self.muxmax), '-m', master]
1610
1611                tclcmd.extend([pid, gid, eid, tclfile])
1612
1613                self.log.debug("running local splitter %s", " ".join(tclcmd))
1614                # This is just fantastic.  As a side effect the parser copies
1615                # tb_compat.tcl into the current directory, so that directory
1616                # must be writable by the fedd user.  Doing this in the
1617                # temporary subdir ensures this is the case.
1618                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1619                        cwd=tmpdir)
1620                split_data = tclparser.stdout
1621
1622                top = topdl.topology_from_xml(file=split_data, top="experiment")
1623
1624            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1625             # Find the testbeds to look up
1626            testbeds = set([ a.value for e in top.elements \
1627                    for a in e.attribute \
1628                        if a.attribute == 'testbed'])
1629
1630            allocated = { }         # Testbeds we can access
1631            topo ={ }               # Sub topologies
1632            connInfo = { }          # Connection information
1633            services = [ ]
1634            masters = { }           # testbeds exporting services
1635            self.get_access_to_testbeds(testbeds, access_user, 
1636                    export_project, master, allocated, tbparams, services)
1637
1638            # After this masters will hold a set of services exported by each
1639            # testbed
1640            for s in services:
1641                i = s.get('visibility', 'import')
1642                if i == 'export':
1643                    for a in s.get('fedAttr', []):
1644                        if a.get('attribute', '') == 'testbed':
1645                            tb = a.get('value', None)
1646                            if tb:
1647                                if masters.has_key(tb): masters[tb].add(s.name)
1648                                else: masters[tb] = set([s.name])
1649                            else:
1650                                log.error('Testbed attribute with no value?')
1651
1652
1653            self.split_topology(top, topo, testbeds)
1654
1655            # Copy configuration files into the remote file store
1656            # The config urlpath
1657            configpath = "/%s/config" % expid
1658            # The config file system location
1659            configdir ="%s%s" % ( self.repodir, configpath)
1660            try:
1661                os.makedirs(configdir)
1662            except IOError, e:
1663                raise service_error(
1664                        "Cannot create config directory: %s" % e)
1665            try:
1666                f = open("%s/hosts" % configdir, "w")
1667                f.write('\n'.join(hosts))
1668                f.close()
1669            except IOError, e:
1670                raise service_error(service_error.internal, 
1671                        "Cannot write hosts file: %s" % e)
1672            try:
1673                copy_file("%s" % gw_pubkey, "%s/%s" % \
1674                        (configdir, gw_pubkey_base))
1675                copy_file("%s" % gw_secretkey, "%s/%s" % \
1676                        (configdir, gw_secretkey_base))
1677            except IOError, e:
1678                raise service_error(service_error.internal, 
1679                        "Cannot copy keyfiles: %s" % e)
1680
1681            # Allow the individual testbeds to access the configuration files.
1682            for tb in tbparams.keys():
1683                asignee = tbparams[tb]['allocID']['fedid']
1684                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1685                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1686
1687            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1688                    self.muxmax)
1689            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1690                    connInfo, expid)
1691            # Now get access to the dynamic testbeds
1692            for k, t in topo.items():
1693                if not t.get_attribute('dynamic'):
1694                    continue
1695                tb = t.get_attribute('testbed')
1696                if tb: 
1697                    self.get_access(tb, None, tbparams, master, 
1698                            export_project, access_user, services)
1699                    tbparams[k] = tbparams[tb]
1700                    del tbparams[tb]
1701                    allocated[k] = 1
1702                    store_keys = t.get_attribute('store_keys')
1703                    # Give the testbed access to keys it exports or imports
1704                    if store_keys:
1705                        for sk in store_keys.split(" "):
1706                            self.auth.set_attribute(\
1707                                    tbparams[k]['allocID']['fedid'], sk)
1708                else:
1709                    raise service_error(service_error.internal, 
1710                            "Dynamic allocation from no testbed!?")
1711
1712            self.wrangle_software(expid, top, topo, tbparams)
1713
1714            vtopo = topdl.topology_to_vtopo(top)
1715            vis = self.genviz(vtopo)
1716
1717            # save federant information
1718            for k in allocated.keys():
1719                tbparams[k]['federant'] = {
1720                        'name': [ { 'localname' : eid} ],
1721                        'allocID' : tbparams[k]['allocID'],
1722                        'master' : k == master,
1723                        'uri': tbparams[k]['uri'],
1724                    }
1725                if tbparams[k].has_key('emulab'):
1726                        tbparams[k]['federant']['emulab'] = \
1727                                tbparams[k]['emulab']
1728
1729            self.state_lock.acquire()
1730            self.state[eid]['vtopo'] = vtopo
1731            self.state[eid]['vis'] = vis
1732            self.state[expid]['federant'] = \
1733                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1734                        if tbparams[tb].has_key('federant') ]
1735            if self.state_filename: 
1736                self.write_state()
1737            self.state_lock.release()
1738        except service_error, e:
1739            # If something goes wrong in the parse (usually an access error)
1740            # clear the placeholder state.  From here on out the code delays
1741            # exceptions.  Failing at this point returns a fault to the remote
1742            # caller.
1743
1744            self.state_lock.acquire()
1745            del self.state[eid]
1746            del self.state[expid]
1747            if self.state_filename: self.write_state()
1748            self.state_lock.release()
1749            raise e
1750
1751
1752        # Start the background swapper and return the starting state.  From
1753        # here on out, the state will stick around a while.
1754
1755        # Let users touch the state
1756        self.auth.set_attribute(fid, expid)
1757        self.auth.set_attribute(expid, expid)
1758        # Override fedids can manipulate state as well
1759        for o in self.overrides:
1760            self.auth.set_attribute(o, expid)
1761
1762        # Create a logger that logs to the experiment's state object as well as
1763        # to the main log file.
1764        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1765        alloc_collector = self.list_log(self.state[eid]['log'])
1766        h = logging.StreamHandler(alloc_collector)
1767        # XXX: there should be a global one of these rather than repeating the
1768        # code.
1769        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1770                    '%d %b %y %H:%M:%S'))
1771        alloc_log.addHandler(h)
1772       
1773        attrs = [ 
1774                {
1775                    'attribute': 'ssh_pubkey', 
1776                    'value': '%s/%s/config/%s' % \
1777                            (self.repo_url, expid, gw_pubkey_base)
1778                },
1779                {
1780                    'attribute': 'ssh_secretkey', 
1781                    'value': '%s/%s/config/%s' % \
1782                            (self.repo_url, expid, gw_secretkey_base)
1783                },
1784                {
1785                    'attribute': 'hosts', 
1786                    'value': '%s/%s/config/hosts' % \
1787                            (self.repo_url, expid)
1788                },
1789                {
1790                    'attribute': 'experiment_name',
1791                    'value': eid,
1792                },
1793            ]
1794
1795        # transit and disconnected testbeds may not have a connInfo entry.
1796        # Fill in the blanks.
1797        for t in allocated.keys():
1798            if not connInfo.has_key(t):
1799                connInfo[t] = { }
1800
1801        # Start a thread to do the resource allocation
1802        t  = Thread(target=self.allocate_resources,
1803                args=(allocated, master, eid, expid, tbparams, 
1804                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo,
1805                    services),
1806                name=eid)
1807        t.start()
1808
1809        rv = {
1810                'experimentID': [
1811                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1812                ],
1813                'experimentStatus': 'starting',
1814            }
1815
1816        return rv
1817   
1818    def get_experiment_fedid(self, key):
1819        """
1820        find the fedid associated with the localname key in the state database.
1821        """
1822
1823        rv = None
1824        self.state_lock.acquire()
1825        if self.state.has_key(key):
1826            if isinstance(self.state[key], dict):
1827                try:
1828                    kl = [ f['fedid'] for f in \
1829                            self.state[key]['experimentID']\
1830                                if f.has_key('fedid') ]
1831                except KeyError:
1832                    self.state_lock.release()
1833                    raise service_error(service_error.internal, 
1834                            "No fedid for experiment %s when getting "+\
1835                                    "fedid(!?)" % key)
1836                if len(kl) == 1:
1837                    rv = kl[0]
1838                else:
1839                    self.state_lock.release()
1840                    raise service_error(service_error.internal, 
1841                            "multiple fedids for experiment %s when " +\
1842                                    "getting fedid(!?)" % key)
1843            else:
1844                self.state_lock.release()
1845                raise service_error(service_error.internal, 
1846                        "Unexpected state for %s" % key)
1847        self.state_lock.release()
1848        return rv
1849
1850    def check_experiment_access(self, fid, key):
1851        """
1852        Confirm that the fid has access to the experiment.  Though a request
1853        may be made in terms of a local name, the access attribute is always
1854        the experiment's fedid.
1855        """
1856        if not isinstance(key, fedid):
1857            key = self.get_experiment_fedid(key)
1858
1859        if self.auth.check_attribute(fid, key):
1860            return True
1861        else:
1862            raise service_error(service_error.access, "Access Denied")
1863
1864
1865    def get_handler(self, path, fid):
1866        self.log.info("Get handler %s %s" % (path, fid))
1867        if self.auth.check_attribute(fid, path):
1868            return ("%s/%s" % (self.repodir, path), "application/binary")
1869        else:
1870            return (None, None)
1871
1872    def get_vtopo(self, req, fid):
1873        """
1874        Return the stored virtual topology for this experiment
1875        """
1876        rv = None
1877        state = None
1878
1879        req = req.get('VtopoRequestBody', None)
1880        if not req:
1881            raise service_error(service_error.req,
1882                    "Bad request format (no VtopoRequestBody)")
1883        exp = req.get('experiment', None)
1884        if exp:
1885            if exp.has_key('fedid'):
1886                key = exp['fedid']
1887                keytype = "fedid"
1888            elif exp.has_key('localname'):
1889                key = exp['localname']
1890                keytype = "localname"
1891            else:
1892                raise service_error(service_error.req, "Unknown lookup type")
1893        else:
1894            raise service_error(service_error.req, "No request?")
1895
1896        self.check_experiment_access(fid, key)
1897
1898        self.state_lock.acquire()
1899        if self.state.has_key(key):
1900            if self.state[key].has_key('vtopo'):
1901                rv = { 'experiment' : {keytype: key },\
1902                        'vtopo': self.state[key]['vtopo'],\
1903                    }
1904            else:
1905                state = self.state[key]['experimentStatus']
1906        self.state_lock.release()
1907
1908        if rv: return rv
1909        else: 
1910            if state:
1911                raise service_error(service_error.partial, 
1912                        "Not ready: %s" % state)
1913            else:
1914                raise service_error(service_error.req, "No such experiment")
1915
1916    def get_vis(self, req, fid):
1917        """
1918        Return the stored visualization for this experiment
1919        """
1920        rv = None
1921        state = None
1922
1923        req = req.get('VisRequestBody', None)
1924        if not req:
1925            raise service_error(service_error.req,
1926                    "Bad request format (no VisRequestBody)")
1927        exp = req.get('experiment', None)
1928        if exp:
1929            if exp.has_key('fedid'):
1930                key = exp['fedid']
1931                keytype = "fedid"
1932            elif exp.has_key('localname'):
1933                key = exp['localname']
1934                keytype = "localname"
1935            else:
1936                raise service_error(service_error.req, "Unknown lookup type")
1937        else:
1938            raise service_error(service_error.req, "No request?")
1939
1940        self.check_experiment_access(fid, key)
1941
1942        self.state_lock.acquire()
1943        if self.state.has_key(key):
1944            if self.state[key].has_key('vis'):
1945                rv =  { 'experiment' : {keytype: key },\
1946                        'vis': self.state[key]['vis'],\
1947                        }
1948            else:
1949                state = self.state[key]['experimentStatus']
1950        self.state_lock.release()
1951
1952        if rv: return rv
1953        else:
1954            if state:
1955                raise service_error(service_error.partial, 
1956                        "Not ready: %s" % state)
1957            else:
1958                raise service_error(service_error.req, "No such experiment")
1959
1960    def clean_info_response(self, rv):
1961        """
1962        Remove the information in the experiment's state object that is not in
1963        the info response.
1964        """
1965        # Remove the owner info (should always be there, but...)
1966        if rv.has_key('owner'): del rv['owner']
1967
1968        # Convert the log into the allocationLog parameter and remove the
1969        # log entry (with defensive programming)
1970        if rv.has_key('log'):
1971            rv['allocationLog'] = "".join(rv['log'])
1972            del rv['log']
1973        else:
1974            rv['allocationLog'] = ""
1975
1976        if rv['experimentStatus'] != 'active':
1977            if rv.has_key('federant'): del rv['federant']
1978        else:
1979            # remove the allocationID and uri info from each federant
1980            for f in rv.get('federant', []):
1981                if f.has_key('allocID'): del f['allocID']
1982                if f.has_key('uri'): del f['uri']
1983        return rv
1984
1985    def get_info(self, req, fid):
1986        """
1987        Return all the stored info about this experiment
1988        """
1989        rv = None
1990
1991        req = req.get('InfoRequestBody', None)
1992        if not req:
1993            raise service_error(service_error.req,
1994                    "Bad request format (no InfoRequestBody)")
1995        exp = req.get('experiment', None)
1996        if exp:
1997            if exp.has_key('fedid'):
1998                key = exp['fedid']
1999                keytype = "fedid"
2000            elif exp.has_key('localname'):
2001                key = exp['localname']
2002                keytype = "localname"
2003            else:
2004                raise service_error(service_error.req, "Unknown lookup type")
2005        else:
2006            raise service_error(service_error.req, "No request?")
2007
2008        self.check_experiment_access(fid, key)
2009
2010        # The state may be massaged by the service function that called
2011        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2012        # state.
2013        self.state_lock.acquire()
2014        if self.state.has_key(key):
2015            rv = copy.deepcopy(self.state[key])
2016        self.state_lock.release()
2017
2018        if rv:
2019            return self.clean_info_response(rv)
2020        else:
2021            raise service_error(service_error.req, "No such experiment")
2022
2023    def get_multi_info(self, req, fid):
2024        """
2025        Return all the stored info that this fedid can access
2026        """
2027        rv = { 'info': [ ] }
2028
2029        self.state_lock.acquire()
2030        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2031            try:
2032                self.check_experiment_access(fid, key)
2033            except service_error, e:
2034                if e.code == service_error.access:
2035                    continue
2036                else:
2037                    self.state_lock.release()
2038                    raise e
2039
2040            if self.state.has_key(key):
2041                e = copy.deepcopy(self.state[key])
2042                e = self.clean_info_response(e)
2043                rv['info'].append(e)
2044        self.state_lock.release()
2045        return rv
2046
2047    def terminate_experiment(self, req, fid):
2048        """
2049        Swap this experiment out on the federants and delete the shared
2050        information
2051        """
2052        tbparams = { }
2053        req = req.get('TerminateRequestBody', None)
2054        if not req:
2055            raise service_error(service_error.req,
2056                    "Bad request format (no TerminateRequestBody)")
2057        force = req.get('force', False)
2058        exp = req.get('experiment', None)
2059        if exp:
2060            if exp.has_key('fedid'):
2061                key = exp['fedid']
2062                keytype = "fedid"
2063            elif exp.has_key('localname'):
2064                key = exp['localname']
2065                keytype = "localname"
2066            else:
2067                raise service_error(service_error.req, "Unknown lookup type")
2068        else:
2069            raise service_error(service_error.req, "No request?")
2070
2071        self.check_experiment_access(fid, key)
2072
2073        dealloc_list = [ ]
2074
2075
2076        # Create a logger that logs to the dealloc_list as well as to the main
2077        # log file.
2078        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2079        h = logging.StreamHandler(self.list_log(dealloc_list))
2080        # XXX: there should be a global one of these rather than repeating the
2081        # code.
2082        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2083                    '%d %b %y %H:%M:%S'))
2084        dealloc_log.addHandler(h)
2085
2086        self.state_lock.acquire()
2087        fed_exp = self.state.get(key, None)
2088
2089        if fed_exp:
2090            # This branch of the conditional holds the lock to generate a
2091            # consistent temporary tbparams variable to deallocate experiments.
2092            # It releases the lock to do the deallocations and reacquires it to
2093            # remove the experiment state when the termination is complete.
2094
2095            # First make sure that the experiment creation is complete.
2096            status = fed_exp.get('experimentStatus', None)
2097
2098            if status:
2099                if status in ('starting', 'terminating'):
2100                    if not force:
2101                        self.state_lock.release()
2102                        raise service_error(service_error.partial, 
2103                                'Experiment still being created or destroyed')
2104                    else:
2105                        self.log.warning('Experiment in %s state ' % status + \
2106                                'being terminated by force.')
2107            else:
2108                # No status??? trouble
2109                self.state_lock.release()
2110                raise service_error(service_error.internal,
2111                        "Experiment has no status!?")
2112
2113            ids = []
2114            #  experimentID is a list of dicts that are self-describing
2115            #  identifiers.  This finds all the fedids and localnames - the
2116            #  keys of self.state - and puts them into ids.
2117            for id in fed_exp.get('experimentID', []):
2118                if id.has_key('fedid'): ids.append(id['fedid'])
2119                if id.has_key('localname'): ids.append(id['localname'])
2120
2121            # Collect the allocation/segment ids into a dict keyed by the fedid
2122            # of the allocation (or a monotonically increasing integer) that
2123            # contains a tuple of uri, aid (which is a dict...)
2124            for i, fed in enumerate(fed_exp.get('federant', [])):
2125                try:
2126                    uri = fed['uri']
2127                    aid = fed['allocID']
2128                    k = fed['allocID'].get('fedid', i)
2129                except KeyError, e:
2130                    continue
2131                tbparams[k] = (uri, aid)
2132            fed_exp['experimentStatus'] = 'terminating'
2133            if self.state_filename: self.write_state()
2134            self.state_lock.release()
2135
2136            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2137            # then completes, so we can't wait if nothing starts.  So, no
2138            # tbparams, no start.
2139            if len(tbparams) > 0:
2140                thread_pool = self.thread_pool(self.nthreads)
2141                for k in tbparams.keys():
2142                    # Create and start a thread to stop the segment
2143                    thread_pool.wait_for_slot()
2144                    uri, aid = tbparams[k]
2145                    t  = self.pooled_thread(\
2146                            target=self.terminate_segment(log=dealloc_log,
2147                                testbed=uri,
2148                                cert_file=self.cert_file, 
2149                                cert_pwd=self.cert_pwd,
2150                                trusted_certs=self.trusted_certs,
2151                                caller=self.call_TerminateSegment),
2152                            args=(uri, aid), name=k,
2153                            pdata=thread_pool, trace_file=self.trace_file)
2154                    t.start()
2155                # Wait for completions
2156                thread_pool.wait_for_all_done()
2157
2158            # release the allocations (failed experiments have done this
2159            # already, and starting experiments may be in odd states, so we
2160            # ignore errors releasing those allocations
2161            try: 
2162                for k in tbparams.keys():
2163                    # This releases access by uri
2164                    uri, aid = tbparams[k]
2165                    self.release_access(None, aid, uri=uri)
2166            except service_error, e:
2167                if status != 'failed' and not force:
2168                    raise e
2169
2170            # Remove the terminated experiment
2171            self.state_lock.acquire()
2172            for id in ids:
2173                if self.state.has_key(id): del self.state[id]
2174
2175            if self.state_filename: self.write_state()
2176            self.state_lock.release()
2177
2178            # Delete any synch points associated with this experiment.  All
2179            # synch points begin with the fedid of the experiment.
2180            fedid_keys = set(["fedid:%s" % f for f in ids \
2181                    if isinstance(f, fedid)])
2182            for k in self.synch_store.all_keys():
2183                try:
2184                    if len(k) > 45 and k[0:46] in fedid_keys:
2185                        self.synch_store.del_value(k)
2186                except synch_store.BadDeletionError:
2187                    pass
2188            self.write_store()
2189               
2190            return { 
2191                    'experiment': exp , 
2192                    'deallocationLog': "".join(dealloc_list),
2193                    }
2194        else:
2195            # Don't forget to release the lock
2196            self.state_lock.release()
2197            raise service_error(service_error.req, "No saved state")
2198
2199
2200    def GetValue(self, req, fid):
2201        """
2202        Get a value from the synchronized store
2203        """
2204        req = req.get('GetValueRequestBody', None)
2205        if not req:
2206            raise service_error(service_error.req,
2207                    "Bad request format (no GetValueRequestBody)")
2208       
2209        name = req['name']
2210        wait = req['wait']
2211        rv = { 'name': name }
2212
2213        if self.auth.check_attribute(fid, name):
2214            try:
2215                v = self.synch_store.get_value(name, wait)
2216            except synch_store.RevokedKeyError:
2217                # No more synch on this key
2218                raise service_error(service_error.federant, 
2219                        "Synch key %s revoked" % name)
2220            if v is not None:
2221                rv['value'] = v
2222            self.log.debug("[GetValue] got %s from %s" % (v, name))
2223            return rv
2224        else:
2225            raise service_error(service_error.access, "Access Denied")
2226       
2227
2228    def SetValue(self, req, fid):
2229        """
2230        Set a value in the synchronized store
2231        """
2232        req = req.get('SetValueRequestBody', None)
2233        if not req:
2234            raise service_error(service_error.req,
2235                    "Bad request format (no SetValueRequestBody)")
2236       
2237        name = req['name']
2238        v = req['value']
2239
2240        if self.auth.check_attribute(fid, name):
2241            try:
2242                self.synch_store.set_value(name, v)
2243                self.write_store()
2244                self.log.debug("[SetValue] set %s to %s" % (name, v))
2245            except synch_store.CollisionError:
2246                # Translate into a service_error
2247                raise service_error(service_error.req,
2248                        "Value already set: %s" %name)
2249            except synch_store.RevokedKeyError:
2250                # No more synch on this key
2251                raise service_error(service_error.federant, 
2252                        "Synch key %s revoked" % name)
2253            return { 'name': name, 'value': v }
2254        else:
2255            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.