source: fedd/federation/experiment_control.py @ 10a7053

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 10a7053 was 175b444, checked in by Ted Faber <faber@…>, 15 years ago

Add a new plugin for the DETER internal network, and generalize the dragon partition code to accept different networks.

  • Property mode set to 100644
File size: 80.7 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55    def __str__(self):
56        return "name %s export %s import %s params %s reqs %s" % \
57                (self.name, self.exporter, self.importers, self.params,
58                        [ (r['name'], r['visibility']) for r in self.reqs] )
59
60class experiment_control_local:
61    """
62    Control of experiments that this system can directly access.
63
64    Includes experiment creation, termination and information dissemination.
65    Thred safe.
66    """
67
68    class ssh_cmd_timeout(RuntimeError): pass
69   
70    class thread_pool:
71        """
72        A class to keep track of a set of threads all invoked for the same
73        task.  Manages the mutual exclusion of the states.
74        """
75        def __init__(self, nthreads):
76            """
77            Start a pool.
78            """
79            self.changed = Condition()
80            self.started = 0
81            self.terminated = 0
82            self.nthreads = nthreads
83
84        def acquire(self):
85            """
86            Get the pool's lock.
87            """
88            self.changed.acquire()
89
90        def release(self):
91            """
92            Release the pool's lock.
93            """
94            self.changed.release()
95
96        def wait(self, timeout = None):
97            """
98            Wait for a pool thread to start or stop.
99            """
100            self.changed.wait(timeout)
101
102        def start(self):
103            """
104            Called by a pool thread to report starting.
105            """
106            self.changed.acquire()
107            self.started += 1
108            self.changed.notifyAll()
109            self.changed.release()
110
111        def terminate(self):
112            """
113            Called by a pool thread to report finishing.
114            """
115            self.changed.acquire()
116            self.terminated += 1
117            self.changed.notifyAll()
118            self.changed.release()
119
120        def clear(self):
121            """
122            Clear all pool data.
123            """
124            self.changed.acquire()
125            self.started = 0
126            self.terminated =0
127            self.changed.notifyAll()
128            self.changed.release()
129
130        def wait_for_slot(self):
131            """
132            Wait until we have a free slot to start another pooled thread
133            """
134            self.acquire()
135            while self.started - self.terminated >= self.nthreads:
136                self.wait()
137            self.release()
138
139        def wait_for_all_done(self, timeout=None):
140            """
141            Wait until all active threads finish (and at least one has
142            started).  If a timeout is given, return after waiting that long
143            for termination.  If all threads are done (and one has started in
144            the since the last clear()) return True, otherwise False.
145            """
146            if timeout:
147                deadline = time.time() + timeout
148            self.acquire()
149            while self.started == 0 or self.started > self.terminated:
150                self.wait(timeout)
151                if timeout:
152                    if time.time() > deadline:
153                        break
154                    timeout = deadline - time.time()
155            self.release()
156            return not (self.started == 0 or self.started > self.terminated)
157
158    class pooled_thread(Thread):
159        """
160        One of a set of threads dedicated to a specific task.  Uses the
161        thread_pool class above for coordination.
162        """
163        def __init__(self, group=None, target=None, name=None, args=(), 
164                kwargs={}, pdata=None, trace_file=None):
165            Thread.__init__(self, group, target, name, args, kwargs)
166            self.rv = None          # Return value of the ops in this thread
167            self.exception = None   # Exception that terminated this thread
168            self.target=target      # Target function to run on start()
169            self.args = args        # Args to pass to target
170            self.kwargs = kwargs    # Additional kw args
171            self.pdata = pdata      # thread_pool for this class
172            # Logger for this thread
173            self.log = logging.getLogger("fedd.experiment_control")
174       
175        def run(self):
176            """
177            Emulate Thread.run, except add pool data manipulation and error
178            logging.
179            """
180            if self.pdata:
181                self.pdata.start()
182
183            if self.target:
184                try:
185                    self.rv = self.target(*self.args, **self.kwargs)
186                except service_error, s:
187                    self.exception = s
188                    self.log.error("Thread exception: %s %s" % \
189                            (s.code_string(), s.desc))
190                except:
191                    self.exception = sys.exc_info()[1]
192                    self.log.error(("Unexpected thread exception: %s" +\
193                            "Trace %s") % (self.exception,\
194                                traceback.format_exc()))
195            if self.pdata:
196                self.pdata.terminate()
197
198    call_RequestAccess = service_caller('RequestAccess')
199    call_ReleaseAccess = service_caller('ReleaseAccess')
200    call_StartSegment = service_caller('StartSegment')
201    call_TerminateSegment = service_caller('TerminateSegment')
202    call_Ns2Topdl = service_caller('Ns2Topdl')
203
204    def __init__(self, config=None, auth=None):
205        """
206        Intialize the various attributes, most from the config object
207        """
208
209        def parse_tarfile_list(tf):
210            """
211            Parse a tarfile list from the configuration.  This is a set of
212            paths and tarfiles separated by spaces.
213            """
214            rv = [ ]
215            if tf is not None:
216                tl = tf.split()
217                while len(tl) > 1:
218                    p, t = tl[0:2]
219                    del tl[0:2]
220                    rv.append((p, t))
221            return rv
222
223        self.thread_with_rv = experiment_control_local.pooled_thread
224        self.thread_pool = experiment_control_local.thread_pool
225        self.list_log = list_log.list_log
226
227        self.cert_file = config.get("experiment_control", "cert_file")
228        if self.cert_file:
229            self.cert_pwd = config.get("experiment_control", "cert_pwd")
230        else:
231            self.cert_file = config.get("globals", "cert_file")
232            self.cert_pwd = config.get("globals", "cert_pwd")
233
234        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
235                or config.get("globals", "trusted_certs")
236
237        self.repodir = config.get("experiment_control", "repodir")
238        self.repo_url = config.get("experiment_control", "repo_url", 
239                "https://users.isi.deterlab.net:23235");
240
241        self.exp_stem = "fed-stem"
242        self.log = logging.getLogger("fedd.experiment_control")
243        set_log_level(config, "experiment_control", self.log)
244        self.muxmax = 2
245        self.nthreads = 10
246        self.randomize_experiments = False
247
248        self.splitter = None
249        self.ssh_keygen = "/usr/bin/ssh-keygen"
250        self.ssh_identity_file = None
251
252
253        self.debug = config.getboolean("experiment_control", "create_debug")
254        self.cleanup = not config.getboolean("experiment_control", 
255                "leave_tmpfiles")
256        self.state_filename = config.get("experiment_control", 
257                "experiment_state")
258        self.store_filename = config.get("experiment_control", 
259                "synch_store")
260        self.store_url = config.get("experiment_control", "store_url")
261        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
262        self.fedkit = parse_tarfile_list(\
263                config.get("experiment_control", "fedkit"))
264        self.gatewaykit = parse_tarfile_list(\
265                config.get("experiment_control", "gatewaykit"))
266        accessdb_file = config.get("experiment_control", "accessdb")
267
268        self.ssh_pubkey_file = config.get("experiment_control", 
269                "ssh_pubkey_file")
270        self.ssh_privkey_file = config.get("experiment_control",
271                "ssh_privkey_file")
272        dt = config.get("experiment_control", "direct_transit")
273        self.direct_transit = [ tb.strip() for tb in dt.split(",")]
274        # NB for internal master/slave ops, not experiment setup
275        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
276
277        self.overrides = set([])
278        ovr = config.get('experiment_control', 'overrides')
279        if ovr:
280            for o in ovr.split(","):
281                o = o.strip()
282                if o.startswith('fedid:'): o = o[len('fedid:'):]
283                self.overrides.add(fedid(hexstr=o))
284
285        self.state = { }
286        self.state_lock = Lock()
287        self.tclsh = "/usr/local/bin/otclsh"
288        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
289                config.get("experiment_control", "tcl_splitter",
290                        "/usr/testbed/lib/ns2ir/parse.tcl")
291        mapdb_file = config.get("experiment_control", "mapdb")
292        self.trace_file = sys.stderr
293
294        self.def_expstart = \
295                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
296                "/tmp/federate";
297        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
298                "FEDDIR/hosts";
299        self.def_gwstart = \
300                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
301                "/tmp/bridge.log";
302        self.def_mgwstart = \
303                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
304                "/tmp/bridge.log";
305        self.def_gwimage = "FBSD61-TUNNEL2";
306        self.def_gwtype = "pc";
307        self.local_access = { }
308
309        if auth:
310            self.auth = auth
311        else:
312            self.log.error(\
313                    "[access]: No authorizer initialized, creating local one.")
314            auth = authorizer()
315
316
317        if self.ssh_pubkey_file:
318            try:
319                f = open(self.ssh_pubkey_file, 'r')
320                self.ssh_pubkey = f.read()
321                f.close()
322            except IOError:
323                raise service_error(service_error.internal,
324                        "Cannot read sshpubkey")
325        else:
326            raise service_error(service_error.internal, 
327                    "No SSH public key file?")
328
329        if not self.ssh_privkey_file:
330            raise service_error(service_error.internal, 
331                    "No SSH public key file?")
332
333
334        if mapdb_file:
335            self.read_mapdb(mapdb_file)
336        else:
337            self.log.warn("[experiment_control] No testbed map, using defaults")
338            self.tbmap = { 
339                    'deter':'https://users.isi.deterlab.net:23235',
340                    'emulab':'https://users.isi.deterlab.net:23236',
341                    'ucb':'https://users.isi.deterlab.net:23237',
342                    }
343
344        if accessdb_file:
345                self.read_accessdb(accessdb_file)
346        else:
347            raise service_error(service_error.internal,
348                    "No accessdb specified in config")
349
350        # Grab saved state.  OK to do this w/o locking because it's read only
351        # and only one thread should be in existence that can see self.state at
352        # this point.
353        if self.state_filename:
354            self.read_state()
355
356        if self.store_filename:
357            self.read_store()
358        else:
359            self.log.warning("No saved synch store")
360            self.synch_store = synch_store
361
362        # Dispatch tables
363        self.soap_services = {\
364                'New': soap_handler('New', self.new_experiment),
365                'Create': soap_handler('Create', self.create_experiment),
366                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
367                'Vis': soap_handler('Vis', self.get_vis),
368                'Info': soap_handler('Info', self.get_info),
369                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
370                'Terminate': soap_handler('Terminate', 
371                    self.terminate_experiment),
372                'GetValue': soap_handler('GetValue', self.GetValue),
373                'SetValue': soap_handler('SetValue', self.SetValue),
374        }
375
376        self.xmlrpc_services = {\
377                'New': xmlrpc_handler('New', self.new_experiment),
378                'Create': xmlrpc_handler('Create', self.create_experiment),
379                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
380                'Vis': xmlrpc_handler('Vis', self.get_vis),
381                'Info': xmlrpc_handler('Info', self.get_info),
382                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
383                'Terminate': xmlrpc_handler('Terminate',
384                    self.terminate_experiment),
385                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
386                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
387        }
388
389    # Call while holding self.state_lock
390    def write_state(self):
391        """
392        Write a new copy of experiment state after copying the existing state
393        to a backup.
394
395        State format is a simple pickling of the state dictionary.
396        """
397        if os.access(self.state_filename, os.W_OK):
398            copy_file(self.state_filename, \
399                    "%s.bak" % self.state_filename)
400        try:
401            f = open(self.state_filename, 'w')
402            pickle.dump(self.state, f)
403        except IOError, e:
404            self.log.error("Can't write file %s: %s" % \
405                    (self.state_filename, e))
406        except pickle.PicklingError, e:
407            self.log.error("Pickling problem: %s" % e)
408        except TypeError, e:
409            self.log.error("Pickling problem (TypeError): %s" % e)
410
411    @staticmethod
412    def get_alloc_ids(state):
413        """
414        Pull the fedids of the identifiers of each allocation from the
415        state.  Again, a dict dive that's best isolated.
416
417        Used by read_store and read state
418        """
419
420        return [ f['allocID']['fedid'] 
421                for f in state.get('federant',[]) \
422                    if f.has_key('allocID') and \
423                        f['allocID'].has_key('fedid')]
424
425    # Call while holding self.state_lock
426    def read_state(self):
427        """
428        Read a new copy of experiment state.  Old state is overwritten.
429
430        State format is a simple pickling of the state dictionary.
431        """
432       
433        def get_experiment_id(state):
434            """
435            Pull the fedid experimentID out of the saved state.  This is kind
436            of a gross walk through the dict.
437            """
438
439            if state.has_key('experimentID'):
440                for e in state['experimentID']:
441                    if e.has_key('fedid'):
442                        return e['fedid']
443                else:
444                    return None
445            else:
446                return None
447
448        try:
449            f = open(self.state_filename, "r")
450            self.state = pickle.load(f)
451            self.log.debug("[read_state]: Read state from %s" % \
452                    self.state_filename)
453        except IOError, e:
454            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
455                    % (self.state_filename, e))
456        except pickle.UnpicklingError, e:
457            self.log.warning(("[read_state]: No saved state: " + \
458                    "Unpickling failed: %s") % e)
459       
460        for s in self.state.values():
461            try:
462
463                eid = get_experiment_id(s)
464                if eid : 
465                    # Give the owner rights to the experiment
466                    self.auth.set_attribute(s['owner'], eid)
467                    # And holders of the eid as well
468                    self.auth.set_attribute(eid, eid)
469                    # allow overrides to control experiments as well
470                    for o in self.overrides:
471                        self.auth.set_attribute(o, eid)
472                    # Set permissions to allow reading of the software repo, if
473                    # any, as well.
474                    for a in self.get_alloc_ids(s):
475                        self.auth.set_attribute(a, 'repo/%s' % eid)
476                else:
477                    raise KeyError("No experiment id")
478            except KeyError, e:
479                self.log.warning("[read_state]: State ownership or identity " +\
480                        "misformatted in %s: %s" % (self.state_filename, e))
481
482
483    def read_accessdb(self, accessdb_file):
484        """
485        Read the mapping from fedids that can create experiments to their name
486        in the 3-level access namespace.  All will be asserted from this
487        testbed and can include the local username and porject that will be
488        asserted on their behalf by this fedd.  Each fedid is also added to the
489        authorization system with the "create" attribute.
490        """
491        self.accessdb = {}
492        # These are the regexps for parsing the db
493        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
494        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
495                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
496        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
497                "\s*->\s*(" + name_expr + ")\s*$")
498        lineno = 0
499
500        # Parse the mappings and store in self.authdb, a dict of
501        # fedid -> (proj, user)
502        try:
503            f = open(accessdb_file, "r")
504            for line in f:
505                lineno += 1
506                line = line.strip()
507                if len(line) == 0 or line.startswith('#'):
508                    continue
509                m = project_line.match(line)
510                if m:
511                    fid = fedid(hexstr=m.group(1))
512                    project, user = m.group(2,3)
513                    if not self.accessdb.has_key(fid):
514                        self.accessdb[fid] = []
515                    self.accessdb[fid].append((project, user))
516                    continue
517
518                m = user_line.match(line)
519                if m:
520                    fid = fedid(hexstr=m.group(1))
521                    project = None
522                    user = m.group(2)
523                    if not self.accessdb.has_key(fid):
524                        self.accessdb[fid] = []
525                    self.accessdb[fid].append((project, user))
526                    continue
527                self.log.warn("[experiment_control] Error parsing access " +\
528                        "db %s at line %d" %  (accessdb_file, lineno))
529        except IOError:
530            raise service_error(service_error.internal,
531                    ("Error opening/reading %s as experiment " +\
532                            "control accessdb") %  accessdb_file)
533        f.close()
534
535        # Initialize the authorization attributes
536        for fid in self.accessdb.keys():
537            self.auth.set_attribute(fid, 'create')
538            self.auth.set_attribute(fid, 'new')
539
540    def read_mapdb(self, file):
541        """
542        Read a simple colon separated list of mappings for the
543        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
544        """
545
546        self.tbmap = { }
547        lineno =0
548        try:
549            f = open(file, "r")
550            for line in f:
551                lineno += 1
552                line = line.strip()
553                if line.startswith('#') or len(line) == 0:
554                    continue
555                try:
556                    label, url = line.split(':', 1)
557                    self.tbmap[label] = url
558                except ValueError, e:
559                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
560                            "map db: %s %s" % (lineno, line, e))
561        except IOError, e:
562            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
563                    "open %s: %s" % (file, e))
564        f.close()
565
566    def read_store(self):
567        try:
568            self.synch_store = synch_store()
569            self.synch_store.load(self.store_filename)
570            self.log.debug("[read_store]: Read store from %s" % \
571                    self.store_filename)
572        except IOError, e:
573            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
574                    % (self.state_filename, e))
575            self.synch_store = synch_store()
576
577        # Set the initial permissions on data in the store.  XXX: This ad hoc
578        # authorization attribute initialization is getting out of hand.
579        for k in self.synch_store.all_keys():
580            try:
581                if k.startswith('fedid:'):
582                    fid = fedid(hexstr=k[6:46])
583                    if self.state.has_key(fid):
584                        for a in self.get_alloc_ids(self.state[fid]):
585                            self.auth.set_attribute(a, k)
586            except ValueError, e:
587                self.log.warn("Cannot deduce permissions for %s" % k)
588
589
590    def write_store(self):
591        """
592        Write a new copy of synch_store after writing current state
593        to a backup.  We use the internal synch_store pickle method to avoid
594        incinsistent data.
595
596        State format is a simple pickling of the store.
597        """
598        if os.access(self.store_filename, os.W_OK):
599            copy_file(self.store_filename, \
600                    "%s.bak" % self.store_filename)
601        try:
602            self.synch_store.save(self.store_filename)
603        except IOError, e:
604            self.log.error("Can't write file %s: %s" % \
605                    (self.store_filename, e))
606        except TypeError, e:
607            self.log.error("Pickling problem (TypeError): %s" % e)
608
609       
610    def generate_ssh_keys(self, dest, type="rsa" ):
611        """
612        Generate a set of keys for the gateways to use to talk.
613
614        Keys are of type type and are stored in the required dest file.
615        """
616        valid_types = ("rsa", "dsa")
617        t = type.lower();
618        if t not in valid_types: raise ValueError
619        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
620
621        try:
622            trace = open("/dev/null", "w")
623        except IOError:
624            raise service_error(service_error.internal,
625                    "Cannot open /dev/null??");
626
627        # May raise CalledProcessError
628        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
629        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
630        if rv != 0:
631            raise service_error(service_error.internal, 
632                    "Cannot generate nonce ssh keys.  %s return code %d" \
633                            % (self.ssh_keygen, rv))
634
635    def gentopo(self, str):
636        """
637        Generate the topology dtat structure from the splitter's XML
638        representation of it.
639
640        The topology XML looks like:
641            <experiment>
642                <nodes>
643                    <node><vname></vname><ips>ip1:ip2</ips></node>
644                </nodes>
645                <lans>
646                    <lan>
647                        <vname></vname><vnode></vnode><ip></ip>
648                        <bandwidth></bandwidth><member>node:port</member>
649                    </lan>
650                </lans>
651        """
652        class topo_parse:
653            """
654            Parse the topology XML and create the dats structure.
655            """
656            def __init__(self):
657                # Typing of the subelements for data conversion
658                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
659                self.int_subelements = ( 'bandwidth',)
660                self.float_subelements = ( 'delay',)
661                # The final data structure
662                self.nodes = [ ]
663                self.lans =  [ ]
664                self.topo = { \
665                        'node': self.nodes,\
666                        'lan' : self.lans,\
667                    }
668                self.element = { }  # Current element being created
669                self.chars = ""     # Last text seen
670
671            def end_element(self, name):
672                # After each sub element the contents is added to the current
673                # element or to the appropriate list.
674                if name == 'node':
675                    self.nodes.append(self.element)
676                    self.element = { }
677                elif name == 'lan':
678                    self.lans.append(self.element)
679                    self.element = { }
680                elif name in self.str_subelements:
681                    self.element[name] = self.chars
682                    self.chars = ""
683                elif name in self.int_subelements:
684                    self.element[name] = int(self.chars)
685                    self.chars = ""
686                elif name in self.float_subelements:
687                    self.element[name] = float(self.chars)
688                    self.chars = ""
689
690            def found_chars(self, data):
691                self.chars += data.rstrip()
692
693
694        tp = topo_parse();
695        parser = xml.parsers.expat.ParserCreate()
696        parser.EndElementHandler = tp.end_element
697        parser.CharacterDataHandler = tp.found_chars
698
699        parser.Parse(str)
700
701        return tp.topo
702       
703
704    def genviz(self, topo):
705        """
706        Generate the visualization the virtual topology
707        """
708
709        neato = "/usr/local/bin/neato"
710        # These are used to parse neato output and to create the visualization
711        # file.
712        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
713        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
714                "%s</type></node>"
715
716        try:
717            # Node names
718            nodes = [ n['vname'] for n in topo['node'] ]
719            topo_lans = topo['lan']
720        except KeyError, e:
721            raise service_error(service_error.internal, "Bad topology: %s" %e)
722
723        lans = { }
724        links = { }
725
726        # Walk through the virtual topology, organizing the connections into
727        # 2-node connections (links) and more-than-2-node connections (lans).
728        # When a lan is created, it's added to the list of nodes (there's a
729        # node in the visualization for the lan).
730        for l in topo_lans:
731            if links.has_key(l['vname']):
732                if len(links[l['vname']]) < 2:
733                    links[l['vname']].append(l['vnode'])
734                else:
735                    nodes.append(l['vname'])
736                    lans[l['vname']] = links[l['vname']]
737                    del links[l['vname']]
738                    lans[l['vname']].append(l['vnode'])
739            elif lans.has_key(l['vname']):
740                lans[l['vname']].append(l['vnode'])
741            else:
742                links[l['vname']] = [ l['vnode'] ]
743
744
745        # Open up a temporary file for dot to turn into a visualization
746        try:
747            df, dotname = tempfile.mkstemp()
748            dotfile = os.fdopen(df, 'w')
749        except IOError:
750            raise service_error(service_error.internal,
751                    "Failed to open file in genviz")
752
753        try:
754            dnull = open('/dev/null', 'w')
755        except IOError:
756            service_error(service_error.internal,
757                    "Failed to open /dev/null in genviz")
758
759        # Generate a dot/neato input file from the links, nodes and lans
760        try:
761            print >>dotfile, "graph G {"
762            for n in nodes:
763                print >>dotfile, '\t"%s"' % n
764            for l in links.keys():
765                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
766            for l in lans.keys():
767                for n in lans[l]:
768                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
769            print >>dotfile, "}"
770            dotfile.close()
771        except TypeError:
772            raise service_error(service_error.internal,
773                    "Single endpoint link in vtopo")
774        except IOError:
775            raise service_error(service_error.internal, "Cannot write dot file")
776
777        # Use dot to create a visualization
778        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
779                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
780                close_fds=True)
781        dnull.close()
782
783        # Translate dot to vis format
784        vis_nodes = [ ]
785        vis = { 'node': vis_nodes }
786        for line in dot.stdout:
787            m = vis_re.match(line)
788            if m:
789                vn = m.group(1)
790                vis_node = {'name': vn, \
791                        'x': float(m.group(2)),\
792                        'y' : float(m.group(3)),\
793                    }
794                if vn in links.keys() or vn in lans.keys():
795                    vis_node['type'] = 'lan'
796                else:
797                    vis_node['type'] = 'node'
798                vis_nodes.append(vis_node)
799        rv = dot.wait()
800
801        os.remove(dotname)
802        if rv == 0 : return vis
803        else: return None
804
805    def get_access(self, tb, nodes, tbparam, access_user, masters):
806        """
807        Get access to testbed through fedd and set the parameters for that tb
808        """
809        def get_export_project(svcs):
810            """
811            Look through for the list of federated_service for this testbed
812            objects for a project_export service, and extract the project
813            parameter.
814            """
815
816            pe = [s for s in svcs if s.name=='project_export']
817            if len(pe) == 1:
818                return pe[0].params.get('project', None)
819            elif len(pe) == 0:
820                return None
821            else:
822                raise service_error(service_error.req,
823                        "More than one project export is not supported")
824
825        uri = self.tbmap.get(testbed_base(tb), None)
826        if not uri:
827            raise service_error(service_error.server_config, 
828                    "Unknown testbed: %s" % tb)
829
830        export_svcs = masters.get(tb,[])
831        import_svcs = [ s for m in masters.values() \
832                for s in m \
833                    if tb in s.importers ]
834
835        export_project = get_export_project(export_svcs)
836
837        # Tweak search order so that if there are entries in access_user that
838        # have a project matching the export project, we try them first
839        if export_project: 
840            access_sequence = [ (p, u) for p, u in access_user \
841                    if p == export_project] 
842            access_sequence.extend([(p, u) for p, u in access_user \
843                    if p != export_project]) 
844        else: 
845            access_sequence = access_user
846
847        for p, u in access_sequence: 
848            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
849                    "to %s") %  ((p or "None"), u, uri))
850
851            if p:
852                # Request with user and project specified
853                req = {\
854                        'destinationTestbed' : { 'uri' : uri },
855                        'credential': [ "project: %s" % p, "user: %s"  % u],
856                        'allocID' : { 'localname': 'test' },
857                    }
858            else:
859                # Request with only user specified
860                req = {\
861                        'destinationTestbed' : { 'uri' : uri },
862                        'credential': [ 'user: %s' % u ],
863                        'allocID' : { 'localname': 'test' },
864                    }
865
866            # Make the service request from the services we're importing and
867            # exporting.  Keep track of the export request ids so we can
868            # collect the resulting info from the access response.
869            e_keys = { }
870            if import_svcs or export_svcs:
871                req['service'] = [ ]
872
873                for i, s in enumerate(import_svcs):
874                    idx = 'import%d' % i
875                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
876                    if s.params:
877                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
878                                for k, v in s.params.items()]
879                    req['service'].append(sr)
880
881                for i, s in enumerate(export_svcs):
882                    idx = 'export%d' % i
883                    e_keys[idx] = s
884                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
885                    if s.params:
886                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
887                                for k, v in s.params.items()]
888                    req['service'].append(sr)
889
890            # node resources if any
891            if nodes != None and len(nodes) > 0:
892                rnodes = [ ]
893                for n in nodes:
894                    rn = { }
895                    image, hw, count = n.split(":")
896                    if image: rn['image'] = [ image ]
897                    if hw: rn['hardware'] = [ hw ]
898                    if count and int(count) >0 : rn['count'] = int(count)
899                    rnodes.append(rn)
900                req['resources']= { }
901                req['resources']['node'] = rnodes
902
903            try:
904                if self.local_access.has_key(uri):
905                    # Local access call
906                    req = { 'RequestAccessRequestBody' : req }
907                    r = self.local_access[uri].RequestAccess(req, 
908                            fedid(file=self.cert_file))
909                    r = { 'RequestAccessResponseBody' : r }
910                else:
911                    r = self.call_RequestAccess(uri, req, 
912                            self.cert_file, self.cert_pwd, self.trusted_certs)
913            except service_error, e:
914                if e.code == service_error.access:
915                    self.log.debug("[get_access] Access denied")
916                    r = None
917                    continue
918                else:
919                    raise e
920
921            if r.has_key('RequestAccessResponseBody'):
922                # Through to here we have a valid response, not a fault.
923                # Access denied is a fault, so something better or worse than
924                # access denied has happened.
925                r = r['RequestAccessResponseBody']
926                self.log.debug("[get_access] Access granted")
927                break
928            else:
929                raise service_error(service_error.protocol,
930                        "Bad proxy response")
931       
932        if not r:
933            raise service_error(service_error.access, 
934                    "Access denied by %s (%s)" % (tb, uri))
935
936        tbparam[tb] = { 
937                "allocID" : r['allocID'],
938                "uri": uri,
939                }
940
941        # Collect the responses corresponding to the services this testbed
942        # exports.  These will be the service requests that we will include in
943        # the start segment requests (with appropriate visibility values) to
944        # import and export the segments.
945        for s in r.get('service', []):
946            id = s.get('id', None)
947            if id and id in e_keys:
948                e_keys[id].reqs.append(s)
949
950        # Add attributes to parameter space.  We don't allow attributes to
951        # overlay any parameters already installed.
952        for a in r.get('fedAttr', []):
953            try:
954                if a['attribute'] and \
955                        isinstance(a['attribute'], basestring)\
956                        and not tbparam[tb].has_key(a['attribute'].lower()):
957                    tbparam[tb][a['attribute'].lower()] = a['value']
958            except KeyError:
959                self.log.error("Bad attribute in response: %s" % a)
960
961    def release_access(self, tb, aid, uri=None):
962        """
963        Release access to testbed through fedd
964        """
965
966        if not uri:
967            uri = self.tbmap.get(tb, None)
968        if not uri:
969            raise service_error(service_error.server_config, 
970                    "Unknown testbed: %s" % tb)
971
972        if self.local_access.has_key(uri):
973            resp = self.local_access[uri].ReleaseAccess(\
974                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
975                    fedid(file=self.cert_file))
976            resp = { 'ReleaseAccessResponseBody': resp } 
977        else:
978            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
979                    self.cert_file, self.cert_pwd, self.trusted_certs)
980
981        # better error coding
982
983    def remote_ns2topdl(self, uri, desc):
984
985        req = {
986                'description' : { 'ns2description': desc },
987            }
988
989        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
990                self.trusted_certs)
991
992        if r.has_key('Ns2TopdlResponseBody'):
993            r = r['Ns2TopdlResponseBody']
994            ed = r.get('experimentdescription', None)
995            if ed.has_key('topdldescription'):
996                return topdl.Topology(**ed['topdldescription'])
997            else:
998                raise service_error(service_error.protocol, 
999                        "Bad splitter response (no output)")
1000        else:
1001            raise service_error(service_error.protocol, "Bad splitter response")
1002
1003    class start_segment:
1004        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1005                cert_pwd=None, trusted_certs=None, caller=None,
1006                log_collector=None):
1007            self.log = log
1008            self.debug = debug
1009            self.cert_file = cert_file
1010            self.cert_pwd = cert_pwd
1011            self.trusted_certs = None
1012            self.caller = caller
1013            self.testbed = testbed
1014            self.log_collector = log_collector
1015            self.response = None
1016            self.node = { }
1017
1018        def make_map(self, resp):
1019            if 'segmentdescription' in resp and \
1020                    'topdldescription' in resp['segmentdescription']:
1021                top = topdl.Topology(\
1022                        **resp['segmentdescription']['topdldescription'])
1023                for e in [e for e in top.elements \
1024                        if isinstance(e, topdl.Computer)]:
1025                    hn = e.get_attribute('hostname')
1026                    if hn:
1027                        for n in e.name:
1028                            self.node[n] = hn
1029
1030        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1031            req = {
1032                    'allocID': { 'fedid' : aid }, 
1033                    'segmentdescription': { 
1034                        'topdldescription': topo.to_dict(),
1035                    },
1036                }
1037
1038            if connInfo:
1039                req['connection'] = connInfo
1040
1041            import_svcs = [ s for m in masters.values() \
1042                    for s in m if self.testbed in s.importers]
1043
1044            if import_svcs or self.testbed in masters:
1045                req['service'] = []
1046
1047            for s in import_svcs:
1048                for r in s.reqs:
1049                    sr = copy.deepcopy(r)
1050                    sr['visibility'] = 'import';
1051                    req['service'].append(sr)
1052
1053            for s in masters.get(self.testbed, []):
1054                for r in s.reqs:
1055                    sr = copy.deepcopy(r)
1056                    sr['visibility'] = 'export';
1057                    req['service'].append(sr)
1058
1059            if attrs:
1060                req['fedAttr'] = attrs
1061
1062            try:
1063                self.log.debug("Calling StartSegment at %s " % uri)
1064                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1065                        self.trusted_certs)
1066                if r.has_key('StartSegmentResponseBody'):
1067                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1068                            None)
1069                    if lval and self.log_collector:
1070                        for line in  lval.splitlines(True):
1071                            self.log_collector.write(line)
1072                    self.make_map(r['StartSegmentResponseBody'])
1073                    self.response = r
1074                else:
1075                    raise service_error(service_error.internal, 
1076                            "Bad response!?: %s" %r)
1077                return True
1078            except service_error, e:
1079                self.log.error("Start segment failed on %s: %s" % \
1080                        (self.testbed, e))
1081                return False
1082
1083
1084
1085    class terminate_segment:
1086        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1087                cert_pwd=None, trusted_certs=None, caller=None):
1088            self.log = log
1089            self.debug = debug
1090            self.cert_file = cert_file
1091            self.cert_pwd = cert_pwd
1092            self.trusted_certs = None
1093            self.caller = caller
1094            self.testbed = testbed
1095
1096        def __call__(self, uri, aid ):
1097            req = {
1098                    'allocID': aid , 
1099                }
1100            try:
1101                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1102                        self.trusted_certs)
1103                return True
1104            except service_error, e:
1105                self.log.error("Terminate segment failed on %s: %s" % \
1106                        (self.testbed, e))
1107                return False
1108   
1109
1110    def allocate_resources(self, allocated, masters, eid, expid, 
1111            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1112            attrs=None, connInfo={}):
1113
1114        started = { }           # Testbeds where a sub-experiment started
1115                                # successfully
1116
1117        # XXX
1118        fail_soft = False
1119
1120        log = alloc_log or self.log
1121
1122        thread_pool = self.thread_pool(self.nthreads)
1123        threads = [ ]
1124        starters = [ ]
1125
1126        for tb in allocated.keys():
1127            # Create and start a thread to start the segment, and save it
1128            # to get the return value later
1129            tb_attrs = copy.copy(attrs)
1130            thread_pool.wait_for_slot()
1131            uri = tbparams[tb].get('uri', \
1132                    self.tbmap.get(testbed_base(tb), None))
1133            base, suffix = split_testbed(tb)
1134            if suffix:
1135                tb_attrs.append({'attribute': 'experiment_name', 
1136                    'value': "%s-%s" % (eid, suffix)})
1137            else:
1138                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1139            if not uri:
1140                raise service_error(service_error.internal, 
1141                        "Unknown testbed %s !?" % tb)
1142
1143            if tbparams[tb].has_key('allocID') and \
1144                    tbparams[tb]['allocID'].has_key('fedid'):
1145                aid = tbparams[tb]['allocID']['fedid']
1146            else:
1147                raise service_error(service_error.internal, 
1148                        "No alloc id for testbed %s !?" % tb)
1149
1150            s = self.start_segment(log=log, debug=self.debug,
1151                    testbed=tb, cert_file=self.cert_file,
1152                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1153                    caller=self.call_StartSegment,
1154                    log_collector=log_collector)
1155            starters.append(s)
1156            t  = self.pooled_thread(\
1157                    target=s, name=tb,
1158                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1159                    pdata=thread_pool, trace_file=self.trace_file)
1160            threads.append(t)
1161            t.start()
1162
1163        # Wait until all finish (keep pinging the log, though)
1164        mins = 0
1165        revoked = False
1166        while not thread_pool.wait_for_all_done(60.0):
1167            mins += 1
1168            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1169                    % mins)
1170            if not revoked and \
1171                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1172                # a testbed has failed.  Revoke this experiment's
1173                # synchronizarion values so that sub experiments will not
1174                # deadlock waiting for synchronization that will never happen
1175                self.log.info("A subexperiment has failed to swap in, " + \
1176                        "revoking synch keys")
1177                var_key = "fedid:%s" % expid
1178                for k in self.synch_store.all_keys():
1179                    if len(k) > 45 and k[0:46] == var_key:
1180                        self.synch_store.revoke_key(k)
1181                revoked = True
1182
1183        failed = [ t.getName() for t in threads if not t.rv ]
1184        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1185
1186        # If one failed clean up, unless fail_soft is set
1187        if failed:
1188            if not fail_soft:
1189                thread_pool.clear()
1190                for tb in succeeded:
1191                    # Create and start a thread to stop the segment
1192                    thread_pool.wait_for_slot()
1193                    uri = tbparams[tb]['uri']
1194                    t  = self.pooled_thread(\
1195                            target=self.terminate_segment(log=log,
1196                                testbed=tb,
1197                                cert_file=self.cert_file, 
1198                                cert_pwd=self.cert_pwd,
1199                                trusted_certs=self.trusted_certs,
1200                                caller=self.call_TerminateSegment),
1201                            args=(uri, tbparams[tb]['federant']['allocID']),
1202                            name=tb,
1203                            pdata=thread_pool, trace_file=self.trace_file)
1204                    t.start()
1205                # Wait until all finish (if any are being stopped)
1206                if succeeded:
1207                    thread_pool.wait_for_all_done()
1208
1209                # release the allocations
1210                for tb in tbparams.keys():
1211                    self.release_access(tb, tbparams[tb]['allocID'],
1212                            tbparams[tb].get('uri', None))
1213                # Remove the placeholder
1214                self.state_lock.acquire()
1215                self.state[eid]['experimentStatus'] = 'failed'
1216                if self.state_filename: self.write_state()
1217                self.state_lock.release()
1218
1219                log.error("Swap in failed on %s" % ",".join(failed))
1220                return
1221        else:
1222            # Walk through the successes and gather the virtual to physical
1223            # mapping.
1224            node = { }
1225            for s in starters:
1226                node.update(s.node)
1227            # Assigng the mapping as a hostname attribute
1228            for e in [ e for e in top.elements \
1229                    if isinstance(e, topdl.Computer)]:
1230                for n in e.name:
1231                    if n in node:
1232                        e.set_attribute('hostname', node[n])
1233            log.info("[start_segment]: Experiment %s active" % eid)
1234
1235
1236        # Walk up tmpdir, deleting as we go
1237        if self.cleanup:
1238            log.debug("[start_experiment]: removing %s" % tmpdir)
1239            for path, dirs, files in os.walk(tmpdir, topdown=False):
1240                for f in files:
1241                    os.remove(os.path.join(path, f))
1242                for d in dirs:
1243                    os.rmdir(os.path.join(path, d))
1244            os.rmdir(tmpdir)
1245        else:
1246            log.debug("[start_experiment]: not removing %s" % tmpdir)
1247
1248        # Insert the experiment into our state and update the disk copy.
1249        self.state_lock.acquire()
1250        self.state[expid]['experimentStatus'] = 'active'
1251        self.state[eid] = self.state[expid]
1252        self.state[eid]['experimentdescription']['topdldescription'] = \
1253                top.to_dict()
1254        if self.state_filename: self.write_state()
1255        self.state_lock.release()
1256        return
1257
1258
1259    def add_kit(self, e, kit):
1260        """
1261        Add a Software object created from the list of (install, location)
1262        tuples passed as kit  to the software attribute of an object e.  We
1263        do this enough to break out the code, but it's kind of a hack to
1264        avoid changing the old tuple rep.
1265        """
1266
1267        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1268
1269        if isinstance(e.software, list): e.software.extend(s)
1270        else: e.software = s
1271
1272
1273    def create_experiment_state(self, fid, req, expid, expcert,
1274            state='starting'):
1275        """
1276        Create the initial entry in the experiment's state.  The expid and
1277        expcert are the experiment's fedid and certifacte that represents that
1278        ID, which are installed in the experiment state.  If the request
1279        includes a suggested local name that is used if possible.  If the local
1280        name is already taken by an experiment owned by this user that has
1281        failed, it is overwritten.  Otherwise new letters are added until a
1282        valid localname is found.  The generated local name is returned.
1283        """
1284
1285        if req.has_key('experimentID') and \
1286                req['experimentID'].has_key('localname'):
1287            overwrite = False
1288            eid = req['experimentID']['localname']
1289            # If there's an old failed experiment here with the same local name
1290            # and accessible by this user, we'll overwrite it, otherwise we'll
1291            # fall through and do the collision avoidance.
1292            old_expid = self.get_experiment_fedid(eid)
1293            if old_expid and self.check_experiment_access(fid, old_expid):
1294                self.state_lock.acquire()
1295                status = self.state[eid].get('experimentStatus', None)
1296                if status and status == 'failed':
1297                    # remove the old access attribute
1298                    self.auth.unset_attribute(fid, old_expid)
1299                    overwrite = True
1300                    del self.state[eid]
1301                    del self.state[old_expid]
1302                self.state_lock.release()
1303            self.state_lock.acquire()
1304            while (self.state.has_key(eid) and not overwrite):
1305                eid += random.choice(string.ascii_letters)
1306            # Initial state
1307            self.state[eid] = {
1308                    'experimentID' : \
1309                            [ { 'localname' : eid }, {'fedid': expid } ],
1310                    'experimentStatus': state,
1311                    'experimentAccess': { 'X509' : expcert },
1312                    'owner': fid,
1313                    'log' : [],
1314                }
1315            self.state[expid] = self.state[eid]
1316            if self.state_filename: self.write_state()
1317            self.state_lock.release()
1318        else:
1319            eid = self.exp_stem
1320            for i in range(0,5):
1321                eid += random.choice(string.ascii_letters)
1322            self.state_lock.acquire()
1323            while (self.state.has_key(eid)):
1324                eid = self.exp_stem
1325                for i in range(0,5):
1326                    eid += random.choice(string.ascii_letters)
1327            # Initial state
1328            self.state[eid] = {
1329                    'experimentID' : \
1330                            [ { 'localname' : eid }, {'fedid': expid } ],
1331                    'experimentStatus': state,
1332                    'experimentAccess': { 'X509' : expcert },
1333                    'owner': fid,
1334                    'log' : [],
1335                }
1336            self.state[expid] = self.state[eid]
1337            if self.state_filename: self.write_state()
1338            self.state_lock.release()
1339
1340        return eid
1341
1342
1343    def allocate_ips_to_topo(self, top):
1344        """
1345        Add an ip4_address attribute to all the hosts in the topology, based on
1346        the shared substrates on which they sit.  An /etc/hosts file is also
1347        created and returned as a list of hostfiles entries.  We also return
1348        the allocator, because we may need to allocate IPs to portals
1349        (specifically DRAGON portals).
1350        """
1351        subs = sorted(top.substrates, 
1352                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1353                reverse=True)
1354        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1355        ifs = { }
1356        hosts = [ ]
1357
1358        for idx, s in enumerate(subs):
1359            net_size = len(s.interfaces)+2
1360
1361            a = ips.allocate(net_size)
1362            if a :
1363                base, num = a
1364                if num < net_size: 
1365                    raise service_error(service_error.internal,
1366                            "Allocator returned wrong number of IPs??")
1367            else:
1368                raise service_error(service_error.req, 
1369                        "Cannot allocate IP addresses")
1370            mask = ips.min_alloc
1371            while mask < net_size:
1372                mask *= 2
1373
1374            netmask = ((2**32-1) ^ (mask-1))
1375
1376            base += 1
1377            for i in s.interfaces:
1378                i.attribute.append(
1379                        topdl.Attribute('ip4_address', 
1380                            "%s" % ip_addr(base)))
1381                i.attribute.append(
1382                        topdl.Attribute('ip4_netmask', 
1383                            "%s" % ip_addr(int(netmask))))
1384
1385                hname = i.element.name[0]
1386                if ifs.has_key(hname):
1387                    hosts.append("%s\t%s-%s %s-%d" % \
1388                            (ip_addr(base), hname, s.name, hname,
1389                                ifs[hname]))
1390                else:
1391                    ifs[hname] = 0
1392                    hosts.append("%s\t%s-%s %s-%d %s" % \
1393                            (ip_addr(base), hname, s.name, hname,
1394                                ifs[hname], hname))
1395
1396                ifs[hname] += 1
1397                base += 1
1398        return hosts, ips
1399
1400    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1401            tbparams, masters):
1402        """
1403        Request access to the various testbeds required for this instantiation
1404        (passed in as testbeds).  User, access_user, expoert_project and master
1405        are used to construct the correct requests.  Per-testbed parameters are
1406        returned in tbparams.
1407        """
1408        for tb in testbeds:
1409            self.get_access(tb, None, tbparams, access_user, masters)
1410            allocated[tb] = 1
1411
1412    def split_topology(self, top, topo, testbeds):
1413        """
1414        Create the sub-topologies that are needed for experiment instantiation.
1415        """
1416        for tb in testbeds:
1417            topo[tb] = top.clone()
1418            # copy in for loop allows deletions from the original
1419            for e in [ e for e in topo[tb].elements]:
1420                etb = e.get_attribute('testbed')
1421                # NB: elements without a testbed attribute won't appear in any
1422                # sub topologies. 
1423                if not etb or etb != tb:
1424                    for i in e.interface:
1425                        for s in i.subs:
1426                            try:
1427                                s.interfaces.remove(i)
1428                            except ValueError:
1429                                raise service_error(service_error.internal,
1430                                        "Can't remove interface??")
1431                    topo[tb].elements.remove(e)
1432            topo[tb].make_indices()
1433
1434    def wrangle_software(self, expid, top, topo, tbparams):
1435        """
1436        Copy software out to the repository directory, allocate permissions and
1437        rewrite the segment topologies to look for the software in local
1438        places.
1439        """
1440
1441        # Copy the rpms and tarfiles to a distribution directory from
1442        # which the federants can retrieve them
1443        linkpath = "%s/software" %  expid
1444        softdir ="%s/%s" % ( self.repodir, linkpath)
1445        softmap = { }
1446        # These are in a list of tuples format (each kit).  This comprehension
1447        # unwraps them into a single list of tuples that initilaizes the set of
1448        # tuples.
1449        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1450                for p, t in l ])
1451        pkgs.update([x.location for e in top.elements \
1452                for x in e.software])
1453        try:
1454            os.makedirs(softdir)
1455        except IOError, e:
1456            raise service_error(
1457                    "Cannot create software directory: %s" % e)
1458        # The actual copying.  Everything's converted into a url for copying.
1459        for pkg in pkgs:
1460            loc = pkg
1461
1462            scheme, host, path = urlparse(loc)[0:3]
1463            dest = os.path.basename(path)
1464            if not scheme:
1465                if not loc.startswith('/'):
1466                    loc = "/%s" % loc
1467                loc = "file://%s" %loc
1468            try:
1469                u = urlopen(loc)
1470            except Exception, e:
1471                raise service_error(service_error.req, 
1472                        "Cannot open %s: %s" % (loc, e))
1473            try:
1474                f = open("%s/%s" % (softdir, dest) , "w")
1475                self.log.debug("Writing %s/%s" % (softdir,dest) )
1476                data = u.read(4096)
1477                while data:
1478                    f.write(data)
1479                    data = u.read(4096)
1480                f.close()
1481                u.close()
1482            except Exception, e:
1483                raise service_error(service_error.internal,
1484                        "Could not copy %s: %s" % (loc, e))
1485            path = re.sub("/tmp", "", linkpath)
1486            # XXX
1487            softmap[pkg] = \
1488                    "%s/%s/%s" %\
1489                    ( self.repo_url, path, dest)
1490
1491            # Allow the individual segments to access the software.
1492            for tb in tbparams.keys():
1493                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1494                        "/%s/%s" % ( path, dest))
1495
1496        # Convert the software locations in the segments into the local
1497        # copies on this host
1498        for soft in [ s for tb in topo.values() \
1499                for e in tb.elements \
1500                    if getattr(e, 'software', False) \
1501                        for s in e.software ]:
1502            if softmap.has_key(soft.location):
1503                soft.location = softmap[soft.location]
1504
1505
1506    def new_experiment(self, req, fid):
1507        """
1508        The external interface to empty initial experiment creation called from
1509        the dispatcher.
1510
1511        Creates a working directory, splits the incoming description using the
1512        splitter script and parses out the avrious subsections using the
1513        lcasses above.  Once each sub-experiment is created, use pooled threads
1514        to instantiate them and start it all up.
1515        """
1516        if not self.auth.check_attribute(fid, 'new'):
1517            raise service_error(service_error.access, "New access denied")
1518
1519        try:
1520            tmpdir = tempfile.mkdtemp(prefix="split-")
1521        except IOError:
1522            raise service_error(service_error.internal, "Cannot create tmp dir")
1523
1524        try:
1525            access_user = self.accessdb[fid]
1526        except KeyError:
1527            raise service_error(service_error.internal,
1528                    "Access map and authorizer out of sync in " + \
1529                            "new_experiment for fedid %s"  % fid)
1530
1531        pid = "dummy"
1532        gid = "dummy"
1533
1534        req = req.get('NewRequestBody', None)
1535        if not req:
1536            raise service_error(service_error.req,
1537                    "Bad request format (no NewRequestBody)")
1538
1539        # Generate an ID for the experiment (slice) and a certificate that the
1540        # allocator can use to prove they own it.  We'll ship it back through
1541        # the encrypted connection.
1542        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1543
1544        #now we're done with the tmpdir, and it should be empty
1545        if self.cleanup:
1546            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1547            os.rmdir(tmpdir)
1548        else:
1549            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1550
1551        eid = self.create_experiment_state(fid, req, expid, expcert, 
1552                state='empty')
1553
1554        # Let users touch the state
1555        self.auth.set_attribute(fid, expid)
1556        self.auth.set_attribute(expid, expid)
1557        # Override fedids can manipulate state as well
1558        for o in self.overrides:
1559            self.auth.set_attribute(o, expid)
1560
1561        rv = {
1562                'experimentID': [
1563                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1564                ],
1565                'experimentStatus': 'empty',
1566                'experimentAccess': { 'X509' : expcert }
1567            }
1568
1569        return rv
1570
1571    def create_experiment(self, req, fid):
1572        """
1573        The external interface to experiment creation called from the
1574        dispatcher.
1575
1576        Creates a working directory, splits the incoming description using the
1577        splitter script and parses out the various subsections using the
1578        classes above.  Once each sub-experiment is created, use pooled threads
1579        to instantiate them and start it all up.
1580        """
1581
1582        req = req.get('CreateRequestBody', None)
1583        if not req:
1584            raise service_error(service_error.req,
1585                    "Bad request format (no CreateRequestBody)")
1586
1587        # Get the experiment access
1588        exp = req.get('experimentID', None)
1589        if exp:
1590            if exp.has_key('fedid'):
1591                key = exp['fedid']
1592                expid = key
1593                eid = None
1594            elif exp.has_key('localname'):
1595                key = exp['localname']
1596                eid = key
1597                expid = None
1598            else:
1599                raise service_error(service_error.req, "Unknown lookup type")
1600        else:
1601            raise service_error(service_error.req, "No request?")
1602
1603        self.check_experiment_access(fid, key)
1604
1605        try:
1606            tmpdir = tempfile.mkdtemp(prefix="split-")
1607            os.mkdir(tmpdir+"/keys")
1608        except IOError:
1609            raise service_error(service_error.internal, "Cannot create tmp dir")
1610
1611        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1612        gw_secretkey_base = "fed.%s" % self.ssh_type
1613        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1614        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1615        tclfile = tmpdir + "/experiment.tcl"
1616        tbparams = { }
1617        try:
1618            access_user = self.accessdb[fid]
1619        except KeyError:
1620            raise service_error(service_error.internal,
1621                    "Access map and authorizer out of sync in " + \
1622                            "create_experiment for fedid %s"  % fid)
1623
1624        pid = "dummy"
1625        gid = "dummy"
1626
1627        # The tcl parser needs to read a file so put the content into that file
1628        descr=req.get('experimentdescription', None)
1629        if descr:
1630            file_content=descr.get('ns2description', None)
1631            if file_content:
1632                try:
1633                    f = open(tclfile, 'w')
1634                    f.write(file_content)
1635                    f.close()
1636                except IOError:
1637                    raise service_error(service_error.internal,
1638                            "Cannot write temp experiment description")
1639            else:
1640                raise service_error(service_error.req, 
1641                        "Only ns2descriptions supported")
1642        else:
1643            raise service_error(service_error.req, "No experiment description")
1644
1645        self.state_lock.acquire()
1646        if self.state.has_key(key):
1647            self.state[key]['experimentStatus'] = "starting"
1648            for e in self.state[key].get('experimentID',[]):
1649                if not expid and e.has_key('fedid'):
1650                    expid = e['fedid']
1651                elif not eid and e.has_key('localname'):
1652                    eid = e['localname']
1653        self.state_lock.release()
1654
1655        if not (eid and expid):
1656            raise service_error(service_error.internal, 
1657                    "Cannot find local experiment info!?")
1658
1659        try: 
1660            # This catches exceptions to clear the placeholder if necessary
1661            try:
1662                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1663            except ValueError:
1664                raise service_error(service_error.server_config, 
1665                        "Bad key type (%s)" % self.ssh_type)
1666
1667            # Copy the service request
1668            tb_services = [ s for s in req.get('service',[]) ]
1669            # Translate to topdl
1670            if self.splitter_url:
1671                self.log.debug("Calling remote topdl translator at %s" % \
1672                        self.splitter_url)
1673                top = self.remote_ns2topdl(self.splitter_url, file_content)
1674            else:
1675                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1676                    str(self.muxmax), '-m', 'dummy']
1677
1678                tclcmd.extend([pid, gid, eid, tclfile])
1679
1680                self.log.debug("running local splitter %s", " ".join(tclcmd))
1681                # This is just fantastic.  As a side effect the parser copies
1682                # tb_compat.tcl into the current directory, so that directory
1683                # must be writable by the fedd user.  Doing this in the
1684                # temporary subdir ensures this is the case.
1685                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1686                        cwd=tmpdir)
1687                split_data = tclparser.stdout
1688
1689                top = topdl.topology_from_xml(file=split_data, top="experiment")
1690
1691            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1692             # Find the testbeds to look up
1693            testbeds = set([ a.value for e in top.elements \
1694                    for a in e.attribute \
1695                        if a.attribute == 'testbed'])
1696
1697            masters = { }           # testbeds exporting services
1698            for s in tb_services:
1699                # If this is a project_export request with the importall field
1700                # set, fill it out.
1701
1702                if s.get('importall', False):
1703                    s['import'] = [ tb for tb in testbeds \
1704                            if tb not in s.get('export',[])]
1705                    del s['importall']
1706
1707                # Add the service to masters
1708                for tb in s.get('export', []):
1709                    if s.get('name', None):
1710                        if tb not in masters:
1711                            masters[tb] = [ ]
1712
1713                        params = { }
1714                        if 'fedAttr' in s:
1715                            for a in s['fedAttr']:
1716                                params[a.get('attribute', '')] = \
1717                                        a.get('value','')
1718
1719                        masters[tb].append(federated_service(name=s['name'],
1720                                exporter=tb, importers=s.get('import',[]),
1721                                params=params, reqs=[]))
1722                    else:
1723                        self.log.error('Testbed service does not have name " + \
1724                                "and importers')
1725
1726
1727            allocated = { }         # Testbeds we can access
1728            topo ={ }               # Sub topologies
1729            connInfo = { }          # Connection information
1730            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1731                    tbparams, masters)
1732
1733            self.split_topology(top, topo, testbeds)
1734
1735            # Copy configuration files into the remote file store
1736            # The config urlpath
1737            configpath = "/%s/config" % expid
1738            # The config file system location
1739            configdir ="%s%s" % ( self.repodir, configpath)
1740            try:
1741                os.makedirs(configdir)
1742            except EnvironmentError, e:
1743                raise service_error(service_error.internal,
1744                        "Cannot create config directory: %s" % e)
1745            try:
1746                f = open("%s/hosts" % configdir, "w")
1747                f.write('\n'.join(hosts))
1748                f.close()
1749            except IOError, e:
1750                raise service_error(service_error.internal, 
1751                        "Cannot write hosts file: %s" % e)
1752            try:
1753                copy_file("%s" % gw_pubkey, "%s/%s" % \
1754                        (configdir, gw_pubkey_base))
1755                copy_file("%s" % gw_secretkey, "%s/%s" % \
1756                        (configdir, gw_secretkey_base))
1757            except IOError, e:
1758                raise service_error(service_error.internal, 
1759                        "Cannot copy keyfiles: %s" % e)
1760
1761            # Allow the individual testbeds to access the configuration files.
1762            for tb in tbparams.keys():
1763                asignee = tbparams[tb]['allocID']['fedid']
1764                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1765                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1766
1767            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1768                    self.muxmax, self.direct_transit)
1769            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1770                    connInfo, expid)
1771            # Now get access to the dynamic testbeds (those added above)
1772            for tb in [ t for t in topo if t not in allocated]:
1773                self.get_access(tb, None, tbparams, access_user, masters)
1774                allocated[tb] = 1
1775                store_keys = topo[tb].get_attribute('store_keys')
1776                # Give the testbed access to keys it exports or imports
1777                if store_keys:
1778                    for sk in store_keys.split(" "):
1779                        self.auth.set_attribute(\
1780                                tbparams[tb]['allocID']['fedid'], sk)
1781
1782            self.wrangle_software(expid, top, topo, tbparams)
1783
1784            vtopo = topdl.topology_to_vtopo(top)
1785            vis = self.genviz(vtopo)
1786
1787            # save federant information
1788            for k in allocated.keys():
1789                tbparams[k]['federant'] = {
1790                        'name': [ { 'localname' : eid} ],
1791                        'allocID' : tbparams[k]['allocID'],
1792                        'uri': tbparams[k]['uri'],
1793                    }
1794
1795            self.state_lock.acquire()
1796            self.state[eid]['vtopo'] = vtopo
1797            self.state[eid]['vis'] = vis
1798            self.state[eid]['experimentdescription'] = \
1799                    { 'topdldescription': top.clone() }
1800            self.state[expid]['federant'] = \
1801                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1802                        if tbparams[tb].has_key('federant') ]
1803            if self.state_filename: 
1804                self.write_state()
1805            self.state_lock.release()
1806        except service_error, e:
1807            # If something goes wrong in the parse (usually an access error)
1808            # clear the placeholder state.  From here on out the code delays
1809            # exceptions.  Failing at this point returns a fault to the remote
1810            # caller.
1811
1812            self.state_lock.acquire()
1813            del self.state[eid]
1814            del self.state[expid]
1815            if self.state_filename: self.write_state()
1816            self.state_lock.release()
1817            raise e
1818
1819
1820        # Start the background swapper and return the starting state.  From
1821        # here on out, the state will stick around a while.
1822
1823        # Let users touch the state
1824        self.auth.set_attribute(fid, expid)
1825        self.auth.set_attribute(expid, expid)
1826        # Override fedids can manipulate state as well
1827        for o in self.overrides:
1828            self.auth.set_attribute(o, expid)
1829
1830        # Create a logger that logs to the experiment's state object as well as
1831        # to the main log file.
1832        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1833        alloc_collector = self.list_log(self.state[eid]['log'])
1834        h = logging.StreamHandler(alloc_collector)
1835        # XXX: there should be a global one of these rather than repeating the
1836        # code.
1837        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1838                    '%d %b %y %H:%M:%S'))
1839        alloc_log.addHandler(h)
1840       
1841        attrs = [ 
1842                {
1843                    'attribute': 'ssh_pubkey', 
1844                    'value': '%s/%s/config/%s' % \
1845                            (self.repo_url, expid, gw_pubkey_base)
1846                },
1847                {
1848                    'attribute': 'ssh_secretkey', 
1849                    'value': '%s/%s/config/%s' % \
1850                            (self.repo_url, expid, gw_secretkey_base)
1851                },
1852                {
1853                    'attribute': 'hosts', 
1854                    'value': '%s/%s/config/hosts' % \
1855                            (self.repo_url, expid)
1856                },
1857            ]
1858
1859        # transit and disconnected testbeds may not have a connInfo entry.
1860        # Fill in the blanks.
1861        for t in allocated.keys():
1862            if not connInfo.has_key(t):
1863                connInfo[t] = { }
1864
1865        # Start a thread to do the resource allocation
1866        t  = Thread(target=self.allocate_resources,
1867                args=(allocated, masters, eid, expid, tbparams, 
1868                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1869                    connInfo),
1870                name=eid)
1871        t.start()
1872
1873        rv = {
1874                'experimentID': [
1875                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1876                ],
1877                'experimentStatus': 'starting',
1878            }
1879
1880        return rv
1881   
1882    def get_experiment_fedid(self, key):
1883        """
1884        find the fedid associated with the localname key in the state database.
1885        """
1886
1887        rv = None
1888        self.state_lock.acquire()
1889        if self.state.has_key(key):
1890            if isinstance(self.state[key], dict):
1891                try:
1892                    kl = [ f['fedid'] for f in \
1893                            self.state[key]['experimentID']\
1894                                if f.has_key('fedid') ]
1895                except KeyError:
1896                    self.state_lock.release()
1897                    raise service_error(service_error.internal, 
1898                            "No fedid for experiment %s when getting "+\
1899                                    "fedid(!?)" % key)
1900                if len(kl) == 1:
1901                    rv = kl[0]
1902                else:
1903                    self.state_lock.release()
1904                    raise service_error(service_error.internal, 
1905                            "multiple fedids for experiment %s when " +\
1906                                    "getting fedid(!?)" % key)
1907            else:
1908                self.state_lock.release()
1909                raise service_error(service_error.internal, 
1910                        "Unexpected state for %s" % key)
1911        self.state_lock.release()
1912        return rv
1913
1914    def check_experiment_access(self, fid, key):
1915        """
1916        Confirm that the fid has access to the experiment.  Though a request
1917        may be made in terms of a local name, the access attribute is always
1918        the experiment's fedid.
1919        """
1920        if not isinstance(key, fedid):
1921            key = self.get_experiment_fedid(key)
1922
1923        if self.auth.check_attribute(fid, key):
1924            return True
1925        else:
1926            raise service_error(service_error.access, "Access Denied")
1927
1928
1929    def get_handler(self, path, fid):
1930        self.log.info("Get handler %s %s" % (path, fid))
1931        if self.auth.check_attribute(fid, path):
1932            return ("%s/%s" % (self.repodir, path), "application/binary")
1933        else:
1934            return (None, None)
1935
1936    def get_vtopo(self, req, fid):
1937        """
1938        Return the stored virtual topology for this experiment
1939        """
1940        rv = None
1941        state = None
1942
1943        req = req.get('VtopoRequestBody', None)
1944        if not req:
1945            raise service_error(service_error.req,
1946                    "Bad request format (no VtopoRequestBody)")
1947        exp = req.get('experiment', None)
1948        if exp:
1949            if exp.has_key('fedid'):
1950                key = exp['fedid']
1951                keytype = "fedid"
1952            elif exp.has_key('localname'):
1953                key = exp['localname']
1954                keytype = "localname"
1955            else:
1956                raise service_error(service_error.req, "Unknown lookup type")
1957        else:
1958            raise service_error(service_error.req, "No request?")
1959
1960        self.check_experiment_access(fid, key)
1961
1962        self.state_lock.acquire()
1963        if self.state.has_key(key):
1964            if self.state[key].has_key('vtopo'):
1965                rv = { 'experiment' : {keytype: key },\
1966                        'vtopo': self.state[key]['vtopo'],\
1967                    }
1968            else:
1969                state = self.state[key]['experimentStatus']
1970        self.state_lock.release()
1971
1972        if rv: return rv
1973        else: 
1974            if state:
1975                raise service_error(service_error.partial, 
1976                        "Not ready: %s" % state)
1977            else:
1978                raise service_error(service_error.req, "No such experiment")
1979
1980    def get_vis(self, req, fid):
1981        """
1982        Return the stored visualization for this experiment
1983        """
1984        rv = None
1985        state = None
1986
1987        req = req.get('VisRequestBody', None)
1988        if not req:
1989            raise service_error(service_error.req,
1990                    "Bad request format (no VisRequestBody)")
1991        exp = req.get('experiment', None)
1992        if exp:
1993            if exp.has_key('fedid'):
1994                key = exp['fedid']
1995                keytype = "fedid"
1996            elif exp.has_key('localname'):
1997                key = exp['localname']
1998                keytype = "localname"
1999            else:
2000                raise service_error(service_error.req, "Unknown lookup type")
2001        else:
2002            raise service_error(service_error.req, "No request?")
2003
2004        self.check_experiment_access(fid, key)
2005
2006        self.state_lock.acquire()
2007        if self.state.has_key(key):
2008            if self.state[key].has_key('vis'):
2009                rv =  { 'experiment' : {keytype: key },\
2010                        'vis': self.state[key]['vis'],\
2011                        }
2012            else:
2013                state = self.state[key]['experimentStatus']
2014        self.state_lock.release()
2015
2016        if rv: return rv
2017        else:
2018            if state:
2019                raise service_error(service_error.partial, 
2020                        "Not ready: %s" % state)
2021            else:
2022                raise service_error(service_error.req, "No such experiment")
2023
2024    def clean_info_response(self, rv):
2025        """
2026        Remove the information in the experiment's state object that is not in
2027        the info response.
2028        """
2029        # Remove the owner info (should always be there, but...)
2030        if rv.has_key('owner'): del rv['owner']
2031
2032        # Convert the log into the allocationLog parameter and remove the
2033        # log entry (with defensive programming)
2034        if rv.has_key('log'):
2035            rv['allocationLog'] = "".join(rv['log'])
2036            del rv['log']
2037        else:
2038            rv['allocationLog'] = ""
2039
2040        if rv['experimentStatus'] != 'active':
2041            if rv.has_key('federant'): del rv['federant']
2042        else:
2043            # remove the allocationID and uri info from each federant
2044            for f in rv.get('federant', []):
2045                if f.has_key('allocID'): del f['allocID']
2046                if f.has_key('uri'): del f['uri']
2047
2048        return rv
2049
2050    def get_info(self, req, fid):
2051        """
2052        Return all the stored info about this experiment
2053        """
2054        rv = None
2055
2056        req = req.get('InfoRequestBody', None)
2057        if not req:
2058            raise service_error(service_error.req,
2059                    "Bad request format (no InfoRequestBody)")
2060        exp = req.get('experiment', None)
2061        if exp:
2062            if exp.has_key('fedid'):
2063                key = exp['fedid']
2064                keytype = "fedid"
2065            elif exp.has_key('localname'):
2066                key = exp['localname']
2067                keytype = "localname"
2068            else:
2069                raise service_error(service_error.req, "Unknown lookup type")
2070        else:
2071            raise service_error(service_error.req, "No request?")
2072
2073        self.check_experiment_access(fid, key)
2074
2075        # The state may be massaged by the service function that called
2076        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2077        # state.
2078        self.state_lock.acquire()
2079        if self.state.has_key(key):
2080            rv = copy.deepcopy(self.state[key])
2081        self.state_lock.release()
2082
2083        if rv:
2084            return self.clean_info_response(rv)
2085        else:
2086            raise service_error(service_error.req, "No such experiment")
2087
2088    def get_multi_info(self, req, fid):
2089        """
2090        Return all the stored info that this fedid can access
2091        """
2092        rv = { 'info': [ ] }
2093
2094        self.state_lock.acquire()
2095        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2096            try:
2097                self.check_experiment_access(fid, key)
2098            except service_error, e:
2099                if e.code == service_error.access:
2100                    continue
2101                else:
2102                    self.state_lock.release()
2103                    raise e
2104
2105            if self.state.has_key(key):
2106                e = copy.deepcopy(self.state[key])
2107                e = self.clean_info_response(e)
2108                rv['info'].append(e)
2109        self.state_lock.release()
2110        return rv
2111
2112    def terminate_experiment(self, req, fid):
2113        """
2114        Swap this experiment out on the federants and delete the shared
2115        information
2116        """
2117        tbparams = { }
2118        req = req.get('TerminateRequestBody', None)
2119        if not req:
2120            raise service_error(service_error.req,
2121                    "Bad request format (no TerminateRequestBody)")
2122        force = req.get('force', False)
2123        exp = req.get('experiment', None)
2124        if exp:
2125            if exp.has_key('fedid'):
2126                key = exp['fedid']
2127                keytype = "fedid"
2128            elif exp.has_key('localname'):
2129                key = exp['localname']
2130                keytype = "localname"
2131            else:
2132                raise service_error(service_error.req, "Unknown lookup type")
2133        else:
2134            raise service_error(service_error.req, "No request?")
2135
2136        self.check_experiment_access(fid, key)
2137
2138        dealloc_list = [ ]
2139
2140
2141        # Create a logger that logs to the dealloc_list as well as to the main
2142        # log file.
2143        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2144        h = logging.StreamHandler(self.list_log(dealloc_list))
2145        # XXX: there should be a global one of these rather than repeating the
2146        # code.
2147        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2148                    '%d %b %y %H:%M:%S'))
2149        dealloc_log.addHandler(h)
2150
2151        self.state_lock.acquire()
2152        fed_exp = self.state.get(key, None)
2153
2154        if fed_exp:
2155            # This branch of the conditional holds the lock to generate a
2156            # consistent temporary tbparams variable to deallocate experiments.
2157            # It releases the lock to do the deallocations and reacquires it to
2158            # remove the experiment state when the termination is complete.
2159
2160            # First make sure that the experiment creation is complete.
2161            status = fed_exp.get('experimentStatus', None)
2162
2163            if status:
2164                if status in ('starting', 'terminating'):
2165                    if not force:
2166                        self.state_lock.release()
2167                        raise service_error(service_error.partial, 
2168                                'Experiment still being created or destroyed')
2169                    else:
2170                        self.log.warning('Experiment in %s state ' % status + \
2171                                'being terminated by force.')
2172            else:
2173                # No status??? trouble
2174                self.state_lock.release()
2175                raise service_error(service_error.internal,
2176                        "Experiment has no status!?")
2177
2178            ids = []
2179            #  experimentID is a list of dicts that are self-describing
2180            #  identifiers.  This finds all the fedids and localnames - the
2181            #  keys of self.state - and puts them into ids.
2182            for id in fed_exp.get('experimentID', []):
2183                if id.has_key('fedid'): ids.append(id['fedid'])
2184                if id.has_key('localname'): ids.append(id['localname'])
2185
2186            # Collect the allocation/segment ids into a dict keyed by the fedid
2187            # of the allocation (or a monotonically increasing integer) that
2188            # contains a tuple of uri, aid (which is a dict...)
2189            for i, fed in enumerate(fed_exp.get('federant', [])):
2190                try:
2191                    uri = fed['uri']
2192                    aid = fed['allocID']
2193                    k = fed['allocID'].get('fedid', i)
2194                except KeyError, e:
2195                    continue
2196                tbparams[k] = (uri, aid)
2197            fed_exp['experimentStatus'] = 'terminating'
2198            if self.state_filename: self.write_state()
2199            self.state_lock.release()
2200
2201            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2202            # then completes, so we can't wait if nothing starts.  So, no
2203            # tbparams, no start.
2204            if len(tbparams) > 0:
2205                thread_pool = self.thread_pool(self.nthreads)
2206                for k in tbparams.keys():
2207                    # Create and start a thread to stop the segment
2208                    thread_pool.wait_for_slot()
2209                    uri, aid = tbparams[k]
2210                    t  = self.pooled_thread(\
2211                            target=self.terminate_segment(log=dealloc_log,
2212                                testbed=uri,
2213                                cert_file=self.cert_file, 
2214                                cert_pwd=self.cert_pwd,
2215                                trusted_certs=self.trusted_certs,
2216                                caller=self.call_TerminateSegment),
2217                            args=(uri, aid), name=k,
2218                            pdata=thread_pool, trace_file=self.trace_file)
2219                    t.start()
2220                # Wait for completions
2221                thread_pool.wait_for_all_done()
2222
2223            # release the allocations (failed experiments have done this
2224            # already, and starting experiments may be in odd states, so we
2225            # ignore errors releasing those allocations
2226            try: 
2227                for k in tbparams.keys():
2228                    # This releases access by uri
2229                    uri, aid = tbparams[k]
2230                    self.release_access(None, aid, uri=uri)
2231            except service_error, e:
2232                if status != 'failed' and not force:
2233                    raise e
2234
2235            # Remove the terminated experiment
2236            self.state_lock.acquire()
2237            for id in ids:
2238                if self.state.has_key(id): del self.state[id]
2239
2240            if self.state_filename: self.write_state()
2241            self.state_lock.release()
2242
2243            # Delete any synch points associated with this experiment.  All
2244            # synch points begin with the fedid of the experiment.
2245            fedid_keys = set(["fedid:%s" % f for f in ids \
2246                    if isinstance(f, fedid)])
2247            for k in self.synch_store.all_keys():
2248                try:
2249                    if len(k) > 45 and k[0:46] in fedid_keys:
2250                        self.synch_store.del_value(k)
2251                except synch_store.BadDeletionError:
2252                    pass
2253            self.write_store()
2254               
2255            return { 
2256                    'experiment': exp , 
2257                    'deallocationLog': "".join(dealloc_list),
2258                    }
2259        else:
2260            # Don't forget to release the lock
2261            self.state_lock.release()
2262            raise service_error(service_error.req, "No saved state")
2263
2264
2265    def GetValue(self, req, fid):
2266        """
2267        Get a value from the synchronized store
2268        """
2269        req = req.get('GetValueRequestBody', None)
2270        if not req:
2271            raise service_error(service_error.req,
2272                    "Bad request format (no GetValueRequestBody)")
2273       
2274        name = req['name']
2275        wait = req['wait']
2276        rv = { 'name': name }
2277
2278        if self.auth.check_attribute(fid, name):
2279            self.log.debug("[GetValue] asking for %s " % name)
2280            try:
2281                v = self.synch_store.get_value(name, wait)
2282            except synch_store.RevokedKeyError:
2283                # No more synch on this key
2284                raise service_error(service_error.federant, 
2285                        "Synch key %s revoked" % name)
2286            if v is not None:
2287                rv['value'] = v
2288            self.log.debug("[GetValue] got %s from %s" % (v, name))
2289            return rv
2290        else:
2291            raise service_error(service_error.access, "Access Denied")
2292       
2293
2294    def SetValue(self, req, fid):
2295        """
2296        Set a value in the synchronized store
2297        """
2298        req = req.get('SetValueRequestBody', None)
2299        if not req:
2300            raise service_error(service_error.req,
2301                    "Bad request format (no SetValueRequestBody)")
2302       
2303        name = req['name']
2304        v = req['value']
2305
2306        if self.auth.check_attribute(fid, name):
2307            try:
2308                self.synch_store.set_value(name, v)
2309                self.write_store()
2310                self.log.debug("[SetValue] set %s to %s" % (name, v))
2311            except synch_store.CollisionError:
2312                # Translate into a service_error
2313                raise service_error(service_error.req,
2314                        "Value already set: %s" %name)
2315            except synch_store.RevokedKeyError:
2316                # No more synch on this key
2317                raise service_error(service_error.federant, 
2318                        "Synch key %s revoked" % name)
2319            return { 'name': name, 'value': v }
2320        else:
2321            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.