source: fedd/federation/experiment_control.py @ 4f2f41f

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 4f2f41f was 4f2f41f, checked in by Ted Faber <faber@…>, 14 years ago

needs to go, but not yet

  • Property mode set to 100644
File size: 79.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55class experiment_control_local:
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    class thread_pool:
66        """
67        A class to keep track of a set of threads all invoked for the same
68        task.  Manages the mutual exclusion of the states.
69        """
70        def __init__(self, nthreads):
71            """
72            Start a pool.
73            """
74            self.changed = Condition()
75            self.started = 0
76            self.terminated = 0
77            self.nthreads = nthreads
78
79        def acquire(self):
80            """
81            Get the pool's lock.
82            """
83            self.changed.acquire()
84
85        def release(self):
86            """
87            Release the pool's lock.
88            """
89            self.changed.release()
90
91        def wait(self, timeout = None):
92            """
93            Wait for a pool thread to start or stop.
94            """
95            self.changed.wait(timeout)
96
97        def start(self):
98            """
99            Called by a pool thread to report starting.
100            """
101            self.changed.acquire()
102            self.started += 1
103            self.changed.notifyAll()
104            self.changed.release()
105
106        def terminate(self):
107            """
108            Called by a pool thread to report finishing.
109            """
110            self.changed.acquire()
111            self.terminated += 1
112            self.changed.notifyAll()
113            self.changed.release()
114
115        def clear(self):
116            """
117            Clear all pool data.
118            """
119            self.changed.acquire()
120            self.started = 0
121            self.terminated =0
122            self.changed.notifyAll()
123            self.changed.release()
124
125        def wait_for_slot(self):
126            """
127            Wait until we have a free slot to start another pooled thread
128            """
129            self.acquire()
130            while self.started - self.terminated >= self.nthreads:
131                self.wait()
132            self.release()
133
134        def wait_for_all_done(self, timeout=None):
135            """
136            Wait until all active threads finish (and at least one has
137            started).  If a timeout is given, return after waiting that long
138            for termination.  If all threads are done (and one has started in
139            the since the last clear()) return True, otherwise False.
140            """
141            if timeout:
142                deadline = time.time() + timeout
143            self.acquire()
144            while self.started == 0 or self.started > self.terminated:
145                self.wait(timeout)
146                if timeout:
147                    if time.time() > deadline:
148                        break
149                    timeout = deadline - time.time()
150            self.release()
151            return not (self.started == 0 or self.started > self.terminated)
152
153    class pooled_thread(Thread):
154        """
155        One of a set of threads dedicated to a specific task.  Uses the
156        thread_pool class above for coordination.
157        """
158        def __init__(self, group=None, target=None, name=None, args=(), 
159                kwargs={}, pdata=None, trace_file=None):
160            Thread.__init__(self, group, target, name, args, kwargs)
161            self.rv = None          # Return value of the ops in this thread
162            self.exception = None   # Exception that terminated this thread
163            self.target=target      # Target function to run on start()
164            self.args = args        # Args to pass to target
165            self.kwargs = kwargs    # Additional kw args
166            self.pdata = pdata      # thread_pool for this class
167            # Logger for this thread
168            self.log = logging.getLogger("fedd.experiment_control")
169       
170        def run(self):
171            """
172            Emulate Thread.run, except add pool data manipulation and error
173            logging.
174            """
175            if self.pdata:
176                self.pdata.start()
177
178            if self.target:
179                try:
180                    self.rv = self.target(*self.args, **self.kwargs)
181                except service_error, s:
182                    self.exception = s
183                    self.log.error("Thread exception: %s %s" % \
184                            (s.code_string(), s.desc))
185                except:
186                    self.exception = sys.exc_info()[1]
187                    self.log.error(("Unexpected thread exception: %s" +\
188                            "Trace %s") % (self.exception,\
189                                traceback.format_exc()))
190            if self.pdata:
191                self.pdata.terminate()
192
193    call_RequestAccess = service_caller('RequestAccess')
194    call_ReleaseAccess = service_caller('ReleaseAccess')
195    call_StartSegment = service_caller('StartSegment')
196    call_TerminateSegment = service_caller('TerminateSegment')
197    call_Ns2Topdl = service_caller('Ns2Topdl')
198
199    def __init__(self, config=None, auth=None):
200        """
201        Intialize the various attributes, most from the config object
202        """
203
204        def parse_tarfile_list(tf):
205            """
206            Parse a tarfile list from the configuration.  This is a set of
207            paths and tarfiles separated by spaces.
208            """
209            rv = [ ]
210            if tf is not None:
211                tl = tf.split()
212                while len(tl) > 1:
213                    p, t = tl[0:2]
214                    del tl[0:2]
215                    rv.append((p, t))
216            return rv
217
218        self.thread_with_rv = experiment_control_local.pooled_thread
219        self.thread_pool = experiment_control_local.thread_pool
220        self.list_log = list_log.list_log
221
222        self.cert_file = config.get("experiment_control", "cert_file")
223        if self.cert_file:
224            self.cert_pwd = config.get("experiment_control", "cert_pwd")
225        else:
226            self.cert_file = config.get("globals", "cert_file")
227            self.cert_pwd = config.get("globals", "cert_pwd")
228
229        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
230                or config.get("globals", "trusted_certs")
231
232        self.repodir = config.get("experiment_control", "repodir")
233        self.repo_url = config.get("experiment_control", "repo_url", 
234                "https://users.isi.deterlab.net:23235");
235
236        self.exp_stem = "fed-stem"
237        self.log = logging.getLogger("fedd.experiment_control")
238        set_log_level(config, "experiment_control", self.log)
239        self.muxmax = 2
240        self.nthreads = 10
241        self.randomize_experiments = False
242
243        self.splitter = None
244        self.ssh_keygen = "/usr/bin/ssh-keygen"
245        self.ssh_identity_file = None
246
247
248        self.debug = config.getboolean("experiment_control", "create_debug")
249        self.cleanup = not config.getboolean("experiment_control", 
250                "leave_tmpfiles")
251        self.state_filename = config.get("experiment_control", 
252                "experiment_state")
253        self.store_filename = config.get("experiment_control", 
254                "synch_store")
255        self.store_url = config.get("experiment_control", "store_url")
256        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
257        self.fedkit = parse_tarfile_list(\
258                config.get("experiment_control", "fedkit"))
259        self.gatewaykit = parse_tarfile_list(\
260                config.get("experiment_control", "gatewaykit"))
261        accessdb_file = config.get("experiment_control", "accessdb")
262
263        self.ssh_pubkey_file = config.get("experiment_control", 
264                "ssh_pubkey_file")
265        self.ssh_privkey_file = config.get("experiment_control",
266                "ssh_privkey_file")
267        # NB for internal master/slave ops, not experiment setup
268        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
269
270        self.overrides = set([])
271        ovr = config.get('experiment_control', 'overrides')
272        if ovr:
273            for o in ovr.split(","):
274                o = o.strip()
275                if o.startswith('fedid:'): o = o[len('fedid:'):]
276                self.overrides.add(fedid(hexstr=o))
277
278        self.state = { }
279        self.state_lock = Lock()
280        self.tclsh = "/usr/local/bin/otclsh"
281        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
282                config.get("experiment_control", "tcl_splitter",
283                        "/usr/testbed/lib/ns2ir/parse.tcl")
284        mapdb_file = config.get("experiment_control", "mapdb")
285        self.trace_file = sys.stderr
286
287        self.def_expstart = \
288                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
289                "/tmp/federate";
290        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
291                "FEDDIR/hosts";
292        self.def_gwstart = \
293                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
294                "/tmp/bridge.log";
295        self.def_mgwstart = \
296                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
297                "/tmp/bridge.log";
298        self.def_gwimage = "FBSD61-TUNNEL2";
299        self.def_gwtype = "pc";
300        self.local_access = { }
301
302        if auth:
303            self.auth = auth
304        else:
305            self.log.error(\
306                    "[access]: No authorizer initialized, creating local one.")
307            auth = authorizer()
308
309
310        if self.ssh_pubkey_file:
311            try:
312                f = open(self.ssh_pubkey_file, 'r')
313                self.ssh_pubkey = f.read()
314                f.close()
315            except IOError:
316                raise service_error(service_error.internal,
317                        "Cannot read sshpubkey")
318        else:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322        if not self.ssh_privkey_file:
323            raise service_error(service_error.internal, 
324                    "No SSH public key file?")
325
326
327        if mapdb_file:
328            self.read_mapdb(mapdb_file)
329        else:
330            self.log.warn("[experiment_control] No testbed map, using defaults")
331            self.tbmap = { 
332                    'deter':'https://users.isi.deterlab.net:23235',
333                    'emulab':'https://users.isi.deterlab.net:23236',
334                    'ucb':'https://users.isi.deterlab.net:23237',
335                    }
336
337        if accessdb_file:
338                self.read_accessdb(accessdb_file)
339        else:
340            raise service_error(service_error.internal,
341                    "No accessdb specified in config")
342
343        # Grab saved state.  OK to do this w/o locking because it's read only
344        # and only one thread should be in existence that can see self.state at
345        # this point.
346        if self.state_filename:
347            self.read_state()
348
349        if self.store_filename:
350            self.read_store()
351        else:
352            self.log.warning("No saved synch store")
353            self.synch_store = synch_store
354
355        # Dispatch tables
356        self.soap_services = {\
357                'New': soap_handler('New', self.new_experiment),
358                'Create': soap_handler('Create', self.create_experiment),
359                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
360                'Vis': soap_handler('Vis', self.get_vis),
361                'Info': soap_handler('Info', self.get_info),
362                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
363                'Terminate': soap_handler('Terminate', 
364                    self.terminate_experiment),
365                'GetValue': soap_handler('GetValue', self.GetValue),
366                'SetValue': soap_handler('SetValue', self.SetValue),
367        }
368
369        self.xmlrpc_services = {\
370                'New': xmlrpc_handler('New', self.new_experiment),
371                'Create': xmlrpc_handler('Create', self.create_experiment),
372                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
373                'Vis': xmlrpc_handler('Vis', self.get_vis),
374                'Info': xmlrpc_handler('Info', self.get_info),
375                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
376                'Terminate': xmlrpc_handler('Terminate',
377                    self.terminate_experiment),
378                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
379                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
380        }
381
382    # Call while holding self.state_lock
383    def write_state(self):
384        """
385        Write a new copy of experiment state after copying the existing state
386        to a backup.
387
388        State format is a simple pickling of the state dictionary.
389        """
390        if os.access(self.state_filename, os.W_OK):
391            copy_file(self.state_filename, \
392                    "%s.bak" % self.state_filename)
393        try:
394            f = open(self.state_filename, 'w')
395            pickle.dump(self.state, f)
396        except IOError, e:
397            self.log.error("Can't write file %s: %s" % \
398                    (self.state_filename, e))
399        except pickle.PicklingError, e:
400            self.log.error("Pickling problem: %s" % e)
401        except TypeError, e:
402            self.log.error("Pickling problem (TypeError): %s" % e)
403
404    @staticmethod
405    def get_alloc_ids(state):
406        """
407        Pull the fedids of the identifiers of each allocation from the
408        state.  Again, a dict dive that's best isolated.
409
410        Used by read_store and read state
411        """
412
413        return [ f['allocID']['fedid'] 
414                for f in state.get('federant',[]) \
415                    if f.has_key('allocID') and \
416                        f['allocID'].has_key('fedid')]
417
418    # Call while holding self.state_lock
419    def read_state(self):
420        """
421        Read a new copy of experiment state.  Old state is overwritten.
422
423        State format is a simple pickling of the state dictionary.
424        """
425       
426        def get_experiment_id(state):
427            """
428            Pull the fedid experimentID out of the saved state.  This is kind
429            of a gross walk through the dict.
430            """
431
432            if state.has_key('experimentID'):
433                for e in state['experimentID']:
434                    if e.has_key('fedid'):
435                        return e['fedid']
436                else:
437                    return None
438            else:
439                return None
440
441        try:
442            f = open(self.state_filename, "r")
443            self.state = pickle.load(f)
444            self.log.debug("[read_state]: Read state from %s" % \
445                    self.state_filename)
446        except IOError, e:
447            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
448                    % (self.state_filename, e))
449        except pickle.UnpicklingError, e:
450            self.log.warning(("[read_state]: No saved state: " + \
451                    "Unpickling failed: %s") % e)
452       
453        for s in self.state.values():
454            try:
455
456                eid = get_experiment_id(s)
457                if eid : 
458                    # Give the owner rights to the experiment
459                    self.auth.set_attribute(s['owner'], eid)
460                    # And holders of the eid as well
461                    self.auth.set_attribute(eid, eid)
462                    # allow overrides to control experiments as well
463                    for o in self.overrides:
464                        self.auth.set_attribute(o, eid)
465                    # Set permissions to allow reading of the software repo, if
466                    # any, as well.
467                    for a in self.get_alloc_ids(s):
468                        self.auth.set_attribute(a, 'repo/%s' % eid)
469                else:
470                    raise KeyError("No experiment id")
471            except KeyError, e:
472                self.log.warning("[read_state]: State ownership or identity " +\
473                        "misformatted in %s: %s" % (self.state_filename, e))
474
475
476    def read_accessdb(self, accessdb_file):
477        """
478        Read the mapping from fedids that can create experiments to their name
479        in the 3-level access namespace.  All will be asserted from this
480        testbed and can include the local username and porject that will be
481        asserted on their behalf by this fedd.  Each fedid is also added to the
482        authorization system with the "create" attribute.
483        """
484        self.accessdb = {}
485        # These are the regexps for parsing the db
486        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
487        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
488                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
489        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
490                "\s*->\s*(" + name_expr + ")\s*$")
491        lineno = 0
492
493        # Parse the mappings and store in self.authdb, a dict of
494        # fedid -> (proj, user)
495        try:
496            f = open(accessdb_file, "r")
497            for line in f:
498                lineno += 1
499                line = line.strip()
500                if len(line) == 0 or line.startswith('#'):
501                    continue
502                m = project_line.match(line)
503                if m:
504                    fid = fedid(hexstr=m.group(1))
505                    project, user = m.group(2,3)
506                    if not self.accessdb.has_key(fid):
507                        self.accessdb[fid] = []
508                    self.accessdb[fid].append((project, user))
509                    continue
510
511                m = user_line.match(line)
512                if m:
513                    fid = fedid(hexstr=m.group(1))
514                    project = None
515                    user = m.group(2)
516                    if not self.accessdb.has_key(fid):
517                        self.accessdb[fid] = []
518                    self.accessdb[fid].append((project, user))
519                    continue
520                self.log.warn("[experiment_control] Error parsing access " +\
521                        "db %s at line %d" %  (accessdb_file, lineno))
522        except IOError:
523            raise service_error(service_error.internal,
524                    "Error opening/reading %s as experiment " +\
525                            "control accessdb" %  accessdb_file)
526        f.close()
527
528        # Initialize the authorization attributes
529        for fid in self.accessdb.keys():
530            self.auth.set_attribute(fid, 'create')
531            self.auth.set_attribute(fid, 'new')
532
533    def read_mapdb(self, file):
534        """
535        Read a simple colon separated list of mappings for the
536        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
537        """
538
539        self.tbmap = { }
540        lineno =0
541        try:
542            f = open(file, "r")
543            for line in f:
544                lineno += 1
545                line = line.strip()
546                if line.startswith('#') or len(line) == 0:
547                    continue
548                try:
549                    label, url = line.split(':', 1)
550                    self.tbmap[label] = url
551                except ValueError, e:
552                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
553                            "map db: %s %s" % (lineno, line, e))
554        except IOError, e:
555            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
556                    "open %s: %s" % (file, e))
557        f.close()
558
559    def read_store(self):
560        try:
561            self.synch_store = synch_store()
562            self.synch_store.load(self.store_filename)
563            self.log.debug("[read_store]: Read store from %s" % \
564                    self.store_filename)
565        except IOError, e:
566            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
567                    % (self.state_filename, e))
568            self.synch_store = synch_store()
569
570        # Set the initial permissions on data in the store.  XXX: This ad hoc
571        # authorization attribute initialization is getting out of hand.
572        for k in self.synch_store.all_keys():
573            try:
574                if k.startswith('fedid:'):
575                    fid = fedid(hexstr=k[6:46])
576                    if self.state.has_key(fid):
577                        for a in self.get_alloc_ids(self.state[fid]):
578                            self.auth.set_attribute(a, k)
579            except ValueError, e:
580                self.log.warn("Cannot deduce permissions for %s" % k)
581
582
583    def write_store(self):
584        """
585        Write a new copy of synch_store after writing current state
586        to a backup.  We use the internal synch_store pickle method to avoid
587        incinsistent data.
588
589        State format is a simple pickling of the store.
590        """
591        if os.access(self.store_filename, os.W_OK):
592            copy_file(self.store_filename, \
593                    "%s.bak" % self.store_filename)
594        try:
595            self.synch_store.save(self.store_filename)
596        except IOError, e:
597            self.log.error("Can't write file %s: %s" % \
598                    (self.store_filename, e))
599        except TypeError, e:
600            self.log.error("Pickling problem (TypeError): %s" % e)
601
602       
603    def generate_ssh_keys(self, dest, type="rsa" ):
604        """
605        Generate a set of keys for the gateways to use to talk.
606
607        Keys are of type type and are stored in the required dest file.
608        """
609        valid_types = ("rsa", "dsa")
610        t = type.lower();
611        if t not in valid_types: raise ValueError
612        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
613
614        try:
615            trace = open("/dev/null", "w")
616        except IOError:
617            raise service_error(service_error.internal,
618                    "Cannot open /dev/null??");
619
620        # May raise CalledProcessError
621        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
622        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
623        if rv != 0:
624            raise service_error(service_error.internal, 
625                    "Cannot generate nonce ssh keys.  %s return code %d" \
626                            % (self.ssh_keygen, rv))
627
628    def gentopo(self, str):
629        """
630        Generate the topology dtat structure from the splitter's XML
631        representation of it.
632
633        The topology XML looks like:
634            <experiment>
635                <nodes>
636                    <node><vname></vname><ips>ip1:ip2</ips></node>
637                </nodes>
638                <lans>
639                    <lan>
640                        <vname></vname><vnode></vnode><ip></ip>
641                        <bandwidth></bandwidth><member>node:port</member>
642                    </lan>
643                </lans>
644        """
645        class topo_parse:
646            """
647            Parse the topology XML and create the dats structure.
648            """
649            def __init__(self):
650                # Typing of the subelements for data conversion
651                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
652                self.int_subelements = ( 'bandwidth',)
653                self.float_subelements = ( 'delay',)
654                # The final data structure
655                self.nodes = [ ]
656                self.lans =  [ ]
657                self.topo = { \
658                        'node': self.nodes,\
659                        'lan' : self.lans,\
660                    }
661                self.element = { }  # Current element being created
662                self.chars = ""     # Last text seen
663
664            def end_element(self, name):
665                # After each sub element the contents is added to the current
666                # element or to the appropriate list.
667                if name == 'node':
668                    self.nodes.append(self.element)
669                    self.element = { }
670                elif name == 'lan':
671                    self.lans.append(self.element)
672                    self.element = { }
673                elif name in self.str_subelements:
674                    self.element[name] = self.chars
675                    self.chars = ""
676                elif name in self.int_subelements:
677                    self.element[name] = int(self.chars)
678                    self.chars = ""
679                elif name in self.float_subelements:
680                    self.element[name] = float(self.chars)
681                    self.chars = ""
682
683            def found_chars(self, data):
684                self.chars += data.rstrip()
685
686
687        tp = topo_parse();
688        parser = xml.parsers.expat.ParserCreate()
689        parser.EndElementHandler = tp.end_element
690        parser.CharacterDataHandler = tp.found_chars
691
692        parser.Parse(str)
693
694        return tp.topo
695       
696
697    def genviz(self, topo):
698        """
699        Generate the visualization the virtual topology
700        """
701
702        neato = "/usr/local/bin/neato"
703        # These are used to parse neato output and to create the visualization
704        # file.
705        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
706        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
707                "%s</type></node>"
708
709        try:
710            # Node names
711            nodes = [ n['vname'] for n in topo['node'] ]
712            topo_lans = topo['lan']
713        except KeyError, e:
714            raise service_error(service_error.internal, "Bad topology: %s" %e)
715
716        lans = { }
717        links = { }
718
719        # Walk through the virtual topology, organizing the connections into
720        # 2-node connections (links) and more-than-2-node connections (lans).
721        # When a lan is created, it's added to the list of nodes (there's a
722        # node in the visualization for the lan).
723        for l in topo_lans:
724            if links.has_key(l['vname']):
725                if len(links[l['vname']]) < 2:
726                    links[l['vname']].append(l['vnode'])
727                else:
728                    nodes.append(l['vname'])
729                    lans[l['vname']] = links[l['vname']]
730                    del links[l['vname']]
731                    lans[l['vname']].append(l['vnode'])
732            elif lans.has_key(l['vname']):
733                lans[l['vname']].append(l['vnode'])
734            else:
735                links[l['vname']] = [ l['vnode'] ]
736
737
738        # Open up a temporary file for dot to turn into a visualization
739        try:
740            df, dotname = tempfile.mkstemp()
741            dotfile = os.fdopen(df, 'w')
742        except IOError:
743            raise service_error(service_error.internal,
744                    "Failed to open file in genviz")
745
746        try:
747            dnull = open('/dev/null', 'w')
748        except IOError:
749            service_error(service_error.internal,
750                    "Failed to open /dev/null in genviz")
751
752        # Generate a dot/neato input file from the links, nodes and lans
753        try:
754            print >>dotfile, "graph G {"
755            for n in nodes:
756                print >>dotfile, '\t"%s"' % n
757            for l in links.keys():
758                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
759            for l in lans.keys():
760                for n in lans[l]:
761                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
762            print >>dotfile, "}"
763            dotfile.close()
764        except TypeError:
765            raise service_error(service_error.internal,
766                    "Single endpoint link in vtopo")
767        except IOError:
768            raise service_error(service_error.internal, "Cannot write dot file")
769
770        # Use dot to create a visualization
771        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
772                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
773                close_fds=True)
774        dnull.close()
775
776        # Translate dot to vis format
777        vis_nodes = [ ]
778        vis = { 'node': vis_nodes }
779        for line in dot.stdout:
780            m = vis_re.match(line)
781            if m:
782                vn = m.group(1)
783                vis_node = {'name': vn, \
784                        'x': float(m.group(2)),\
785                        'y' : float(m.group(3)),\
786                    }
787                if vn in links.keys() or vn in lans.keys():
788                    vis_node['type'] = 'lan'
789                else:
790                    vis_node['type'] = 'node'
791                vis_nodes.append(vis_node)
792        rv = dot.wait()
793
794        os.remove(dotname)
795        if rv == 0 : return vis
796        else: return None
797
798    def get_access(self, tb, nodes, tbparam, access_user, masters):
799        """
800        Get access to testbed through fedd and set the parameters for that tb
801        """
802        def get_export_project(svcs):
803            """
804            Look through for the list of federated_service for this testbed
805            objects for a project_export service, and extract the project
806            parameter.
807            """
808
809            pe = [s for s in svcs if s.name=='project_export']
810            if len(pe) == 1:
811                return pe[0].params.get('project', None)
812            elif len(pe) == 0:
813                return None
814            else:
815                raise service_error(service_error.req,
816                        "More than one project export is not supported")
817
818        uri = self.tbmap.get(tb, None)
819        if not uri:
820            raise service_error(service_error.server_config, 
821                    "Unknown testbed: %s" % tb)
822
823        export_svcs = masters.get(tb,[])
824        import_svcs = [ s for m in masters.values() \
825                for s in m \
826                    if tb in s.importers ]
827
828        export_project = get_export_project(export_svcs)
829
830        # Tweak search order so that if there are entries in access_user that
831        # have a project matching the export project, we try them first
832        if export_project: 
833            access_sequence = [ (p, u) for p, u in access_user \
834                    if p == export_project] 
835            access_sequence.extend([(p, u) for p, u in access_user \
836                    if p != export_project]) 
837        else: 
838            access_sequence = access_user
839
840        for p, u in access_sequence: 
841            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
842                    "to %s") %  ((p or "None"), u, uri))
843
844            if p:
845                # Request with user and project specified
846                req = {\
847                        'destinationTestbed' : { 'uri' : uri },
848                        'credential': [ "project: %s" % p, "user: %s"  % u],
849                        'allocID' : { 'localname': 'test' },
850                    }
851            else:
852                # Request with only user specified
853                req = {\
854                        'destinationTestbed' : { 'uri' : uri },
855                        'credential': [ 'user: %s' % u ],
856                        'allocID' : { 'localname': 'test' },
857                    }
858
859            # Make the service request from the services we're importing and
860            # exporting.  Keep track of the export request ids so we can
861            # collect the resulting info from the access response.
862            e_keys = { }
863            if import_svcs or export_svcs:
864                req['service'] = [ ]
865
866                for i, s in enumerate(import_svcs):
867                    idx = 'import%d' % i
868                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
869                    if s.params:
870                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
871                                for k, v in s.params.items()]
872                    req['service'].append(sr)
873
874                for i, s in enumerate(export_svcs):
875                    idx = 'export%d' % i
876                    e_keys[idx] = s
877                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
878                    if s.params:
879                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
880                                for k, v in s.params.items()]
881                    req['service'].append(sr)
882
883            # node resources if any
884            if nodes != None and len(nodes) > 0:
885                rnodes = [ ]
886                for n in nodes:
887                    rn = { }
888                    image, hw, count = n.split(":")
889                    if image: rn['image'] = [ image ]
890                    if hw: rn['hardware'] = [ hw ]
891                    if count and int(count) >0 : rn['count'] = int(count)
892                    rnodes.append(rn)
893                req['resources']= { }
894                req['resources']['node'] = rnodes
895
896            try:
897                if self.local_access.has_key(uri):
898                    # Local access call
899                    req = { 'RequestAccessRequestBody' : req }
900                    r = self.local_access[uri].RequestAccess(req, 
901                            fedid(file=self.cert_file))
902                    r = { 'RequestAccessResponseBody' : r }
903                else:
904                    r = self.call_RequestAccess(uri, req, 
905                            self.cert_file, self.cert_pwd, self.trusted_certs)
906            except service_error, e:
907                if e.code == service_error.access:
908                    self.log.debug("[get_access] Access denied")
909                    r = None
910                    continue
911                else:
912                    raise e
913
914            if r.has_key('RequestAccessResponseBody'):
915                # Through to here we have a valid response, not a fault.
916                # Access denied is a fault, so something better or worse than
917                # access denied has happened.
918                r = r['RequestAccessResponseBody']
919                self.log.debug("[get_access] Access granted")
920                break
921            else:
922                raise service_error(service_error.protocol,
923                        "Bad proxy response")
924       
925        if not r:
926            raise service_error(service_error.access, 
927                    "Access denied by %s (%s)" % (tb, uri))
928
929        tbparam[tb] = { 
930                "allocID" : r['allocID'],
931                "uri": uri,
932                }
933
934        # Collect the responses corresponding to the services this testbed
935        # exports.  These will be the service requests that we will include in
936        # the start segment requests (with appropriate visibility values) to
937        # import and export the segments.
938        for s in r.get('service', []):
939            id = s.get('id', None)
940            if id and id in e_keys:
941                e_keys[id].reqs.append(s)
942
943        # Add attributes to parameter space.  We don't allow attributes to
944        # overlay any parameters already installed.
945        for a in r.get('fedAttr', []):
946            try:
947                if a['attribute'] and \
948                        isinstance(a['attribute'], basestring)\
949                        and not tbparam[tb].has_key(a['attribute'].lower()):
950                    tbparam[tb][a['attribute'].lower()] = a['value']
951            except KeyError:
952                self.log.error("Bad attribute in response: %s" % a)
953
954    def release_access(self, tb, aid, uri=None):
955        """
956        Release access to testbed through fedd
957        """
958
959        if not uri:
960            uri = self.tbmap.get(tb, None)
961        if not uri:
962            raise service_error(service_error.server_config, 
963                    "Unknown testbed: %s" % tb)
964
965        if self.local_access.has_key(uri):
966            resp = self.local_access[uri].ReleaseAccess(\
967                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
968                    fedid(file=self.cert_file))
969            resp = { 'ReleaseAccessResponseBody': resp } 
970        else:
971            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
972                    self.cert_file, self.cert_pwd, self.trusted_certs)
973
974        # better error coding
975
976    def remote_ns2topdl(self, uri, desc):
977
978        req = {
979                'description' : { 'ns2description': desc },
980            }
981
982        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
983                self.trusted_certs)
984
985        if r.has_key('Ns2TopdlResponseBody'):
986            r = r['Ns2TopdlResponseBody']
987            ed = r.get('experimentdescription', None)
988            if ed.has_key('topdldescription'):
989                return topdl.Topology(**ed['topdldescription'])
990            else:
991                raise service_error(service_error.protocol, 
992                        "Bad splitter response (no output)")
993        else:
994            raise service_error(service_error.protocol, "Bad splitter response")
995
996    class start_segment:
997        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
998                cert_pwd=None, trusted_certs=None, caller=None,
999                log_collector=None):
1000            self.log = log
1001            self.debug = debug
1002            self.cert_file = cert_file
1003            self.cert_pwd = cert_pwd
1004            self.trusted_certs = None
1005            self.caller = caller
1006            self.testbed = testbed
1007            self.log_collector = log_collector
1008            self.response = None
1009
1010        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1011            req = {
1012                    'allocID': { 'fedid' : aid }, 
1013                    'segmentdescription': { 
1014                        'topdldescription': topo.to_dict(),
1015                    },
1016                    # XXX: deprecated
1017                    'master': False,
1018                }
1019
1020            if connInfo:
1021                req['connection'] = connInfo
1022
1023            import_svcs = [ s for m in masters.values() \
1024                    for s in m if self.testbed in s.importers]
1025
1026            if import_svcs or self.testbed in masters:
1027                req['service'] = []
1028
1029            for s in import_svcs:
1030                for r in s.reqs:
1031                    sr = copy.deepcopy(r)
1032                    sr['visibility'] = 'import';
1033                    req['service'].append(sr)
1034
1035            for s in masters.get(self.testbed, []):
1036                for r in s.reqs:
1037                    sr = copy.deepcopy(r)
1038                    sr['visibility'] = 'export';
1039                    req['service'].append(sr)
1040
1041            if attrs:
1042                req['fedAttr'] = attrs
1043
1044            try:
1045                self.log.debug("Calling StartSegment at %s " % uri)
1046                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1047                        self.trusted_certs)
1048                if r.has_key('StartSegmentResponseBody'):
1049                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1050                            None)
1051                    if lval and self.log_collector:
1052                        for line in  lval.splitlines(True):
1053                            self.log_collector.write(line)
1054                    self.response = r
1055                else:
1056                    raise service_error(service_error.internal, 
1057                            "Bad response!?: %s" %r)
1058                return True
1059            except service_error, e:
1060                self.log.error("Start segment failed on %s: %s" % \
1061                        (self.testbed, e))
1062                return False
1063
1064
1065
1066    class terminate_segment:
1067        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1068                cert_pwd=None, trusted_certs=None, caller=None):
1069            self.log = log
1070            self.debug = debug
1071            self.cert_file = cert_file
1072            self.cert_pwd = cert_pwd
1073            self.trusted_certs = None
1074            self.caller = caller
1075            self.testbed = testbed
1076
1077        def __call__(self, uri, aid ):
1078            req = {
1079                    'allocID': aid , 
1080                }
1081            try:
1082                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1083                        self.trusted_certs)
1084                return True
1085            except service_error, e:
1086                self.log.error("Terminate segment failed on %s: %s" % \
1087                        (self.testbed, e))
1088                return False
1089   
1090
1091    def allocate_resources(self, allocated, masters, eid, expid, 
1092            tbparams, topo, tmpdir, alloc_log=None, log_collector=None, 
1093            attrs=None, connInfo={}):
1094
1095        started = { }           # Testbeds where a sub-experiment started
1096                                # successfully
1097
1098        # XXX
1099        fail_soft = False
1100
1101        log = alloc_log or self.log
1102
1103        thread_pool = self.thread_pool(self.nthreads)
1104        threads = [ ]
1105
1106        for tb in allocated.keys():
1107            # Create and start a thread to start the segment, and save it
1108            # to get the return value later
1109            thread_pool.wait_for_slot()
1110            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1111            if not uri:
1112                raise service_error(service_error.internal, 
1113                        "Unknown testbed %s !?" % tb)
1114
1115            if tbparams[tb].has_key('allocID') and \
1116                    tbparams[tb]['allocID'].has_key('fedid'):
1117                aid = tbparams[tb]['allocID']['fedid']
1118            else:
1119                raise service_error(service_error.internal, 
1120                        "No alloc id for testbed %s !?" % tb)
1121
1122            t  = self.pooled_thread(\
1123                    target=self.start_segment(log=log, debug=self.debug,
1124                        testbed=tb, cert_file=self.cert_file,
1125                        cert_pwd=self.cert_pwd,
1126                        trusted_certs=self.trusted_certs,
1127                        caller=self.call_StartSegment,
1128                        log_collector=log_collector), 
1129                    args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]),
1130                    name=tb,
1131                    pdata=thread_pool, trace_file=self.trace_file)
1132            threads.append(t)
1133            t.start()
1134
1135        # Wait until all finish (keep pinging the log, though)
1136        mins = 0
1137        revoked = False
1138        while not thread_pool.wait_for_all_done(60.0):
1139            mins += 1
1140            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1141                    % mins)
1142            if not revoked and \
1143                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1144                # a testbed has failed.  Revoke this experiment's
1145                # synchronizarion values so that sub experiments will not
1146                # deadlock waiting for synchronization that will never happen
1147                self.log.info("A subexperiment has failed to swap in, " + \
1148                        "revoking synch keys")
1149                var_key = "fedid:%s" % expid
1150                for k in self.synch_store.all_keys():
1151                    if len(k) > 45 and k[0:46] == var_key:
1152                        self.synch_store.revoke_key(k)
1153                revoked = True
1154
1155        failed = [ t.getName() for t in threads if not t.rv ]
1156        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1157
1158        # If one failed clean up, unless fail_soft is set
1159        if failed:
1160            if not fail_soft:
1161                thread_pool.clear()
1162                for tb in succeeded:
1163                    # Create and start a thread to stop the segment
1164                    thread_pool.wait_for_slot()
1165                    uri = tbparams[tb]['uri']
1166                    t  = self.pooled_thread(\
1167                            target=self.terminate_segment(log=log,
1168                                testbed=tb,
1169                                cert_file=self.cert_file, 
1170                                cert_pwd=self.cert_pwd,
1171                                trusted_certs=self.trusted_certs,
1172                                caller=self.call_TerminateSegment),
1173                            args=(uri, tbparams[tb]['federant']['allocID']),
1174                            name=tb,
1175                            pdata=thread_pool, trace_file=self.trace_file)
1176                    t.start()
1177                # Wait until all finish (if any are being stopped)
1178                if succeeded:
1179                    thread_pool.wait_for_all_done()
1180
1181                # release the allocations
1182                for tb in tbparams.keys():
1183                    self.release_access(tb, tbparams[tb]['allocID'],
1184                            tbparams[tb].get('uri', None))
1185                # Remove the placeholder
1186                self.state_lock.acquire()
1187                self.state[eid]['experimentStatus'] = 'failed'
1188                if self.state_filename: self.write_state()
1189                self.state_lock.release()
1190
1191                log.error("Swap in failed on %s" % ",".join(failed))
1192                return
1193        else:
1194            log.info("[start_segment]: Experiment %s active" % eid)
1195
1196
1197        # Walk up tmpdir, deleting as we go
1198        if self.cleanup:
1199            log.debug("[start_experiment]: removing %s" % tmpdir)
1200            for path, dirs, files in os.walk(tmpdir, topdown=False):
1201                for f in files:
1202                    os.remove(os.path.join(path, f))
1203                for d in dirs:
1204                    os.rmdir(os.path.join(path, d))
1205            os.rmdir(tmpdir)
1206        else:
1207            log.debug("[start_experiment]: not removing %s" % tmpdir)
1208
1209        # Insert the experiment into our state and update the disk copy
1210        self.state_lock.acquire()
1211        self.state[expid]['experimentStatus'] = 'active'
1212        self.state[eid] = self.state[expid]
1213        if self.state_filename: self.write_state()
1214        self.state_lock.release()
1215        return
1216
1217
1218    def add_kit(self, e, kit):
1219        """
1220        Add a Software object created from the list of (install, location)
1221        tuples passed as kit  to the software attribute of an object e.  We
1222        do this enough to break out the code, but it's kind of a hack to
1223        avoid changing the old tuple rep.
1224        """
1225
1226        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1227
1228        if isinstance(e.software, list): e.software.extend(s)
1229        else: e.software = s
1230
1231
1232    def create_experiment_state(self, fid, req, expid, expcert, 
1233            state='starting'):
1234        """
1235        Create the initial entry in the experiment's state.  The expid and
1236        expcert are the experiment's fedid and certifacte that represents that
1237        ID, which are installed in the experiment state.  If the request
1238        includes a suggested local name that is used if possible.  If the local
1239        name is already taken by an experiment owned by this user that has
1240        failed, it is overwritten.  Otherwise new letters are added until a
1241        valid localname is found.  The generated local name is returned.
1242        """
1243
1244        if req.has_key('experimentID') and \
1245                req['experimentID'].has_key('localname'):
1246            overwrite = False
1247            eid = req['experimentID']['localname']
1248            # If there's an old failed experiment here with the same local name
1249            # and accessible by this user, we'll overwrite it, otherwise we'll
1250            # fall through and do the collision avoidance.
1251            old_expid = self.get_experiment_fedid(eid)
1252            if old_expid and self.check_experiment_access(fid, old_expid):
1253                self.state_lock.acquire()
1254                status = self.state[eid].get('experimentStatus', None)
1255                if status and status == 'failed':
1256                    # remove the old access attribute
1257                    self.auth.unset_attribute(fid, old_expid)
1258                    overwrite = True
1259                    del self.state[eid]
1260                    del self.state[old_expid]
1261                self.state_lock.release()
1262            self.state_lock.acquire()
1263            while (self.state.has_key(eid) and not overwrite):
1264                eid += random.choice(string.ascii_letters)
1265            # Initial state
1266            self.state[eid] = {
1267                    'experimentID' : \
1268                            [ { 'localname' : eid }, {'fedid': expid } ],
1269                    'experimentStatus': state,
1270                    'experimentAccess': { 'X509' : expcert },
1271                    'owner': fid,
1272                    'log' : [],
1273                }
1274            self.state[expid] = self.state[eid]
1275            if self.state_filename: self.write_state()
1276            self.state_lock.release()
1277        else:
1278            eid = self.exp_stem
1279            for i in range(0,5):
1280                eid += random.choice(string.ascii_letters)
1281            self.state_lock.acquire()
1282            while (self.state.has_key(eid)):
1283                eid = self.exp_stem
1284                for i in range(0,5):
1285                    eid += random.choice(string.ascii_letters)
1286            # Initial state
1287            self.state[eid] = {
1288                    'experimentID' : \
1289                            [ { 'localname' : eid }, {'fedid': expid } ],
1290                    'experimentStatus': state,
1291                    'experimentAccess': { 'X509' : expcert },
1292                    'owner': fid,
1293                    'log' : [],
1294                }
1295            self.state[expid] = self.state[eid]
1296            if self.state_filename: self.write_state()
1297            self.state_lock.release()
1298
1299        return eid
1300
1301
1302    def allocate_ips_to_topo(self, top):
1303        """
1304        Add an ip4_address attribute to all the hosts in the topology, based on
1305        the shared substrates on which they sit.  An /etc/hosts file is also
1306        created and returned as a list of hostfiles entries.  We also return
1307        the allocator, because we may need to allocate IPs to portals
1308        (specifically DRAGON portals).
1309        """
1310        subs = sorted(top.substrates, 
1311                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1312                reverse=True)
1313        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1314        ifs = { }
1315        hosts = [ ]
1316
1317        for idx, s in enumerate(subs):
1318            net_size = len(s.interfaces)+2
1319
1320            a = ips.allocate(net_size)
1321            if a :
1322                base, num = a
1323                if num < net_size: 
1324                    raise service_error(service_error.internal,
1325                            "Allocator returned wrong number of IPs??")
1326            else:
1327                raise service_error(service_error.req, 
1328                        "Cannot allocate IP addresses")
1329            mask = ips.min_alloc
1330            while mask < net_size:
1331                mask *= 2
1332
1333            netmask = ((2**32-1) ^ (mask-1))
1334
1335            base += 1
1336            for i in s.interfaces:
1337                i.attribute.append(
1338                        topdl.Attribute('ip4_address', 
1339                            "%s" % ip_addr(base)))
1340                i.attribute.append(
1341                        topdl.Attribute('ip4_netmask', 
1342                            "%s" % ip_addr(int(netmask))))
1343
1344                hname = i.element.name[0]
1345                if ifs.has_key(hname):
1346                    hosts.append("%s\t%s-%s %s-%d" % \
1347                            (ip_addr(base), hname, s.name, hname,
1348                                ifs[hname]))
1349                else:
1350                    ifs[hname] = 0
1351                    hosts.append("%s\t%s-%s %s-%d %s" % \
1352                            (ip_addr(base), hname, s.name, hname,
1353                                ifs[hname], hname))
1354
1355                ifs[hname] += 1
1356                base += 1
1357        return hosts, ips
1358
1359    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1360            tbparams, masters):
1361        """
1362        Request access to the various testbeds required for this instantiation
1363        (passed in as testbeds).  User, access_user, expoert_project and master
1364        are used to construct the correct requests.  Per-testbed parameters are
1365        returned in tbparams.
1366        """
1367        for tb in testbeds:
1368            self.get_access(tb, None, tbparams, access_user, masters)
1369            allocated[tb] = 1
1370
1371    def split_topology(self, top, topo, testbeds):
1372        """
1373        Create the sub-topologies that are needed for experiment instantiation.
1374        """
1375        for tb in testbeds:
1376            topo[tb] = top.clone()
1377            # copy in for loop allows deletions from the original
1378            for e in [ e for e in topo[tb].elements]:
1379                etb = e.get_attribute('testbed')
1380                # NB: elements without a testbed attribute won't appear in any
1381                # sub topologies. 
1382                if not etb or etb != tb:
1383                    for i in e.interface:
1384                        for s in i.subs:
1385                            try:
1386                                s.interfaces.remove(i)
1387                            except ValueError:
1388                                raise service_error(service_error.internal,
1389                                        "Can't remove interface??")
1390                    topo[tb].elements.remove(e)
1391            topo[tb].make_indices()
1392
1393    def wrangle_software(self, expid, top, topo, tbparams):
1394        """
1395        Copy software out to the repository directory, allocate permissions and
1396        rewrite the segment topologies to look for the software in local
1397        places.
1398        """
1399
1400        # Copy the rpms and tarfiles to a distribution directory from
1401        # which the federants can retrieve them
1402        linkpath = "%s/software" %  expid
1403        softdir ="%s/%s" % ( self.repodir, linkpath)
1404        softmap = { }
1405        # These are in a list of tuples format (each kit).  This comprehension
1406        # unwraps them into a single list of tuples that initilaizes the set of
1407        # tuples.
1408        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1409                for p, t in l ])
1410        pkgs.update([x.location for e in top.elements \
1411                for x in e.software])
1412        try:
1413            os.makedirs(softdir)
1414        except IOError, e:
1415            raise service_error(
1416                    "Cannot create software directory: %s" % e)
1417        # The actual copying.  Everything's converted into a url for copying.
1418        for pkg in pkgs:
1419            loc = pkg
1420
1421            scheme, host, path = urlparse(loc)[0:3]
1422            dest = os.path.basename(path)
1423            if not scheme:
1424                if not loc.startswith('/'):
1425                    loc = "/%s" % loc
1426                loc = "file://%s" %loc
1427            try:
1428                u = urlopen(loc)
1429            except Exception, e:
1430                raise service_error(service_error.req, 
1431                        "Cannot open %s: %s" % (loc, e))
1432            try:
1433                f = open("%s/%s" % (softdir, dest) , "w")
1434                self.log.debug("Writing %s/%s" % (softdir,dest) )
1435                data = u.read(4096)
1436                while data:
1437                    f.write(data)
1438                    data = u.read(4096)
1439                f.close()
1440                u.close()
1441            except Exception, e:
1442                raise service_error(service_error.internal,
1443                        "Could not copy %s: %s" % (loc, e))
1444            path = re.sub("/tmp", "", linkpath)
1445            # XXX
1446            softmap[pkg] = \
1447                    "%s/%s/%s" %\
1448                    ( self.repo_url, path, dest)
1449
1450            # Allow the individual segments to access the software.
1451            for tb in tbparams.keys():
1452                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1453                        "/%s/%s" % ( path, dest))
1454
1455        # Convert the software locations in the segments into the local
1456        # copies on this host
1457        for soft in [ s for tb in topo.values() \
1458                for e in tb.elements \
1459                    if getattr(e, 'software', False) \
1460                        for s in e.software ]:
1461            if softmap.has_key(soft.location):
1462                soft.location = softmap[soft.location]
1463
1464
1465    def new_experiment(self, req, fid):
1466        """
1467        The external interface to empty initial experiment creation called from
1468        the dispatcher.
1469
1470        Creates a working directory, splits the incoming description using the
1471        splitter script and parses out the avrious subsections using the
1472        lcasses above.  Once each sub-experiment is created, use pooled threads
1473        to instantiate them and start it all up.
1474        """
1475        if not self.auth.check_attribute(fid, 'new'):
1476            raise service_error(service_error.access, "New access denied")
1477
1478        try:
1479            tmpdir = tempfile.mkdtemp(prefix="split-")
1480        except IOError:
1481            raise service_error(service_error.internal, "Cannot create tmp dir")
1482
1483        try:
1484            access_user = self.accessdb[fid]
1485        except KeyError:
1486            raise service_error(service_error.internal,
1487                    "Access map and authorizer out of sync in " + \
1488                            "new_experiment for fedid %s"  % fid)
1489
1490        pid = "dummy"
1491        gid = "dummy"
1492
1493        req = req.get('NewRequestBody', None)
1494        if not req:
1495            raise service_error(service_error.req,
1496                    "Bad request format (no NewRequestBody)")
1497
1498        # Generate an ID for the experiment (slice) and a certificate that the
1499        # allocator can use to prove they own it.  We'll ship it back through
1500        # the encrypted connection.
1501        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1502
1503        #now we're done with the tmpdir, and it should be empty
1504        if self.cleanup:
1505            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1506            os.rmdir(tmpdir)
1507        else:
1508            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1509
1510        eid = self.create_experiment_state(fid, req, expid, expcert, 
1511                state='empty')
1512
1513        # Let users touch the state
1514        self.auth.set_attribute(fid, expid)
1515        self.auth.set_attribute(expid, expid)
1516        # Override fedids can manipulate state as well
1517        for o in self.overrides:
1518            self.auth.set_attribute(o, expid)
1519
1520        rv = {
1521                'experimentID': [
1522                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1523                ],
1524                'experimentStatus': 'empty',
1525                'experimentAccess': { 'X509' : expcert }
1526            }
1527
1528        return rv
1529
1530    def create_experiment(self, req, fid):
1531        """
1532        The external interface to experiment creation called from the
1533        dispatcher.
1534
1535        Creates a working directory, splits the incoming description using the
1536        splitter script and parses out the various subsections using the
1537        classes above.  Once each sub-experiment is created, use pooled threads
1538        to instantiate them and start it all up.
1539        """
1540
1541        req = req.get('CreateRequestBody', None)
1542        if not req:
1543            raise service_error(service_error.req,
1544                    "Bad request format (no CreateRequestBody)")
1545
1546        # Get the experiment access
1547        exp = req.get('experimentID', None)
1548        if exp:
1549            if exp.has_key('fedid'):
1550                key = exp['fedid']
1551                expid = key
1552                eid = None
1553            elif exp.has_key('localname'):
1554                key = exp['localname']
1555                eid = key
1556                expid = None
1557            else:
1558                raise service_error(service_error.req, "Unknown lookup type")
1559        else:
1560            raise service_error(service_error.req, "No request?")
1561
1562        self.check_experiment_access(fid, key)
1563
1564        try:
1565            tmpdir = tempfile.mkdtemp(prefix="split-")
1566            os.mkdir(tmpdir+"/keys")
1567        except IOError:
1568            raise service_error(service_error.internal, "Cannot create tmp dir")
1569
1570        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1571        gw_secretkey_base = "fed.%s" % self.ssh_type
1572        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1573        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1574        tclfile = tmpdir + "/experiment.tcl"
1575        tbparams = { }
1576        try:
1577            access_user = self.accessdb[fid]
1578        except KeyError:
1579            raise service_error(service_error.internal,
1580                    "Access map and authorizer out of sync in " + \
1581                            "create_experiment for fedid %s"  % fid)
1582
1583        pid = "dummy"
1584        gid = "dummy"
1585
1586        # The tcl parser needs to read a file so put the content into that file
1587        descr=req.get('experimentdescription', None)
1588        if descr:
1589            file_content=descr.get('ns2description', None)
1590            if file_content:
1591                try:
1592                    f = open(tclfile, 'w')
1593                    f.write(file_content)
1594                    f.close()
1595                except IOError:
1596                    raise service_error(service_error.internal,
1597                            "Cannot write temp experiment description")
1598            else:
1599                raise service_error(service_error.req, 
1600                        "Only ns2descriptions supported")
1601        else:
1602            raise service_error(service_error.req, "No experiment description")
1603
1604        self.state_lock.acquire()
1605        if self.state.has_key(key):
1606            self.state[key]['experimentStatus'] = "starting"
1607            for e in self.state[key].get('experimentID',[]):
1608                if not expid and e.has_key('fedid'):
1609                    expid = e['fedid']
1610                elif not eid and e.has_key('localname'):
1611                    eid = e['localname']
1612        self.state_lock.release()
1613
1614        if not (eid and expid):
1615            raise service_error(service_error.internal, 
1616                    "Cannot find local experiment info!?")
1617
1618        try: 
1619            # This catches exceptions to clear the placeholder if necessary
1620            try:
1621                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1622            except ValueError:
1623                raise service_error(service_error.server_config, 
1624                        "Bad key type (%s)" % self.ssh_type)
1625
1626            # Copy the service request
1627            tb_services = [ s for s in req.get('service',[]) ]
1628            # Translate to topdl
1629            if self.splitter_url:
1630                # XXX: need remote topdl translator
1631                self.log.debug("Calling remote splitter at %s" % \
1632                        self.splitter_url)
1633                top = self.remote_ns2topdl(self.splitter_url, file_content)
1634            else:
1635                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1636                    str(self.muxmax), '-m', 'dummy']
1637
1638                tclcmd.extend([pid, gid, eid, tclfile])
1639
1640                self.log.debug("running local splitter %s", " ".join(tclcmd))
1641                # This is just fantastic.  As a side effect the parser copies
1642                # tb_compat.tcl into the current directory, so that directory
1643                # must be writable by the fedd user.  Doing this in the
1644                # temporary subdir ensures this is the case.
1645                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1646                        cwd=tmpdir)
1647                split_data = tclparser.stdout
1648
1649                top = topdl.topology_from_xml(file=split_data, top="experiment")
1650
1651            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1652             # Find the testbeds to look up
1653            testbeds = set([ a.value for e in top.elements \
1654                    for a in e.attribute \
1655                        if a.attribute == 'testbed'])
1656
1657            masters = { }           # testbeds exporting services
1658            for s in tb_services:
1659                # If this is a project_export request with the imports field
1660                # blank, fill it in.
1661                if s.get('name', '') == 'project_export':
1662                    if 'import' not in s or len(s['import']) == 0:
1663                        s['import'] = [ tb for tb in testbeds \
1664                                if tb not in s.get('export',[])]
1665                # Add the service to masters
1666                for tb in s.get('export', []):
1667                    if s.get('name', None) and s.get('import', None):
1668                        if tb not in masters:
1669                            masters[tb] = [ ]
1670
1671                        params = { }
1672                        if 'fedAttr' in s:
1673                            for a in s['fedAttr']:
1674                                params[a.get('attribute', '')] = \
1675                                        a.get('value','')
1676
1677                        masters[tb].append(federated_service(name=s['name'],
1678                                exporter=tb, importers=s.get('import',[]),
1679                                params=params))
1680                    else:
1681                        log.error('Testbed service does not have name " + \
1682                                "and importers')
1683
1684
1685            allocated = { }         # Testbeds we can access
1686            topo ={ }               # Sub topologies
1687            connInfo = { }          # Connection information
1688            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1689                    tbparams, masters)
1690
1691            self.split_topology(top, topo, testbeds)
1692
1693            # Copy configuration files into the remote file store
1694            # The config urlpath
1695            configpath = "/%s/config" % expid
1696            # The config file system location
1697            configdir ="%s%s" % ( self.repodir, configpath)
1698            try:
1699                os.makedirs(configdir)
1700            except IOError, e:
1701                raise service_error(
1702                        "Cannot create config directory: %s" % e)
1703            try:
1704                f = open("%s/hosts" % configdir, "w")
1705                f.write('\n'.join(hosts))
1706                f.close()
1707            except IOError, e:
1708                raise service_error(service_error.internal, 
1709                        "Cannot write hosts file: %s" % e)
1710            try:
1711                copy_file("%s" % gw_pubkey, "%s/%s" % \
1712                        (configdir, gw_pubkey_base))
1713                copy_file("%s" % gw_secretkey, "%s/%s" % \
1714                        (configdir, gw_secretkey_base))
1715            except IOError, e:
1716                raise service_error(service_error.internal, 
1717                        "Cannot copy keyfiles: %s" % e)
1718
1719            # Allow the individual testbeds to access the configuration files.
1720            for tb in tbparams.keys():
1721                asignee = tbparams[tb]['allocID']['fedid']
1722                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1723                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1724
1725            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1726                    self.muxmax)
1727            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1728                    connInfo, expid)
1729            # Now get access to the dynamic testbeds
1730            for k, t in topo.items():
1731                if not t.get_attribute('dynamic'):
1732                    continue
1733                tb = t.get_attribute('testbed')
1734                if tb: 
1735                    self.get_access(tb, None, tbparams, export_project, 
1736                            access_user, masters)
1737                    tbparams[k] = tbparams[tb]
1738                    del tbparams[tb]
1739                    allocated[k] = 1
1740                    store_keys = t.get_attribute('store_keys')
1741                    # Give the testbed access to keys it exports or imports
1742                    if store_keys:
1743                        for sk in store_keys.split(" "):
1744                            self.auth.set_attribute(\
1745                                    tbparams[k]['allocID']['fedid'], sk)
1746                else:
1747                    raise service_error(service_error.internal, 
1748                            "Dynamic allocation from no testbed!?")
1749
1750            self.wrangle_software(expid, top, topo, tbparams)
1751
1752            vtopo = topdl.topology_to_vtopo(top)
1753            vis = self.genviz(vtopo)
1754
1755            # save federant information
1756            for k in allocated.keys():
1757                tbparams[k]['federant'] = {
1758                        'name': [ { 'localname' : eid} ],
1759                        'allocID' : tbparams[k]['allocID'],
1760                        # XXX:
1761                        'master' : False,
1762                        'uri': tbparams[k]['uri'],
1763                    }
1764                if tbparams[k].has_key('emulab'):
1765                        tbparams[k]['federant']['emulab'] = \
1766                                tbparams[k]['emulab']
1767
1768            self.state_lock.acquire()
1769            self.state[eid]['vtopo'] = vtopo
1770            self.state[eid]['vis'] = vis
1771            self.state[expid]['federant'] = \
1772                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1773                        if tbparams[tb].has_key('federant') ]
1774            if self.state_filename: 
1775                self.write_state()
1776            self.state_lock.release()
1777        except service_error, e:
1778            # If something goes wrong in the parse (usually an access error)
1779            # clear the placeholder state.  From here on out the code delays
1780            # exceptions.  Failing at this point returns a fault to the remote
1781            # caller.
1782
1783            self.state_lock.acquire()
1784            del self.state[eid]
1785            del self.state[expid]
1786            if self.state_filename: self.write_state()
1787            self.state_lock.release()
1788            raise e
1789
1790
1791        # Start the background swapper and return the starting state.  From
1792        # here on out, the state will stick around a while.
1793
1794        # Let users touch the state
1795        self.auth.set_attribute(fid, expid)
1796        self.auth.set_attribute(expid, expid)
1797        # Override fedids can manipulate state as well
1798        for o in self.overrides:
1799            self.auth.set_attribute(o, expid)
1800
1801        # Create a logger that logs to the experiment's state object as well as
1802        # to the main log file.
1803        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1804        alloc_collector = self.list_log(self.state[eid]['log'])
1805        h = logging.StreamHandler(alloc_collector)
1806        # XXX: there should be a global one of these rather than repeating the
1807        # code.
1808        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1809                    '%d %b %y %H:%M:%S'))
1810        alloc_log.addHandler(h)
1811       
1812        attrs = [ 
1813                {
1814                    'attribute': 'ssh_pubkey', 
1815                    'value': '%s/%s/config/%s' % \
1816                            (self.repo_url, expid, gw_pubkey_base)
1817                },
1818                {
1819                    'attribute': 'ssh_secretkey', 
1820                    'value': '%s/%s/config/%s' % \
1821                            (self.repo_url, expid, gw_secretkey_base)
1822                },
1823                {
1824                    'attribute': 'hosts', 
1825                    'value': '%s/%s/config/hosts' % \
1826                            (self.repo_url, expid)
1827                },
1828                {
1829                    'attribute': 'experiment_name',
1830                    'value': eid,
1831                },
1832            ]
1833
1834        # transit and disconnected testbeds may not have a connInfo entry.
1835        # Fill in the blanks.
1836        for t in allocated.keys():
1837            if not connInfo.has_key(t):
1838                connInfo[t] = { }
1839
1840        # Start a thread to do the resource allocation
1841        t  = Thread(target=self.allocate_resources,
1842                args=(allocated, masters, eid, expid, tbparams, 
1843                    topo, tmpdir, alloc_log, alloc_collector, attrs, connInfo),
1844                name=eid)
1845        t.start()
1846
1847        rv = {
1848                'experimentID': [
1849                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1850                ],
1851                'experimentStatus': 'starting',
1852            }
1853
1854        return rv
1855   
1856    def get_experiment_fedid(self, key):
1857        """
1858        find the fedid associated with the localname key in the state database.
1859        """
1860
1861        rv = None
1862        self.state_lock.acquire()
1863        if self.state.has_key(key):
1864            if isinstance(self.state[key], dict):
1865                try:
1866                    kl = [ f['fedid'] for f in \
1867                            self.state[key]['experimentID']\
1868                                if f.has_key('fedid') ]
1869                except KeyError:
1870                    self.state_lock.release()
1871                    raise service_error(service_error.internal, 
1872                            "No fedid for experiment %s when getting "+\
1873                                    "fedid(!?)" % key)
1874                if len(kl) == 1:
1875                    rv = kl[0]
1876                else:
1877                    self.state_lock.release()
1878                    raise service_error(service_error.internal, 
1879                            "multiple fedids for experiment %s when " +\
1880                                    "getting fedid(!?)" % key)
1881            else:
1882                self.state_lock.release()
1883                raise service_error(service_error.internal, 
1884                        "Unexpected state for %s" % key)
1885        self.state_lock.release()
1886        return rv
1887
1888    def check_experiment_access(self, fid, key):
1889        """
1890        Confirm that the fid has access to the experiment.  Though a request
1891        may be made in terms of a local name, the access attribute is always
1892        the experiment's fedid.
1893        """
1894        if not isinstance(key, fedid):
1895            key = self.get_experiment_fedid(key)
1896
1897        if self.auth.check_attribute(fid, key):
1898            return True
1899        else:
1900            raise service_error(service_error.access, "Access Denied")
1901
1902
1903    def get_handler(self, path, fid):
1904        self.log.info("Get handler %s %s" % (path, fid))
1905        if self.auth.check_attribute(fid, path):
1906            return ("%s/%s" % (self.repodir, path), "application/binary")
1907        else:
1908            return (None, None)
1909
1910    def get_vtopo(self, req, fid):
1911        """
1912        Return the stored virtual topology for this experiment
1913        """
1914        rv = None
1915        state = None
1916
1917        req = req.get('VtopoRequestBody', None)
1918        if not req:
1919            raise service_error(service_error.req,
1920                    "Bad request format (no VtopoRequestBody)")
1921        exp = req.get('experiment', None)
1922        if exp:
1923            if exp.has_key('fedid'):
1924                key = exp['fedid']
1925                keytype = "fedid"
1926            elif exp.has_key('localname'):
1927                key = exp['localname']
1928                keytype = "localname"
1929            else:
1930                raise service_error(service_error.req, "Unknown lookup type")
1931        else:
1932            raise service_error(service_error.req, "No request?")
1933
1934        self.check_experiment_access(fid, key)
1935
1936        self.state_lock.acquire()
1937        if self.state.has_key(key):
1938            if self.state[key].has_key('vtopo'):
1939                rv = { 'experiment' : {keytype: key },\
1940                        'vtopo': self.state[key]['vtopo'],\
1941                    }
1942            else:
1943                state = self.state[key]['experimentStatus']
1944        self.state_lock.release()
1945
1946        if rv: return rv
1947        else: 
1948            if state:
1949                raise service_error(service_error.partial, 
1950                        "Not ready: %s" % state)
1951            else:
1952                raise service_error(service_error.req, "No such experiment")
1953
1954    def get_vis(self, req, fid):
1955        """
1956        Return the stored visualization for this experiment
1957        """
1958        rv = None
1959        state = None
1960
1961        req = req.get('VisRequestBody', None)
1962        if not req:
1963            raise service_error(service_error.req,
1964                    "Bad request format (no VisRequestBody)")
1965        exp = req.get('experiment', None)
1966        if exp:
1967            if exp.has_key('fedid'):
1968                key = exp['fedid']
1969                keytype = "fedid"
1970            elif exp.has_key('localname'):
1971                key = exp['localname']
1972                keytype = "localname"
1973            else:
1974                raise service_error(service_error.req, "Unknown lookup type")
1975        else:
1976            raise service_error(service_error.req, "No request?")
1977
1978        self.check_experiment_access(fid, key)
1979
1980        self.state_lock.acquire()
1981        if self.state.has_key(key):
1982            if self.state[key].has_key('vis'):
1983                rv =  { 'experiment' : {keytype: key },\
1984                        'vis': self.state[key]['vis'],\
1985                        }
1986            else:
1987                state = self.state[key]['experimentStatus']
1988        self.state_lock.release()
1989
1990        if rv: return rv
1991        else:
1992            if state:
1993                raise service_error(service_error.partial, 
1994                        "Not ready: %s" % state)
1995            else:
1996                raise service_error(service_error.req, "No such experiment")
1997
1998    def clean_info_response(self, rv):
1999        """
2000        Remove the information in the experiment's state object that is not in
2001        the info response.
2002        """
2003        # Remove the owner info (should always be there, but...)
2004        if rv.has_key('owner'): del rv['owner']
2005
2006        # Convert the log into the allocationLog parameter and remove the
2007        # log entry (with defensive programming)
2008        if rv.has_key('log'):
2009            rv['allocationLog'] = "".join(rv['log'])
2010            del rv['log']
2011        else:
2012            rv['allocationLog'] = ""
2013
2014        if rv['experimentStatus'] != 'active':
2015            if rv.has_key('federant'): del rv['federant']
2016        else:
2017            # remove the allocationID and uri info from each federant
2018            for f in rv.get('federant', []):
2019                if f.has_key('allocID'): del f['allocID']
2020                if f.has_key('uri'): del f['uri']
2021        return rv
2022
2023    def get_info(self, req, fid):
2024        """
2025        Return all the stored info about this experiment
2026        """
2027        rv = None
2028
2029        req = req.get('InfoRequestBody', None)
2030        if not req:
2031            raise service_error(service_error.req,
2032                    "Bad request format (no InfoRequestBody)")
2033        exp = req.get('experiment', None)
2034        if exp:
2035            if exp.has_key('fedid'):
2036                key = exp['fedid']
2037                keytype = "fedid"
2038            elif exp.has_key('localname'):
2039                key = exp['localname']
2040                keytype = "localname"
2041            else:
2042                raise service_error(service_error.req, "Unknown lookup type")
2043        else:
2044            raise service_error(service_error.req, "No request?")
2045
2046        self.check_experiment_access(fid, key)
2047
2048        # The state may be massaged by the service function that called
2049        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2050        # state.
2051        self.state_lock.acquire()
2052        if self.state.has_key(key):
2053            rv = copy.deepcopy(self.state[key])
2054        self.state_lock.release()
2055
2056        if rv:
2057            return self.clean_info_response(rv)
2058        else:
2059            raise service_error(service_error.req, "No such experiment")
2060
2061    def get_multi_info(self, req, fid):
2062        """
2063        Return all the stored info that this fedid can access
2064        """
2065        rv = { 'info': [ ] }
2066
2067        self.state_lock.acquire()
2068        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2069            try:
2070                self.check_experiment_access(fid, key)
2071            except service_error, e:
2072                if e.code == service_error.access:
2073                    continue
2074                else:
2075                    self.state_lock.release()
2076                    raise e
2077
2078            if self.state.has_key(key):
2079                e = copy.deepcopy(self.state[key])
2080                e = self.clean_info_response(e)
2081                rv['info'].append(e)
2082        self.state_lock.release()
2083        return rv
2084
2085    def terminate_experiment(self, req, fid):
2086        """
2087        Swap this experiment out on the federants and delete the shared
2088        information
2089        """
2090        tbparams = { }
2091        req = req.get('TerminateRequestBody', None)
2092        if not req:
2093            raise service_error(service_error.req,
2094                    "Bad request format (no TerminateRequestBody)")
2095        force = req.get('force', False)
2096        exp = req.get('experiment', None)
2097        if exp:
2098            if exp.has_key('fedid'):
2099                key = exp['fedid']
2100                keytype = "fedid"
2101            elif exp.has_key('localname'):
2102                key = exp['localname']
2103                keytype = "localname"
2104            else:
2105                raise service_error(service_error.req, "Unknown lookup type")
2106        else:
2107            raise service_error(service_error.req, "No request?")
2108
2109        self.check_experiment_access(fid, key)
2110
2111        dealloc_list = [ ]
2112
2113
2114        # Create a logger that logs to the dealloc_list as well as to the main
2115        # log file.
2116        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2117        h = logging.StreamHandler(self.list_log(dealloc_list))
2118        # XXX: there should be a global one of these rather than repeating the
2119        # code.
2120        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2121                    '%d %b %y %H:%M:%S'))
2122        dealloc_log.addHandler(h)
2123
2124        self.state_lock.acquire()
2125        fed_exp = self.state.get(key, None)
2126
2127        if fed_exp:
2128            # This branch of the conditional holds the lock to generate a
2129            # consistent temporary tbparams variable to deallocate experiments.
2130            # It releases the lock to do the deallocations and reacquires it to
2131            # remove the experiment state when the termination is complete.
2132
2133            # First make sure that the experiment creation is complete.
2134            status = fed_exp.get('experimentStatus', None)
2135
2136            if status:
2137                if status in ('starting', 'terminating'):
2138                    if not force:
2139                        self.state_lock.release()
2140                        raise service_error(service_error.partial, 
2141                                'Experiment still being created or destroyed')
2142                    else:
2143                        self.log.warning('Experiment in %s state ' % status + \
2144                                'being terminated by force.')
2145            else:
2146                # No status??? trouble
2147                self.state_lock.release()
2148                raise service_error(service_error.internal,
2149                        "Experiment has no status!?")
2150
2151            ids = []
2152            #  experimentID is a list of dicts that are self-describing
2153            #  identifiers.  This finds all the fedids and localnames - the
2154            #  keys of self.state - and puts them into ids.
2155            for id in fed_exp.get('experimentID', []):
2156                if id.has_key('fedid'): ids.append(id['fedid'])
2157                if id.has_key('localname'): ids.append(id['localname'])
2158
2159            # Collect the allocation/segment ids into a dict keyed by the fedid
2160            # of the allocation (or a monotonically increasing integer) that
2161            # contains a tuple of uri, aid (which is a dict...)
2162            for i, fed in enumerate(fed_exp.get('federant', [])):
2163                try:
2164                    uri = fed['uri']
2165                    aid = fed['allocID']
2166                    k = fed['allocID'].get('fedid', i)
2167                except KeyError, e:
2168                    continue
2169                tbparams[k] = (uri, aid)
2170            fed_exp['experimentStatus'] = 'terminating'
2171            if self.state_filename: self.write_state()
2172            self.state_lock.release()
2173
2174            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2175            # then completes, so we can't wait if nothing starts.  So, no
2176            # tbparams, no start.
2177            if len(tbparams) > 0:
2178                thread_pool = self.thread_pool(self.nthreads)
2179                for k in tbparams.keys():
2180                    # Create and start a thread to stop the segment
2181                    thread_pool.wait_for_slot()
2182                    uri, aid = tbparams[k]
2183                    t  = self.pooled_thread(\
2184                            target=self.terminate_segment(log=dealloc_log,
2185                                testbed=uri,
2186                                cert_file=self.cert_file, 
2187                                cert_pwd=self.cert_pwd,
2188                                trusted_certs=self.trusted_certs,
2189                                caller=self.call_TerminateSegment),
2190                            args=(uri, aid), name=k,
2191                            pdata=thread_pool, trace_file=self.trace_file)
2192                    t.start()
2193                # Wait for completions
2194                thread_pool.wait_for_all_done()
2195
2196            # release the allocations (failed experiments have done this
2197            # already, and starting experiments may be in odd states, so we
2198            # ignore errors releasing those allocations
2199            try: 
2200                for k in tbparams.keys():
2201                    # This releases access by uri
2202                    uri, aid = tbparams[k]
2203                    self.release_access(None, aid, uri=uri)
2204            except service_error, e:
2205                if status != 'failed' and not force:
2206                    raise e
2207
2208            # Remove the terminated experiment
2209            self.state_lock.acquire()
2210            for id in ids:
2211                if self.state.has_key(id): del self.state[id]
2212
2213            if self.state_filename: self.write_state()
2214            self.state_lock.release()
2215
2216            # Delete any synch points associated with this experiment.  All
2217            # synch points begin with the fedid of the experiment.
2218            fedid_keys = set(["fedid:%s" % f for f in ids \
2219                    if isinstance(f, fedid)])
2220            for k in self.synch_store.all_keys():
2221                try:
2222                    if len(k) > 45 and k[0:46] in fedid_keys:
2223                        self.synch_store.del_value(k)
2224                except synch_store.BadDeletionError:
2225                    pass
2226            self.write_store()
2227               
2228            return { 
2229                    'experiment': exp , 
2230                    'deallocationLog': "".join(dealloc_list),
2231                    }
2232        else:
2233            # Don't forget to release the lock
2234            self.state_lock.release()
2235            raise service_error(service_error.req, "No saved state")
2236
2237
2238    def GetValue(self, req, fid):
2239        """
2240        Get a value from the synchronized store
2241        """
2242        req = req.get('GetValueRequestBody', None)
2243        if not req:
2244            raise service_error(service_error.req,
2245                    "Bad request format (no GetValueRequestBody)")
2246       
2247        name = req['name']
2248        wait = req['wait']
2249        rv = { 'name': name }
2250
2251        if self.auth.check_attribute(fid, name):
2252            try:
2253                v = self.synch_store.get_value(name, wait)
2254            except synch_store.RevokedKeyError:
2255                # No more synch on this key
2256                raise service_error(service_error.federant, 
2257                        "Synch key %s revoked" % name)
2258            if v is not None:
2259                rv['value'] = v
2260            self.log.debug("[GetValue] got %s from %s" % (v, name))
2261            return rv
2262        else:
2263            raise service_error(service_error.access, "Access Denied")
2264       
2265
2266    def SetValue(self, req, fid):
2267        """
2268        Set a value in the synchronized store
2269        """
2270        req = req.get('SetValueRequestBody', None)
2271        if not req:
2272            raise service_error(service_error.req,
2273                    "Bad request format (no SetValueRequestBody)")
2274       
2275        name = req['name']
2276        v = req['value']
2277
2278        if self.auth.check_attribute(fid, name):
2279            try:
2280                self.synch_store.set_value(name, v)
2281                self.write_store()
2282                self.log.debug("[SetValue] set %s to %s" % (name, v))
2283            except synch_store.CollisionError:
2284                # Translate into a service_error
2285                raise service_error(service_error.req,
2286                        "Value already set: %s" %name)
2287            except synch_store.RevokedKeyError:
2288                # No more synch on this key
2289                raise service_error(service_error.federant, 
2290                        "Synch key %s revoked" % name)
2291            return { 'name': name, 'value': v }
2292        else:
2293            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.