source: fedd/federation/experiment_control.py @ 4b68c58

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 4b68c58 was 139e2e2, checked in by Ted Faber <faber@…>, 15 years ago

Whoops. This doesn't fail of there is no direct_transit configuration

  • Property mode set to 100644
File size: 80.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55    def __str__(self):
56        return "name %s export %s import %s params %s reqs %s" % \
57                (self.name, self.exporter, self.importers, self.params,
58                        [ (r['name'], r['visibility']) for r in self.reqs] )
59
60class experiment_control_local:
61    """
62    Control of experiments that this system can directly access.
63
64    Includes experiment creation, termination and information dissemination.
65    Thred safe.
66    """
67
68    class ssh_cmd_timeout(RuntimeError): pass
69   
70    class thread_pool:
71        """
72        A class to keep track of a set of threads all invoked for the same
73        task.  Manages the mutual exclusion of the states.
74        """
75        def __init__(self, nthreads):
76            """
77            Start a pool.
78            """
79            self.changed = Condition()
80            self.started = 0
81            self.terminated = 0
82            self.nthreads = nthreads
83
84        def acquire(self):
85            """
86            Get the pool's lock.
87            """
88            self.changed.acquire()
89
90        def release(self):
91            """
92            Release the pool's lock.
93            """
94            self.changed.release()
95
96        def wait(self, timeout = None):
97            """
98            Wait for a pool thread to start or stop.
99            """
100            self.changed.wait(timeout)
101
102        def start(self):
103            """
104            Called by a pool thread to report starting.
105            """
106            self.changed.acquire()
107            self.started += 1
108            self.changed.notifyAll()
109            self.changed.release()
110
111        def terminate(self):
112            """
113            Called by a pool thread to report finishing.
114            """
115            self.changed.acquire()
116            self.terminated += 1
117            self.changed.notifyAll()
118            self.changed.release()
119
120        def clear(self):
121            """
122            Clear all pool data.
123            """
124            self.changed.acquire()
125            self.started = 0
126            self.terminated =0
127            self.changed.notifyAll()
128            self.changed.release()
129
130        def wait_for_slot(self):
131            """
132            Wait until we have a free slot to start another pooled thread
133            """
134            self.acquire()
135            while self.started - self.terminated >= self.nthreads:
136                self.wait()
137            self.release()
138
139        def wait_for_all_done(self, timeout=None):
140            """
141            Wait until all active threads finish (and at least one has
142            started).  If a timeout is given, return after waiting that long
143            for termination.  If all threads are done (and one has started in
144            the since the last clear()) return True, otherwise False.
145            """
146            if timeout:
147                deadline = time.time() + timeout
148            self.acquire()
149            while self.started == 0 or self.started > self.terminated:
150                self.wait(timeout)
151                if timeout:
152                    if time.time() > deadline:
153                        break
154                    timeout = deadline - time.time()
155            self.release()
156            return not (self.started == 0 or self.started > self.terminated)
157
158    class pooled_thread(Thread):
159        """
160        One of a set of threads dedicated to a specific task.  Uses the
161        thread_pool class above for coordination.
162        """
163        def __init__(self, group=None, target=None, name=None, args=(), 
164                kwargs={}, pdata=None, trace_file=None):
165            Thread.__init__(self, group, target, name, args, kwargs)
166            self.rv = None          # Return value of the ops in this thread
167            self.exception = None   # Exception that terminated this thread
168            self.target=target      # Target function to run on start()
169            self.args = args        # Args to pass to target
170            self.kwargs = kwargs    # Additional kw args
171            self.pdata = pdata      # thread_pool for this class
172            # Logger for this thread
173            self.log = logging.getLogger("fedd.experiment_control")
174       
175        def run(self):
176            """
177            Emulate Thread.run, except add pool data manipulation and error
178            logging.
179            """
180            if self.pdata:
181                self.pdata.start()
182
183            if self.target:
184                try:
185                    self.rv = self.target(*self.args, **self.kwargs)
186                except service_error, s:
187                    self.exception = s
188                    self.log.error("Thread exception: %s %s" % \
189                            (s.code_string(), s.desc))
190                except:
191                    self.exception = sys.exc_info()[1]
192                    self.log.error(("Unexpected thread exception: %s" +\
193                            "Trace %s") % (self.exception,\
194                                traceback.format_exc()))
195            if self.pdata:
196                self.pdata.terminate()
197
198    call_RequestAccess = service_caller('RequestAccess')
199    call_ReleaseAccess = service_caller('ReleaseAccess')
200    call_StartSegment = service_caller('StartSegment')
201    call_TerminateSegment = service_caller('TerminateSegment')
202    call_Ns2Topdl = service_caller('Ns2Topdl')
203
204    def __init__(self, config=None, auth=None):
205        """
206        Intialize the various attributes, most from the config object
207        """
208
209        def parse_tarfile_list(tf):
210            """
211            Parse a tarfile list from the configuration.  This is a set of
212            paths and tarfiles separated by spaces.
213            """
214            rv = [ ]
215            if tf is not None:
216                tl = tf.split()
217                while len(tl) > 1:
218                    p, t = tl[0:2]
219                    del tl[0:2]
220                    rv.append((p, t))
221            return rv
222
223        self.thread_with_rv = experiment_control_local.pooled_thread
224        self.thread_pool = experiment_control_local.thread_pool
225        self.list_log = list_log.list_log
226
227        self.cert_file = config.get("experiment_control", "cert_file")
228        if self.cert_file:
229            self.cert_pwd = config.get("experiment_control", "cert_pwd")
230        else:
231            self.cert_file = config.get("globals", "cert_file")
232            self.cert_pwd = config.get("globals", "cert_pwd")
233
234        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
235                or config.get("globals", "trusted_certs")
236
237        self.repodir = config.get("experiment_control", "repodir")
238        self.repo_url = config.get("experiment_control", "repo_url", 
239                "https://users.isi.deterlab.net:23235");
240
241        self.exp_stem = "fed-stem"
242        self.log = logging.getLogger("fedd.experiment_control")
243        set_log_level(config, "experiment_control", self.log)
244        self.muxmax = 2
245        self.nthreads = 10
246        self.randomize_experiments = False
247
248        self.splitter = None
249        self.ssh_keygen = "/usr/bin/ssh-keygen"
250        self.ssh_identity_file = None
251
252
253        self.debug = config.getboolean("experiment_control", "create_debug")
254        self.cleanup = not config.getboolean("experiment_control", 
255                "leave_tmpfiles")
256        self.state_filename = config.get("experiment_control", 
257                "experiment_state")
258        self.store_filename = config.get("experiment_control", 
259                "synch_store")
260        self.store_url = config.get("experiment_control", "store_url")
261        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
262        self.fedkit = parse_tarfile_list(\
263                config.get("experiment_control", "fedkit"))
264        self.gatewaykit = parse_tarfile_list(\
265                config.get("experiment_control", "gatewaykit"))
266        accessdb_file = config.get("experiment_control", "accessdb")
267
268        self.ssh_pubkey_file = config.get("experiment_control", 
269                "ssh_pubkey_file")
270        self.ssh_privkey_file = config.get("experiment_control",
271                "ssh_privkey_file")
272        dt = config.get("experiment_control", "direct_transit")
273        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
274        else: self.direct_transit = [ ]
275        # NB for internal master/slave ops, not experiment setup
276        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
277
278        self.overrides = set([])
279        ovr = config.get('experiment_control', 'overrides')
280        if ovr:
281            for o in ovr.split(","):
282                o = o.strip()
283                if o.startswith('fedid:'): o = o[len('fedid:'):]
284                self.overrides.add(fedid(hexstr=o))
285
286        self.state = { }
287        self.state_lock = Lock()
288        self.tclsh = "/usr/local/bin/otclsh"
289        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
290                config.get("experiment_control", "tcl_splitter",
291                        "/usr/testbed/lib/ns2ir/parse.tcl")
292        mapdb_file = config.get("experiment_control", "mapdb")
293        self.trace_file = sys.stderr
294
295        self.def_expstart = \
296                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
297                "/tmp/federate";
298        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
299                "FEDDIR/hosts";
300        self.def_gwstart = \
301                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
302                "/tmp/bridge.log";
303        self.def_mgwstart = \
304                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
305                "/tmp/bridge.log";
306        self.def_gwimage = "FBSD61-TUNNEL2";
307        self.def_gwtype = "pc";
308        self.local_access = { }
309
310        if auth:
311            self.auth = auth
312        else:
313            self.log.error(\
314                    "[access]: No authorizer initialized, creating local one.")
315            auth = authorizer()
316
317
318        if self.ssh_pubkey_file:
319            try:
320                f = open(self.ssh_pubkey_file, 'r')
321                self.ssh_pubkey = f.read()
322                f.close()
323            except IOError:
324                raise service_error(service_error.internal,
325                        "Cannot read sshpubkey")
326        else:
327            raise service_error(service_error.internal, 
328                    "No SSH public key file?")
329
330        if not self.ssh_privkey_file:
331            raise service_error(service_error.internal, 
332                    "No SSH public key file?")
333
334
335        if mapdb_file:
336            self.read_mapdb(mapdb_file)
337        else:
338            self.log.warn("[experiment_control] No testbed map, using defaults")
339            self.tbmap = { 
340                    'deter':'https://users.isi.deterlab.net:23235',
341                    'emulab':'https://users.isi.deterlab.net:23236',
342                    'ucb':'https://users.isi.deterlab.net:23237',
343                    }
344
345        if accessdb_file:
346                self.read_accessdb(accessdb_file)
347        else:
348            raise service_error(service_error.internal,
349                    "No accessdb specified in config")
350
351        # Grab saved state.  OK to do this w/o locking because it's read only
352        # and only one thread should be in existence that can see self.state at
353        # this point.
354        if self.state_filename:
355            self.read_state()
356
357        if self.store_filename:
358            self.read_store()
359        else:
360            self.log.warning("No saved synch store")
361            self.synch_store = synch_store
362
363        # Dispatch tables
364        self.soap_services = {\
365                'New': soap_handler('New', self.new_experiment),
366                'Create': soap_handler('Create', self.create_experiment),
367                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
368                'Vis': soap_handler('Vis', self.get_vis),
369                'Info': soap_handler('Info', self.get_info),
370                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
371                'Terminate': soap_handler('Terminate', 
372                    self.terminate_experiment),
373                'GetValue': soap_handler('GetValue', self.GetValue),
374                'SetValue': soap_handler('SetValue', self.SetValue),
375        }
376
377        self.xmlrpc_services = {\
378                'New': xmlrpc_handler('New', self.new_experiment),
379                'Create': xmlrpc_handler('Create', self.create_experiment),
380                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
381                'Vis': xmlrpc_handler('Vis', self.get_vis),
382                'Info': xmlrpc_handler('Info', self.get_info),
383                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
384                'Terminate': xmlrpc_handler('Terminate',
385                    self.terminate_experiment),
386                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
387                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
388        }
389
390    # Call while holding self.state_lock
391    def write_state(self):
392        """
393        Write a new copy of experiment state after copying the existing state
394        to a backup.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        if os.access(self.state_filename, os.W_OK):
399            copy_file(self.state_filename, \
400                    "%s.bak" % self.state_filename)
401        try:
402            f = open(self.state_filename, 'w')
403            pickle.dump(self.state, f)
404        except IOError, e:
405            self.log.error("Can't write file %s: %s" % \
406                    (self.state_filename, e))
407        except pickle.PicklingError, e:
408            self.log.error("Pickling problem: %s" % e)
409        except TypeError, e:
410            self.log.error("Pickling problem (TypeError): %s" % e)
411
412    @staticmethod
413    def get_alloc_ids(state):
414        """
415        Pull the fedids of the identifiers of each allocation from the
416        state.  Again, a dict dive that's best isolated.
417
418        Used by read_store and read state
419        """
420
421        return [ f['allocID']['fedid'] 
422                for f in state.get('federant',[]) \
423                    if f.has_key('allocID') and \
424                        f['allocID'].has_key('fedid')]
425
426    # Call while holding self.state_lock
427    def read_state(self):
428        """
429        Read a new copy of experiment state.  Old state is overwritten.
430
431        State format is a simple pickling of the state dictionary.
432        """
433       
434        def get_experiment_id(state):
435            """
436            Pull the fedid experimentID out of the saved state.  This is kind
437            of a gross walk through the dict.
438            """
439
440            if state.has_key('experimentID'):
441                for e in state['experimentID']:
442                    if e.has_key('fedid'):
443                        return e['fedid']
444                else:
445                    return None
446            else:
447                return None
448
449        try:
450            f = open(self.state_filename, "r")
451            self.state = pickle.load(f)
452            self.log.debug("[read_state]: Read state from %s" % \
453                    self.state_filename)
454        except IOError, e:
455            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
456                    % (self.state_filename, e))
457        except pickle.UnpicklingError, e:
458            self.log.warning(("[read_state]: No saved state: " + \
459                    "Unpickling failed: %s") % e)
460       
461        for s in self.state.values():
462            try:
463
464                eid = get_experiment_id(s)
465                if eid : 
466                    # Give the owner rights to the experiment
467                    self.auth.set_attribute(s['owner'], eid)
468                    # And holders of the eid as well
469                    self.auth.set_attribute(eid, eid)
470                    # allow overrides to control experiments as well
471                    for o in self.overrides:
472                        self.auth.set_attribute(o, eid)
473                    # Set permissions to allow reading of the software repo, if
474                    # any, as well.
475                    for a in self.get_alloc_ids(s):
476                        self.auth.set_attribute(a, 'repo/%s' % eid)
477                else:
478                    raise KeyError("No experiment id")
479            except KeyError, e:
480                self.log.warning("[read_state]: State ownership or identity " +\
481                        "misformatted in %s: %s" % (self.state_filename, e))
482
483
484    def read_accessdb(self, accessdb_file):
485        """
486        Read the mapping from fedids that can create experiments to their name
487        in the 3-level access namespace.  All will be asserted from this
488        testbed and can include the local username and porject that will be
489        asserted on their behalf by this fedd.  Each fedid is also added to the
490        authorization system with the "create" attribute.
491        """
492        self.accessdb = {}
493        # These are the regexps for parsing the db
494        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
495        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
496                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
497        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
498                "\s*->\s*(" + name_expr + ")\s*$")
499        lineno = 0
500
501        # Parse the mappings and store in self.authdb, a dict of
502        # fedid -> (proj, user)
503        try:
504            f = open(accessdb_file, "r")
505            for line in f:
506                lineno += 1
507                line = line.strip()
508                if len(line) == 0 or line.startswith('#'):
509                    continue
510                m = project_line.match(line)
511                if m:
512                    fid = fedid(hexstr=m.group(1))
513                    project, user = m.group(2,3)
514                    if not self.accessdb.has_key(fid):
515                        self.accessdb[fid] = []
516                    self.accessdb[fid].append((project, user))
517                    continue
518
519                m = user_line.match(line)
520                if m:
521                    fid = fedid(hexstr=m.group(1))
522                    project = None
523                    user = m.group(2)
524                    if not self.accessdb.has_key(fid):
525                        self.accessdb[fid] = []
526                    self.accessdb[fid].append((project, user))
527                    continue
528                self.log.warn("[experiment_control] Error parsing access " +\
529                        "db %s at line %d" %  (accessdb_file, lineno))
530        except IOError:
531            raise service_error(service_error.internal,
532                    ("Error opening/reading %s as experiment " +\
533                            "control accessdb") %  accessdb_file)
534        f.close()
535
536        # Initialize the authorization attributes
537        for fid in self.accessdb.keys():
538            self.auth.set_attribute(fid, 'create')
539            self.auth.set_attribute(fid, 'new')
540
541    def read_mapdb(self, file):
542        """
543        Read a simple colon separated list of mappings for the
544        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
545        """
546
547        self.tbmap = { }
548        lineno =0
549        try:
550            f = open(file, "r")
551            for line in f:
552                lineno += 1
553                line = line.strip()
554                if line.startswith('#') or len(line) == 0:
555                    continue
556                try:
557                    label, url = line.split(':', 1)
558                    self.tbmap[label] = url
559                except ValueError, e:
560                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
561                            "map db: %s %s" % (lineno, line, e))
562        except IOError, e:
563            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
564                    "open %s: %s" % (file, e))
565        f.close()
566
567    def read_store(self):
568        try:
569            self.synch_store = synch_store()
570            self.synch_store.load(self.store_filename)
571            self.log.debug("[read_store]: Read store from %s" % \
572                    self.store_filename)
573        except IOError, e:
574            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
575                    % (self.state_filename, e))
576            self.synch_store = synch_store()
577
578        # Set the initial permissions on data in the store.  XXX: This ad hoc
579        # authorization attribute initialization is getting out of hand.
580        for k in self.synch_store.all_keys():
581            try:
582                if k.startswith('fedid:'):
583                    fid = fedid(hexstr=k[6:46])
584                    if self.state.has_key(fid):
585                        for a in self.get_alloc_ids(self.state[fid]):
586                            self.auth.set_attribute(a, k)
587            except ValueError, e:
588                self.log.warn("Cannot deduce permissions for %s" % k)
589
590
591    def write_store(self):
592        """
593        Write a new copy of synch_store after writing current state
594        to a backup.  We use the internal synch_store pickle method to avoid
595        incinsistent data.
596
597        State format is a simple pickling of the store.
598        """
599        if os.access(self.store_filename, os.W_OK):
600            copy_file(self.store_filename, \
601                    "%s.bak" % self.store_filename)
602        try:
603            self.synch_store.save(self.store_filename)
604        except IOError, e:
605            self.log.error("Can't write file %s: %s" % \
606                    (self.store_filename, e))
607        except TypeError, e:
608            self.log.error("Pickling problem (TypeError): %s" % e)
609
610       
611    def generate_ssh_keys(self, dest, type="rsa" ):
612        """
613        Generate a set of keys for the gateways to use to talk.
614
615        Keys are of type type and are stored in the required dest file.
616        """
617        valid_types = ("rsa", "dsa")
618        t = type.lower();
619        if t not in valid_types: raise ValueError
620        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
621
622        try:
623            trace = open("/dev/null", "w")
624        except IOError:
625            raise service_error(service_error.internal,
626                    "Cannot open /dev/null??");
627
628        # May raise CalledProcessError
629        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
630        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
631        if rv != 0:
632            raise service_error(service_error.internal, 
633                    "Cannot generate nonce ssh keys.  %s return code %d" \
634                            % (self.ssh_keygen, rv))
635
636    def gentopo(self, str):
637        """
638        Generate the topology dtat structure from the splitter's XML
639        representation of it.
640
641        The topology XML looks like:
642            <experiment>
643                <nodes>
644                    <node><vname></vname><ips>ip1:ip2</ips></node>
645                </nodes>
646                <lans>
647                    <lan>
648                        <vname></vname><vnode></vnode><ip></ip>
649                        <bandwidth></bandwidth><member>node:port</member>
650                    </lan>
651                </lans>
652        """
653        class topo_parse:
654            """
655            Parse the topology XML and create the dats structure.
656            """
657            def __init__(self):
658                # Typing of the subelements for data conversion
659                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
660                self.int_subelements = ( 'bandwidth',)
661                self.float_subelements = ( 'delay',)
662                # The final data structure
663                self.nodes = [ ]
664                self.lans =  [ ]
665                self.topo = { \
666                        'node': self.nodes,\
667                        'lan' : self.lans,\
668                    }
669                self.element = { }  # Current element being created
670                self.chars = ""     # Last text seen
671
672            def end_element(self, name):
673                # After each sub element the contents is added to the current
674                # element or to the appropriate list.
675                if name == 'node':
676                    self.nodes.append(self.element)
677                    self.element = { }
678                elif name == 'lan':
679                    self.lans.append(self.element)
680                    self.element = { }
681                elif name in self.str_subelements:
682                    self.element[name] = self.chars
683                    self.chars = ""
684                elif name in self.int_subelements:
685                    self.element[name] = int(self.chars)
686                    self.chars = ""
687                elif name in self.float_subelements:
688                    self.element[name] = float(self.chars)
689                    self.chars = ""
690
691            def found_chars(self, data):
692                self.chars += data.rstrip()
693
694
695        tp = topo_parse();
696        parser = xml.parsers.expat.ParserCreate()
697        parser.EndElementHandler = tp.end_element
698        parser.CharacterDataHandler = tp.found_chars
699
700        parser.Parse(str)
701
702        return tp.topo
703       
704
705    def genviz(self, topo):
706        """
707        Generate the visualization the virtual topology
708        """
709
710        neato = "/usr/local/bin/neato"
711        # These are used to parse neato output and to create the visualization
712        # file.
713        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
714        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
715                "%s</type></node>"
716
717        try:
718            # Node names
719            nodes = [ n['vname'] for n in topo['node'] ]
720            topo_lans = topo['lan']
721        except KeyError, e:
722            raise service_error(service_error.internal, "Bad topology: %s" %e)
723
724        lans = { }
725        links = { }
726
727        # Walk through the virtual topology, organizing the connections into
728        # 2-node connections (links) and more-than-2-node connections (lans).
729        # When a lan is created, it's added to the list of nodes (there's a
730        # node in the visualization for the lan).
731        for l in topo_lans:
732            if links.has_key(l['vname']):
733                if len(links[l['vname']]) < 2:
734                    links[l['vname']].append(l['vnode'])
735                else:
736                    nodes.append(l['vname'])
737                    lans[l['vname']] = links[l['vname']]
738                    del links[l['vname']]
739                    lans[l['vname']].append(l['vnode'])
740            elif lans.has_key(l['vname']):
741                lans[l['vname']].append(l['vnode'])
742            else:
743                links[l['vname']] = [ l['vnode'] ]
744
745
746        # Open up a temporary file for dot to turn into a visualization
747        try:
748            df, dotname = tempfile.mkstemp()
749            dotfile = os.fdopen(df, 'w')
750        except IOError:
751            raise service_error(service_error.internal,
752                    "Failed to open file in genviz")
753
754        try:
755            dnull = open('/dev/null', 'w')
756        except IOError:
757            service_error(service_error.internal,
758                    "Failed to open /dev/null in genviz")
759
760        # Generate a dot/neato input file from the links, nodes and lans
761        try:
762            print >>dotfile, "graph G {"
763            for n in nodes:
764                print >>dotfile, '\t"%s"' % n
765            for l in links.keys():
766                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
767            for l in lans.keys():
768                for n in lans[l]:
769                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
770            print >>dotfile, "}"
771            dotfile.close()
772        except TypeError:
773            raise service_error(service_error.internal,
774                    "Single endpoint link in vtopo")
775        except IOError:
776            raise service_error(service_error.internal, "Cannot write dot file")
777
778        # Use dot to create a visualization
779        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
780                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
781                close_fds=True)
782        dnull.close()
783
784        # Translate dot to vis format
785        vis_nodes = [ ]
786        vis = { 'node': vis_nodes }
787        for line in dot.stdout:
788            m = vis_re.match(line)
789            if m:
790                vn = m.group(1)
791                vis_node = {'name': vn, \
792                        'x': float(m.group(2)),\
793                        'y' : float(m.group(3)),\
794                    }
795                if vn in links.keys() or vn in lans.keys():
796                    vis_node['type'] = 'lan'
797                else:
798                    vis_node['type'] = 'node'
799                vis_nodes.append(vis_node)
800        rv = dot.wait()
801
802        os.remove(dotname)
803        if rv == 0 : return vis
804        else: return None
805
806    def get_access(self, tb, nodes, tbparam, access_user, masters):
807        """
808        Get access to testbed through fedd and set the parameters for that tb
809        """
810        def get_export_project(svcs):
811            """
812            Look through for the list of federated_service for this testbed
813            objects for a project_export service, and extract the project
814            parameter.
815            """
816
817            pe = [s for s in svcs if s.name=='project_export']
818            if len(pe) == 1:
819                return pe[0].params.get('project', None)
820            elif len(pe) == 0:
821                return None
822            else:
823                raise service_error(service_error.req,
824                        "More than one project export is not supported")
825
826        uri = self.tbmap.get(testbed_base(tb), None)
827        if not uri:
828            raise service_error(service_error.server_config, 
829                    "Unknown testbed: %s" % tb)
830
831        export_svcs = masters.get(tb,[])
832        import_svcs = [ s for m in masters.values() \
833                for s in m \
834                    if tb in s.importers ]
835
836        export_project = get_export_project(export_svcs)
837
838        # Tweak search order so that if there are entries in access_user that
839        # have a project matching the export project, we try them first
840        if export_project: 
841            access_sequence = [ (p, u) for p, u in access_user \
842                    if p == export_project] 
843            access_sequence.extend([(p, u) for p, u in access_user \
844                    if p != export_project]) 
845        else: 
846            access_sequence = access_user
847
848        for p, u in access_sequence: 
849            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
850                    "to %s") %  ((p or "None"), u, uri))
851
852            if p:
853                # Request with user and project specified
854                req = {\
855                        'destinationTestbed' : { 'uri' : uri },
856                        'credential': [ "project: %s" % p, "user: %s"  % u],
857                        'allocID' : { 'localname': 'test' },
858                    }
859            else:
860                # Request with only user specified
861                req = {\
862                        'destinationTestbed' : { 'uri' : uri },
863                        'credential': [ 'user: %s' % u ],
864                        'allocID' : { 'localname': 'test' },
865                    }
866
867            # Make the service request from the services we're importing and
868            # exporting.  Keep track of the export request ids so we can
869            # collect the resulting info from the access response.
870            e_keys = { }
871            if import_svcs or export_svcs:
872                req['service'] = [ ]
873
874                for i, s in enumerate(import_svcs):
875                    idx = 'import%d' % i
876                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
877                    if s.params:
878                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
879                                for k, v in s.params.items()]
880                    req['service'].append(sr)
881
882                for i, s in enumerate(export_svcs):
883                    idx = 'export%d' % i
884                    e_keys[idx] = s
885                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
886                    if s.params:
887                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
888                                for k, v in s.params.items()]
889                    req['service'].append(sr)
890
891            # node resources if any
892            if nodes != None and len(nodes) > 0:
893                rnodes = [ ]
894                for n in nodes:
895                    rn = { }
896                    image, hw, count = n.split(":")
897                    if image: rn['image'] = [ image ]
898                    if hw: rn['hardware'] = [ hw ]
899                    if count and int(count) >0 : rn['count'] = int(count)
900                    rnodes.append(rn)
901                req['resources']= { }
902                req['resources']['node'] = rnodes
903
904            try:
905                if self.local_access.has_key(uri):
906                    # Local access call
907                    req = { 'RequestAccessRequestBody' : req }
908                    r = self.local_access[uri].RequestAccess(req, 
909                            fedid(file=self.cert_file))
910                    r = { 'RequestAccessResponseBody' : r }
911                else:
912                    r = self.call_RequestAccess(uri, req, 
913                            self.cert_file, self.cert_pwd, self.trusted_certs)
914            except service_error, e:
915                if e.code == service_error.access:
916                    self.log.debug("[get_access] Access denied")
917                    r = None
918                    continue
919                else:
920                    raise e
921
922            if r.has_key('RequestAccessResponseBody'):
923                # Through to here we have a valid response, not a fault.
924                # Access denied is a fault, so something better or worse than
925                # access denied has happened.
926                r = r['RequestAccessResponseBody']
927                self.log.debug("[get_access] Access granted")
928                break
929            else:
930                raise service_error(service_error.protocol,
931                        "Bad proxy response")
932       
933        if not r:
934            raise service_error(service_error.access, 
935                    "Access denied by %s (%s)" % (tb, uri))
936
937        tbparam[tb] = { 
938                "allocID" : r['allocID'],
939                "uri": uri,
940                }
941
942        # Collect the responses corresponding to the services this testbed
943        # exports.  These will be the service requests that we will include in
944        # the start segment requests (with appropriate visibility values) to
945        # import and export the segments.
946        for s in r.get('service', []):
947            id = s.get('id', None)
948            if id and id in e_keys:
949                e_keys[id].reqs.append(s)
950
951        # Add attributes to parameter space.  We don't allow attributes to
952        # overlay any parameters already installed.
953        for a in r.get('fedAttr', []):
954            try:
955                if a['attribute'] and \
956                        isinstance(a['attribute'], basestring)\
957                        and not tbparam[tb].has_key(a['attribute'].lower()):
958                    tbparam[tb][a['attribute'].lower()] = a['value']
959            except KeyError:
960                self.log.error("Bad attribute in response: %s" % a)
961
962    def release_access(self, tb, aid, uri=None):
963        """
964        Release access to testbed through fedd
965        """
966
967        if not uri:
968            uri = self.tbmap.get(tb, None)
969        if not uri:
970            raise service_error(service_error.server_config, 
971                    "Unknown testbed: %s" % tb)
972
973        if self.local_access.has_key(uri):
974            resp = self.local_access[uri].ReleaseAccess(\
975                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
976                    fedid(file=self.cert_file))
977            resp = { 'ReleaseAccessResponseBody': resp } 
978        else:
979            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
980                    self.cert_file, self.cert_pwd, self.trusted_certs)
981
982        # better error coding
983
984    def remote_ns2topdl(self, uri, desc):
985
986        req = {
987                'description' : { 'ns2description': desc },
988            }
989
990        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
991                self.trusted_certs)
992
993        if r.has_key('Ns2TopdlResponseBody'):
994            r = r['Ns2TopdlResponseBody']
995            ed = r.get('experimentdescription', None)
996            if ed.has_key('topdldescription'):
997                return topdl.Topology(**ed['topdldescription'])
998            else:
999                raise service_error(service_error.protocol, 
1000                        "Bad splitter response (no output)")
1001        else:
1002            raise service_error(service_error.protocol, "Bad splitter response")
1003
1004    class start_segment:
1005        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1006                cert_pwd=None, trusted_certs=None, caller=None,
1007                log_collector=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015            self.log_collector = log_collector
1016            self.response = None
1017            self.node = { }
1018
1019        def make_map(self, resp):
1020            if 'segmentdescription' in resp and \
1021                    'topdldescription' in resp['segmentdescription']:
1022                top = topdl.Topology(\
1023                        **resp['segmentdescription']['topdldescription'])
1024                for e in [e for e in top.elements \
1025                        if isinstance(e, topdl.Computer)]:
1026                    hn = e.get_attribute('hostname')
1027                    if hn:
1028                        for n in e.name:
1029                            self.node[n] = hn
1030
1031        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1032            req = {
1033                    'allocID': { 'fedid' : aid }, 
1034                    'segmentdescription': { 
1035                        'topdldescription': topo.to_dict(),
1036                    },
1037                }
1038
1039            if connInfo:
1040                req['connection'] = connInfo
1041
1042            import_svcs = [ s for m in masters.values() \
1043                    for s in m if self.testbed in s.importers]
1044
1045            if import_svcs or self.testbed in masters:
1046                req['service'] = []
1047
1048            for s in import_svcs:
1049                for r in s.reqs:
1050                    sr = copy.deepcopy(r)
1051                    sr['visibility'] = 'import';
1052                    req['service'].append(sr)
1053
1054            for s in masters.get(self.testbed, []):
1055                for r in s.reqs:
1056                    sr = copy.deepcopy(r)
1057                    sr['visibility'] = 'export';
1058                    req['service'].append(sr)
1059
1060            if attrs:
1061                req['fedAttr'] = attrs
1062
1063            try:
1064                self.log.debug("Calling StartSegment at %s " % uri)
1065                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1066                        self.trusted_certs)
1067                if r.has_key('StartSegmentResponseBody'):
1068                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1069                            None)
1070                    if lval and self.log_collector:
1071                        for line in  lval.splitlines(True):
1072                            self.log_collector.write(line)
1073                    self.make_map(r['StartSegmentResponseBody'])
1074                    self.response = r
1075                else:
1076                    raise service_error(service_error.internal, 
1077                            "Bad response!?: %s" %r)
1078                return True
1079            except service_error, e:
1080                self.log.error("Start segment failed on %s: %s" % \
1081                        (self.testbed, e))
1082                return False
1083
1084
1085
1086    class terminate_segment:
1087        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1088                cert_pwd=None, trusted_certs=None, caller=None):
1089            self.log = log
1090            self.debug = debug
1091            self.cert_file = cert_file
1092            self.cert_pwd = cert_pwd
1093            self.trusted_certs = None
1094            self.caller = caller
1095            self.testbed = testbed
1096
1097        def __call__(self, uri, aid ):
1098            req = {
1099                    'allocID': aid , 
1100                }
1101            try:
1102                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1103                        self.trusted_certs)
1104                return True
1105            except service_error, e:
1106                self.log.error("Terminate segment failed on %s: %s" % \
1107                        (self.testbed, e))
1108                return False
1109   
1110
1111    def allocate_resources(self, allocated, masters, eid, expid, 
1112            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1113            attrs=None, connInfo={}):
1114
1115        started = { }           # Testbeds where a sub-experiment started
1116                                # successfully
1117
1118        # XXX
1119        fail_soft = False
1120
1121        log = alloc_log or self.log
1122
1123        thread_pool = self.thread_pool(self.nthreads)
1124        threads = [ ]
1125        starters = [ ]
1126
1127        for tb in allocated.keys():
1128            # Create and start a thread to start the segment, and save it
1129            # to get the return value later
1130            tb_attrs = copy.copy(attrs)
1131            thread_pool.wait_for_slot()
1132            uri = tbparams[tb].get('uri', \
1133                    self.tbmap.get(testbed_base(tb), None))
1134            base, suffix = split_testbed(tb)
1135            if suffix:
1136                tb_attrs.append({'attribute': 'experiment_name', 
1137                    'value': "%s-%s" % (eid, suffix)})
1138            else:
1139                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1140            if not uri:
1141                raise service_error(service_error.internal, 
1142                        "Unknown testbed %s !?" % tb)
1143
1144            if tbparams[tb].has_key('allocID') and \
1145                    tbparams[tb]['allocID'].has_key('fedid'):
1146                aid = tbparams[tb]['allocID']['fedid']
1147            else:
1148                raise service_error(service_error.internal, 
1149                        "No alloc id for testbed %s !?" % tb)
1150
1151            s = self.start_segment(log=log, debug=self.debug,
1152                    testbed=tb, cert_file=self.cert_file,
1153                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1154                    caller=self.call_StartSegment,
1155                    log_collector=log_collector)
1156            starters.append(s)
1157            t  = self.pooled_thread(\
1158                    target=s, name=tb,
1159                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1160                    pdata=thread_pool, trace_file=self.trace_file)
1161            threads.append(t)
1162            t.start()
1163
1164        # Wait until all finish (keep pinging the log, though)
1165        mins = 0
1166        revoked = False
1167        while not thread_pool.wait_for_all_done(60.0):
1168            mins += 1
1169            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1170                    % mins)
1171            if not revoked and \
1172                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1173                # a testbed has failed.  Revoke this experiment's
1174                # synchronizarion values so that sub experiments will not
1175                # deadlock waiting for synchronization that will never happen
1176                self.log.info("A subexperiment has failed to swap in, " + \
1177                        "revoking synch keys")
1178                var_key = "fedid:%s" % expid
1179                for k in self.synch_store.all_keys():
1180                    if len(k) > 45 and k[0:46] == var_key:
1181                        self.synch_store.revoke_key(k)
1182                revoked = True
1183
1184        failed = [ t.getName() for t in threads if not t.rv ]
1185        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1186
1187        # If one failed clean up, unless fail_soft is set
1188        if failed:
1189            if not fail_soft:
1190                thread_pool.clear()
1191                for tb in succeeded:
1192                    # Create and start a thread to stop the segment
1193                    thread_pool.wait_for_slot()
1194                    uri = tbparams[tb]['uri']
1195                    t  = self.pooled_thread(\
1196                            target=self.terminate_segment(log=log,
1197                                testbed=tb,
1198                                cert_file=self.cert_file, 
1199                                cert_pwd=self.cert_pwd,
1200                                trusted_certs=self.trusted_certs,
1201                                caller=self.call_TerminateSegment),
1202                            args=(uri, tbparams[tb]['federant']['allocID']),
1203                            name=tb,
1204                            pdata=thread_pool, trace_file=self.trace_file)
1205                    t.start()
1206                # Wait until all finish (if any are being stopped)
1207                if succeeded:
1208                    thread_pool.wait_for_all_done()
1209
1210                # release the allocations
1211                for tb in tbparams.keys():
1212                    self.release_access(tb, tbparams[tb]['allocID'],
1213                            tbparams[tb].get('uri', None))
1214                # Remove the placeholder
1215                self.state_lock.acquire()
1216                self.state[eid]['experimentStatus'] = 'failed'
1217                if self.state_filename: self.write_state()
1218                self.state_lock.release()
1219
1220                log.error("Swap in failed on %s" % ",".join(failed))
1221                return
1222        else:
1223            # Walk through the successes and gather the virtual to physical
1224            # mapping.
1225            node = { }
1226            for s in starters:
1227                node.update(s.node)
1228            # Assigng the mapping as a hostname attribute
1229            for e in [ e for e in top.elements \
1230                    if isinstance(e, topdl.Computer)]:
1231                for n in e.name:
1232                    if n in node:
1233                        e.set_attribute('hostname', node[n])
1234            log.info("[start_segment]: Experiment %s active" % eid)
1235
1236
1237        # Walk up tmpdir, deleting as we go
1238        if self.cleanup:
1239            log.debug("[start_experiment]: removing %s" % tmpdir)
1240            for path, dirs, files in os.walk(tmpdir, topdown=False):
1241                for f in files:
1242                    os.remove(os.path.join(path, f))
1243                for d in dirs:
1244                    os.rmdir(os.path.join(path, d))
1245            os.rmdir(tmpdir)
1246        else:
1247            log.debug("[start_experiment]: not removing %s" % tmpdir)
1248
1249        # Insert the experiment into our state and update the disk copy.
1250        self.state_lock.acquire()
1251        self.state[expid]['experimentStatus'] = 'active'
1252        self.state[eid] = self.state[expid]
1253        self.state[eid]['experimentdescription']['topdldescription'] = \
1254                top.to_dict()
1255        if self.state_filename: self.write_state()
1256        self.state_lock.release()
1257        return
1258
1259
1260    def add_kit(self, e, kit):
1261        """
1262        Add a Software object created from the list of (install, location)
1263        tuples passed as kit  to the software attribute of an object e.  We
1264        do this enough to break out the code, but it's kind of a hack to
1265        avoid changing the old tuple rep.
1266        """
1267
1268        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1269
1270        if isinstance(e.software, list): e.software.extend(s)
1271        else: e.software = s
1272
1273
1274    def create_experiment_state(self, fid, req, expid, expcert,
1275            state='starting'):
1276        """
1277        Create the initial entry in the experiment's state.  The expid and
1278        expcert are the experiment's fedid and certifacte that represents that
1279        ID, which are installed in the experiment state.  If the request
1280        includes a suggested local name that is used if possible.  If the local
1281        name is already taken by an experiment owned by this user that has
1282        failed, it is overwritten.  Otherwise new letters are added until a
1283        valid localname is found.  The generated local name is returned.
1284        """
1285
1286        if req.has_key('experimentID') and \
1287                req['experimentID'].has_key('localname'):
1288            overwrite = False
1289            eid = req['experimentID']['localname']
1290            # If there's an old failed experiment here with the same local name
1291            # and accessible by this user, we'll overwrite it, otherwise we'll
1292            # fall through and do the collision avoidance.
1293            old_expid = self.get_experiment_fedid(eid)
1294            if old_expid and self.check_experiment_access(fid, old_expid):
1295                self.state_lock.acquire()
1296                status = self.state[eid].get('experimentStatus', None)
1297                if status and status == 'failed':
1298                    # remove the old access attribute
1299                    self.auth.unset_attribute(fid, old_expid)
1300                    overwrite = True
1301                    del self.state[eid]
1302                    del self.state[old_expid]
1303                self.state_lock.release()
1304            self.state_lock.acquire()
1305            while (self.state.has_key(eid) and not overwrite):
1306                eid += random.choice(string.ascii_letters)
1307            # Initial state
1308            self.state[eid] = {
1309                    'experimentID' : \
1310                            [ { 'localname' : eid }, {'fedid': expid } ],
1311                    'experimentStatus': state,
1312                    'experimentAccess': { 'X509' : expcert },
1313                    'owner': fid,
1314                    'log' : [],
1315                }
1316            self.state[expid] = self.state[eid]
1317            if self.state_filename: self.write_state()
1318            self.state_lock.release()
1319        else:
1320            eid = self.exp_stem
1321            for i in range(0,5):
1322                eid += random.choice(string.ascii_letters)
1323            self.state_lock.acquire()
1324            while (self.state.has_key(eid)):
1325                eid = self.exp_stem
1326                for i in range(0,5):
1327                    eid += random.choice(string.ascii_letters)
1328            # Initial state
1329            self.state[eid] = {
1330                    'experimentID' : \
1331                            [ { 'localname' : eid }, {'fedid': expid } ],
1332                    'experimentStatus': state,
1333                    'experimentAccess': { 'X509' : expcert },
1334                    'owner': fid,
1335                    'log' : [],
1336                }
1337            self.state[expid] = self.state[eid]
1338            if self.state_filename: self.write_state()
1339            self.state_lock.release()
1340
1341        return eid
1342
1343
1344    def allocate_ips_to_topo(self, top):
1345        """
1346        Add an ip4_address attribute to all the hosts in the topology, based on
1347        the shared substrates on which they sit.  An /etc/hosts file is also
1348        created and returned as a list of hostfiles entries.  We also return
1349        the allocator, because we may need to allocate IPs to portals
1350        (specifically DRAGON portals).
1351        """
1352        subs = sorted(top.substrates, 
1353                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1354                reverse=True)
1355        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1356        ifs = { }
1357        hosts = [ ]
1358
1359        for idx, s in enumerate(subs):
1360            net_size = len(s.interfaces)+2
1361
1362            a = ips.allocate(net_size)
1363            if a :
1364                base, num = a
1365                if num < net_size: 
1366                    raise service_error(service_error.internal,
1367                            "Allocator returned wrong number of IPs??")
1368            else:
1369                raise service_error(service_error.req, 
1370                        "Cannot allocate IP addresses")
1371            mask = ips.min_alloc
1372            while mask < net_size:
1373                mask *= 2
1374
1375            netmask = ((2**32-1) ^ (mask-1))
1376
1377            base += 1
1378            for i in s.interfaces:
1379                i.attribute.append(
1380                        topdl.Attribute('ip4_address', 
1381                            "%s" % ip_addr(base)))
1382                i.attribute.append(
1383                        topdl.Attribute('ip4_netmask', 
1384                            "%s" % ip_addr(int(netmask))))
1385
1386                hname = i.element.name
1387                if ifs.has_key(hname):
1388                    hosts.append("%s\t%s-%s %s-%d" % \
1389                            (ip_addr(base), hname, s.name, hname,
1390                                ifs[hname]))
1391                else:
1392                    ifs[hname] = 0
1393                    hosts.append("%s\t%s-%s %s-%d %s" % \
1394                            (ip_addr(base), hname, s.name, hname,
1395                                ifs[hname], hname))
1396
1397                ifs[hname] += 1
1398                base += 1
1399        return hosts, ips
1400
1401    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1402            tbparams, masters):
1403        """
1404        Request access to the various testbeds required for this instantiation
1405        (passed in as testbeds).  User, access_user, expoert_project and master
1406        are used to construct the correct requests.  Per-testbed parameters are
1407        returned in tbparams.
1408        """
1409        for tb in testbeds:
1410            self.get_access(tb, None, tbparams, access_user, masters)
1411            allocated[tb] = 1
1412
1413    def split_topology(self, top, topo, testbeds):
1414        """
1415        Create the sub-topologies that are needed for experiment instantiation.
1416        """
1417        for tb in testbeds:
1418            topo[tb] = top.clone()
1419            # copy in for loop allows deletions from the original
1420            for e in [ e for e in topo[tb].elements]:
1421                etb = e.get_attribute('testbed')
1422                # NB: elements without a testbed attribute won't appear in any
1423                # sub topologies. 
1424                if not etb or etb != tb:
1425                    for i in e.interface:
1426                        for s in i.subs:
1427                            try:
1428                                s.interfaces.remove(i)
1429                            except ValueError:
1430                                raise service_error(service_error.internal,
1431                                        "Can't remove interface??")
1432                    topo[tb].elements.remove(e)
1433            topo[tb].make_indices()
1434
1435    def wrangle_software(self, expid, top, topo, tbparams):
1436        """
1437        Copy software out to the repository directory, allocate permissions and
1438        rewrite the segment topologies to look for the software in local
1439        places.
1440        """
1441
1442        # Copy the rpms and tarfiles to a distribution directory from
1443        # which the federants can retrieve them
1444        linkpath = "%s/software" %  expid
1445        softdir ="%s/%s" % ( self.repodir, linkpath)
1446        softmap = { }
1447        # These are in a list of tuples format (each kit).  This comprehension
1448        # unwraps them into a single list of tuples that initilaizes the set of
1449        # tuples.
1450        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1451                for p, t in l ])
1452        pkgs.update([x.location for e in top.elements \
1453                for x in e.software])
1454        try:
1455            os.makedirs(softdir)
1456        except IOError, e:
1457            raise service_error(
1458                    "Cannot create software directory: %s" % e)
1459        # The actual copying.  Everything's converted into a url for copying.
1460        for pkg in pkgs:
1461            loc = pkg
1462
1463            scheme, host, path = urlparse(loc)[0:3]
1464            dest = os.path.basename(path)
1465            if not scheme:
1466                if not loc.startswith('/'):
1467                    loc = "/%s" % loc
1468                loc = "file://%s" %loc
1469            try:
1470                u = urlopen(loc)
1471            except Exception, e:
1472                raise service_error(service_error.req, 
1473                        "Cannot open %s: %s" % (loc, e))
1474            try:
1475                f = open("%s/%s" % (softdir, dest) , "w")
1476                self.log.debug("Writing %s/%s" % (softdir,dest) )
1477                data = u.read(4096)
1478                while data:
1479                    f.write(data)
1480                    data = u.read(4096)
1481                f.close()
1482                u.close()
1483            except Exception, e:
1484                raise service_error(service_error.internal,
1485                        "Could not copy %s: %s" % (loc, e))
1486            path = re.sub("/tmp", "", linkpath)
1487            # XXX
1488            softmap[pkg] = \
1489                    "%s/%s/%s" %\
1490                    ( self.repo_url, path, dest)
1491
1492            # Allow the individual segments to access the software.
1493            for tb in tbparams.keys():
1494                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1495                        "/%s/%s" % ( path, dest))
1496
1497        # Convert the software locations in the segments into the local
1498        # copies on this host
1499        for soft in [ s for tb in topo.values() \
1500                for e in tb.elements \
1501                    if getattr(e, 'software', False) \
1502                        for s in e.software ]:
1503            if softmap.has_key(soft.location):
1504                soft.location = softmap[soft.location]
1505
1506
1507    def new_experiment(self, req, fid):
1508        """
1509        The external interface to empty initial experiment creation called from
1510        the dispatcher.
1511
1512        Creates a working directory, splits the incoming description using the
1513        splitter script and parses out the avrious subsections using the
1514        lcasses above.  Once each sub-experiment is created, use pooled threads
1515        to instantiate them and start it all up.
1516        """
1517        if not self.auth.check_attribute(fid, 'new'):
1518            raise service_error(service_error.access, "New access denied")
1519
1520        try:
1521            tmpdir = tempfile.mkdtemp(prefix="split-")
1522        except IOError:
1523            raise service_error(service_error.internal, "Cannot create tmp dir")
1524
1525        try:
1526            access_user = self.accessdb[fid]
1527        except KeyError:
1528            raise service_error(service_error.internal,
1529                    "Access map and authorizer out of sync in " + \
1530                            "new_experiment for fedid %s"  % fid)
1531
1532        pid = "dummy"
1533        gid = "dummy"
1534
1535        req = req.get('NewRequestBody', None)
1536        if not req:
1537            raise service_error(service_error.req,
1538                    "Bad request format (no NewRequestBody)")
1539
1540        # Generate an ID for the experiment (slice) and a certificate that the
1541        # allocator can use to prove they own it.  We'll ship it back through
1542        # the encrypted connection.
1543        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1544
1545        #now we're done with the tmpdir, and it should be empty
1546        if self.cleanup:
1547            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1548            os.rmdir(tmpdir)
1549        else:
1550            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1551
1552        eid = self.create_experiment_state(fid, req, expid, expcert, 
1553                state='empty')
1554
1555        # Let users touch the state
1556        self.auth.set_attribute(fid, expid)
1557        self.auth.set_attribute(expid, expid)
1558        # Override fedids can manipulate state as well
1559        for o in self.overrides:
1560            self.auth.set_attribute(o, expid)
1561
1562        rv = {
1563                'experimentID': [
1564                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1565                ],
1566                'experimentStatus': 'empty',
1567                'experimentAccess': { 'X509' : expcert }
1568            }
1569
1570        return rv
1571
1572    def create_experiment(self, req, fid):
1573        """
1574        The external interface to experiment creation called from the
1575        dispatcher.
1576
1577        Creates a working directory, splits the incoming description using the
1578        splitter script and parses out the various subsections using the
1579        classes above.  Once each sub-experiment is created, use pooled threads
1580        to instantiate them and start it all up.
1581        """
1582
1583        req = req.get('CreateRequestBody', None)
1584        if not req:
1585            raise service_error(service_error.req,
1586                    "Bad request format (no CreateRequestBody)")
1587
1588        # Get the experiment access
1589        exp = req.get('experimentID', None)
1590        if exp:
1591            if exp.has_key('fedid'):
1592                key = exp['fedid']
1593                expid = key
1594                eid = None
1595            elif exp.has_key('localname'):
1596                key = exp['localname']
1597                eid = key
1598                expid = None
1599            else:
1600                raise service_error(service_error.req, "Unknown lookup type")
1601        else:
1602            raise service_error(service_error.req, "No request?")
1603
1604        self.check_experiment_access(fid, key)
1605
1606        try:
1607            tmpdir = tempfile.mkdtemp(prefix="split-")
1608            os.mkdir(tmpdir+"/keys")
1609        except IOError:
1610            raise service_error(service_error.internal, "Cannot create tmp dir")
1611
1612        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1613        gw_secretkey_base = "fed.%s" % self.ssh_type
1614        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1615        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1616        tclfile = tmpdir + "/experiment.tcl"
1617        tbparams = { }
1618        try:
1619            access_user = self.accessdb[fid]
1620        except KeyError:
1621            raise service_error(service_error.internal,
1622                    "Access map and authorizer out of sync in " + \
1623                            "create_experiment for fedid %s"  % fid)
1624
1625        pid = "dummy"
1626        gid = "dummy"
1627
1628        # The tcl parser needs to read a file so put the content into that file
1629        descr=req.get('experimentdescription', None)
1630        if descr:
1631            file_content=descr.get('ns2description', None)
1632            if file_content:
1633                try:
1634                    f = open(tclfile, 'w')
1635                    f.write(file_content)
1636                    f.close()
1637                except IOError:
1638                    raise service_error(service_error.internal,
1639                            "Cannot write temp experiment description")
1640            else:
1641                raise service_error(service_error.req, 
1642                        "Only ns2descriptions supported")
1643        else:
1644            raise service_error(service_error.req, "No experiment description")
1645
1646        self.state_lock.acquire()
1647        if self.state.has_key(key):
1648            self.state[key]['experimentStatus'] = "starting"
1649            for e in self.state[key].get('experimentID',[]):
1650                if not expid and e.has_key('fedid'):
1651                    expid = e['fedid']
1652                elif not eid and e.has_key('localname'):
1653                    eid = e['localname']
1654        self.state_lock.release()
1655
1656        if not (eid and expid):
1657            raise service_error(service_error.internal, 
1658                    "Cannot find local experiment info!?")
1659
1660        try: 
1661            # This catches exceptions to clear the placeholder if necessary
1662            try:
1663                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1664            except ValueError:
1665                raise service_error(service_error.server_config, 
1666                        "Bad key type (%s)" % self.ssh_type)
1667
1668            # Copy the service request
1669            tb_services = [ s for s in req.get('service',[]) ]
1670            # Translate to topdl
1671            if self.splitter_url:
1672                self.log.debug("Calling remote topdl translator at %s" % \
1673                        self.splitter_url)
1674                top = self.remote_ns2topdl(self.splitter_url, file_content)
1675            else:
1676                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1677                    str(self.muxmax), '-m', 'dummy']
1678
1679                tclcmd.extend([pid, gid, eid, tclfile])
1680
1681                self.log.debug("running local splitter %s", " ".join(tclcmd))
1682                # This is just fantastic.  As a side effect the parser copies
1683                # tb_compat.tcl into the current directory, so that directory
1684                # must be writable by the fedd user.  Doing this in the
1685                # temporary subdir ensures this is the case.
1686                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1687                        cwd=tmpdir)
1688                split_data = tclparser.stdout
1689
1690                top = topdl.topology_from_xml(file=split_data, top="experiment")
1691
1692            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1693            # Find the testbeds to look up
1694            testbeds = set([ a.value for e in top.elements \
1695                    for a in e.attribute \
1696                        if a.attribute == 'testbed'])
1697
1698            masters = { }           # testbeds exporting services
1699            for s in tb_services:
1700                # If this is a project_export request with the importall field
1701                # set, fill it out.
1702
1703                if s.get('importall', False):
1704                    s['import'] = [ tb for tb in testbeds \
1705                            if tb not in s.get('export',[])]
1706                    del s['importall']
1707
1708                # Add the service to masters
1709                for tb in s.get('export', []):
1710                    if s.get('name', None):
1711                        if tb not in masters:
1712                            masters[tb] = [ ]
1713
1714                        params = { }
1715                        if 'fedAttr' in s:
1716                            for a in s['fedAttr']:
1717                                params[a.get('attribute', '')] = \
1718                                        a.get('value','')
1719
1720                        masters[tb].append(federated_service(name=s['name'],
1721                                exporter=tb, importers=s.get('import',[]),
1722                                params=params, reqs=[]))
1723                    else:
1724                        self.log.error('Testbed service does not have name " + \
1725                                "and importers')
1726
1727
1728            allocated = { }         # Testbeds we can access
1729            topo ={ }               # Sub topologies
1730            connInfo = { }          # Connection information
1731            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1732                    tbparams, masters)
1733
1734            self.split_topology(top, topo, testbeds)
1735
1736            # Copy configuration files into the remote file store
1737            # The config urlpath
1738            configpath = "/%s/config" % expid
1739            # The config file system location
1740            configdir ="%s%s" % ( self.repodir, configpath)
1741            try:
1742                os.makedirs(configdir)
1743            except EnvironmentError, e:
1744                raise service_error(service_error.internal,
1745                        "Cannot create config directory: %s" % e)
1746            try:
1747                f = open("%s/hosts" % configdir, "w")
1748                f.write('\n'.join(hosts))
1749                f.close()
1750            except IOError, e:
1751                raise service_error(service_error.internal, 
1752                        "Cannot write hosts file: %s" % e)
1753            try:
1754                copy_file("%s" % gw_pubkey, "%s/%s" % \
1755                        (configdir, gw_pubkey_base))
1756                copy_file("%s" % gw_secretkey, "%s/%s" % \
1757                        (configdir, gw_secretkey_base))
1758            except IOError, e:
1759                raise service_error(service_error.internal, 
1760                        "Cannot copy keyfiles: %s" % e)
1761
1762            # Allow the individual testbeds to access the configuration files.
1763            for tb in tbparams.keys():
1764                asignee = tbparams[tb]['allocID']['fedid']
1765                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1766                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1767
1768            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1769                    self.muxmax, self.direct_transit)
1770            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1771                    connInfo, expid)
1772            # Now get access to the dynamic testbeds (those added above)
1773            for tb in [ t for t in topo if t not in allocated]:
1774                self.get_access(tb, None, tbparams, access_user, masters)
1775                allocated[tb] = 1
1776                store_keys = topo[tb].get_attribute('store_keys')
1777                # Give the testbed access to keys it exports or imports
1778                if store_keys:
1779                    for sk in store_keys.split(" "):
1780                        self.auth.set_attribute(\
1781                                tbparams[tb]['allocID']['fedid'], sk)
1782
1783            self.wrangle_software(expid, top, topo, tbparams)
1784
1785            vtopo = topdl.topology_to_vtopo(top)
1786            vis = self.genviz(vtopo)
1787
1788            # save federant information
1789            for k in allocated.keys():
1790                tbparams[k]['federant'] = {
1791                        'name': [ { 'localname' : eid} ],
1792                        'allocID' : tbparams[k]['allocID'],
1793                        'uri': tbparams[k]['uri'],
1794                    }
1795
1796            self.state_lock.acquire()
1797            self.state[eid]['vtopo'] = vtopo
1798            self.state[eid]['vis'] = vis
1799            self.state[eid]['experimentdescription'] = \
1800                    { 'topdldescription': top.to_dict() }
1801            self.state[eid]['federant'] = \
1802                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1803                        if tbparams[tb].has_key('federant') ]
1804            if self.state_filename: 
1805                self.write_state()
1806            self.state_lock.release()
1807        except service_error, e:
1808            # If something goes wrong in the parse (usually an access error)
1809            # clear the placeholder state.  From here on out the code delays
1810            # exceptions.  Failing at this point returns a fault to the remote
1811            # caller.
1812
1813            self.state_lock.acquire()
1814            del self.state[eid]
1815            del self.state[expid]
1816            if self.state_filename: self.write_state()
1817            self.state_lock.release()
1818            raise e
1819
1820
1821        # Start the background swapper and return the starting state.  From
1822        # here on out, the state will stick around a while.
1823
1824        # Let users touch the state
1825        self.auth.set_attribute(fid, expid)
1826        self.auth.set_attribute(expid, expid)
1827        # Override fedids can manipulate state as well
1828        for o in self.overrides:
1829            self.auth.set_attribute(o, expid)
1830
1831        # Create a logger that logs to the experiment's state object as well as
1832        # to the main log file.
1833        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1834        alloc_collector = self.list_log(self.state[eid]['log'])
1835        h = logging.StreamHandler(alloc_collector)
1836        # XXX: there should be a global one of these rather than repeating the
1837        # code.
1838        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1839                    '%d %b %y %H:%M:%S'))
1840        alloc_log.addHandler(h)
1841       
1842        attrs = [ 
1843                {
1844                    'attribute': 'ssh_pubkey', 
1845                    'value': '%s/%s/config/%s' % \
1846                            (self.repo_url, expid, gw_pubkey_base)
1847                },
1848                {
1849                    'attribute': 'ssh_secretkey', 
1850                    'value': '%s/%s/config/%s' % \
1851                            (self.repo_url, expid, gw_secretkey_base)
1852                },
1853                {
1854                    'attribute': 'hosts', 
1855                    'value': '%s/%s/config/hosts' % \
1856                            (self.repo_url, expid)
1857                },
1858            ]
1859
1860        # transit and disconnected testbeds may not have a connInfo entry.
1861        # Fill in the blanks.
1862        for t in allocated.keys():
1863            if not connInfo.has_key(t):
1864                connInfo[t] = { }
1865
1866        # Start a thread to do the resource allocation
1867        t  = Thread(target=self.allocate_resources,
1868                args=(allocated, masters, eid, expid, tbparams, 
1869                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1870                    connInfo),
1871                name=eid)
1872        t.start()
1873
1874        rv = {
1875                'experimentID': [
1876                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1877                ],
1878                'experimentStatus': 'starting',
1879            }
1880
1881        return rv
1882   
1883    def get_experiment_fedid(self, key):
1884        """
1885        find the fedid associated with the localname key in the state database.
1886        """
1887
1888        rv = None
1889        self.state_lock.acquire()
1890        if self.state.has_key(key):
1891            if isinstance(self.state[key], dict):
1892                try:
1893                    kl = [ f['fedid'] for f in \
1894                            self.state[key]['experimentID']\
1895                                if f.has_key('fedid') ]
1896                except KeyError:
1897                    self.state_lock.release()
1898                    raise service_error(service_error.internal, 
1899                            "No fedid for experiment %s when getting "+\
1900                                    "fedid(!?)" % key)
1901                if len(kl) == 1:
1902                    rv = kl[0]
1903                else:
1904                    self.state_lock.release()
1905                    raise service_error(service_error.internal, 
1906                            "multiple fedids for experiment %s when " +\
1907                                    "getting fedid(!?)" % key)
1908            else:
1909                self.state_lock.release()
1910                raise service_error(service_error.internal, 
1911                        "Unexpected state for %s" % key)
1912        self.state_lock.release()
1913        return rv
1914
1915    def check_experiment_access(self, fid, key):
1916        """
1917        Confirm that the fid has access to the experiment.  Though a request
1918        may be made in terms of a local name, the access attribute is always
1919        the experiment's fedid.
1920        """
1921        if not isinstance(key, fedid):
1922            key = self.get_experiment_fedid(key)
1923
1924        if self.auth.check_attribute(fid, key):
1925            return True
1926        else:
1927            raise service_error(service_error.access, "Access Denied")
1928
1929
1930    def get_handler(self, path, fid):
1931        self.log.info("Get handler %s %s" % (path, fid))
1932        if self.auth.check_attribute(fid, path):
1933            return ("%s/%s" % (self.repodir, path), "application/binary")
1934        else:
1935            return (None, None)
1936
1937    def get_vtopo(self, req, fid):
1938        """
1939        Return the stored virtual topology for this experiment
1940        """
1941        rv = None
1942        state = None
1943
1944        req = req.get('VtopoRequestBody', None)
1945        if not req:
1946            raise service_error(service_error.req,
1947                    "Bad request format (no VtopoRequestBody)")
1948        exp = req.get('experiment', None)
1949        if exp:
1950            if exp.has_key('fedid'):
1951                key = exp['fedid']
1952                keytype = "fedid"
1953            elif exp.has_key('localname'):
1954                key = exp['localname']
1955                keytype = "localname"
1956            else:
1957                raise service_error(service_error.req, "Unknown lookup type")
1958        else:
1959            raise service_error(service_error.req, "No request?")
1960
1961        self.check_experiment_access(fid, key)
1962
1963        self.state_lock.acquire()
1964        if self.state.has_key(key):
1965            if self.state[key].has_key('vtopo'):
1966                rv = { 'experiment' : {keytype: key },\
1967                        'vtopo': self.state[key]['vtopo'],\
1968                    }
1969            else:
1970                state = self.state[key]['experimentStatus']
1971        self.state_lock.release()
1972
1973        if rv: return rv
1974        else: 
1975            if state:
1976                raise service_error(service_error.partial, 
1977                        "Not ready: %s" % state)
1978            else:
1979                raise service_error(service_error.req, "No such experiment")
1980
1981    def get_vis(self, req, fid):
1982        """
1983        Return the stored visualization for this experiment
1984        """
1985        rv = None
1986        state = None
1987
1988        req = req.get('VisRequestBody', None)
1989        if not req:
1990            raise service_error(service_error.req,
1991                    "Bad request format (no VisRequestBody)")
1992        exp = req.get('experiment', None)
1993        if exp:
1994            if exp.has_key('fedid'):
1995                key = exp['fedid']
1996                keytype = "fedid"
1997            elif exp.has_key('localname'):
1998                key = exp['localname']
1999                keytype = "localname"
2000            else:
2001                raise service_error(service_error.req, "Unknown lookup type")
2002        else:
2003            raise service_error(service_error.req, "No request?")
2004
2005        self.check_experiment_access(fid, key)
2006
2007        self.state_lock.acquire()
2008        if self.state.has_key(key):
2009            if self.state[key].has_key('vis'):
2010                rv =  { 'experiment' : {keytype: key },\
2011                        'vis': self.state[key]['vis'],\
2012                        }
2013            else:
2014                state = self.state[key]['experimentStatus']
2015        self.state_lock.release()
2016
2017        if rv: return rv
2018        else:
2019            if state:
2020                raise service_error(service_error.partial, 
2021                        "Not ready: %s" % state)
2022            else:
2023                raise service_error(service_error.req, "No such experiment")
2024
2025    def clean_info_response(self, rv):
2026        """
2027        Remove the information in the experiment's state object that is not in
2028        the info response.
2029        """
2030        # Remove the owner info (should always be there, but...)
2031        if rv.has_key('owner'): del rv['owner']
2032
2033        # Convert the log into the allocationLog parameter and remove the
2034        # log entry (with defensive programming)
2035        if rv.has_key('log'):
2036            rv['allocationLog'] = "".join(rv['log'])
2037            del rv['log']
2038        else:
2039            rv['allocationLog'] = ""
2040
2041        if rv['experimentStatus'] != 'active':
2042            if rv.has_key('federant'): del rv['federant']
2043        else:
2044            # remove the allocationID and uri info from each federant
2045            for f in rv.get('federant', []):
2046                if f.has_key('allocID'): del f['allocID']
2047                if f.has_key('uri'): del f['uri']
2048
2049        return rv
2050
2051    def get_info(self, req, fid):
2052        """
2053        Return all the stored info about this experiment
2054        """
2055        rv = None
2056
2057        req = req.get('InfoRequestBody', None)
2058        if not req:
2059            raise service_error(service_error.req,
2060                    "Bad request format (no InfoRequestBody)")
2061        exp = req.get('experiment', None)
2062        if exp:
2063            if exp.has_key('fedid'):
2064                key = exp['fedid']
2065                keytype = "fedid"
2066            elif exp.has_key('localname'):
2067                key = exp['localname']
2068                keytype = "localname"
2069            else:
2070                raise service_error(service_error.req, "Unknown lookup type")
2071        else:
2072            raise service_error(service_error.req, "No request?")
2073
2074        self.check_experiment_access(fid, key)
2075
2076        # The state may be massaged by the service function that called
2077        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2078        # state.
2079        self.state_lock.acquire()
2080        if self.state.has_key(key):
2081            rv = copy.deepcopy(self.state[key])
2082        self.state_lock.release()
2083
2084        if rv:
2085            return self.clean_info_response(rv)
2086        else:
2087            raise service_error(service_error.req, "No such experiment")
2088
2089    def get_multi_info(self, req, fid):
2090        """
2091        Return all the stored info that this fedid can access
2092        """
2093        rv = { 'info': [ ] }
2094
2095        self.state_lock.acquire()
2096        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2097            try:
2098                self.check_experiment_access(fid, key)
2099            except service_error, e:
2100                if e.code == service_error.access:
2101                    continue
2102                else:
2103                    self.state_lock.release()
2104                    raise e
2105
2106            if self.state.has_key(key):
2107                e = copy.deepcopy(self.state[key])
2108                e = self.clean_info_response(e)
2109                rv['info'].append(e)
2110        self.state_lock.release()
2111        return rv
2112
2113    def terminate_experiment(self, req, fid):
2114        """
2115        Swap this experiment out on the federants and delete the shared
2116        information
2117        """
2118        tbparams = { }
2119        req = req.get('TerminateRequestBody', None)
2120        if not req:
2121            raise service_error(service_error.req,
2122                    "Bad request format (no TerminateRequestBody)")
2123        force = req.get('force', False)
2124        exp = req.get('experiment', None)
2125        if exp:
2126            if exp.has_key('fedid'):
2127                key = exp['fedid']
2128                keytype = "fedid"
2129            elif exp.has_key('localname'):
2130                key = exp['localname']
2131                keytype = "localname"
2132            else:
2133                raise service_error(service_error.req, "Unknown lookup type")
2134        else:
2135            raise service_error(service_error.req, "No request?")
2136
2137        self.check_experiment_access(fid, key)
2138
2139        dealloc_list = [ ]
2140
2141
2142        # Create a logger that logs to the dealloc_list as well as to the main
2143        # log file.
2144        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2145        h = logging.StreamHandler(self.list_log(dealloc_list))
2146        # XXX: there should be a global one of these rather than repeating the
2147        # code.
2148        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2149                    '%d %b %y %H:%M:%S'))
2150        dealloc_log.addHandler(h)
2151
2152        self.state_lock.acquire()
2153        fed_exp = self.state.get(key, None)
2154
2155        if fed_exp:
2156            # This branch of the conditional holds the lock to generate a
2157            # consistent temporary tbparams variable to deallocate experiments.
2158            # It releases the lock to do the deallocations and reacquires it to
2159            # remove the experiment state when the termination is complete.
2160
2161            # First make sure that the experiment creation is complete.
2162            status = fed_exp.get('experimentStatus', None)
2163
2164            if status:
2165                if status in ('starting', 'terminating'):
2166                    if not force:
2167                        self.state_lock.release()
2168                        raise service_error(service_error.partial, 
2169                                'Experiment still being created or destroyed')
2170                    else:
2171                        self.log.warning('Experiment in %s state ' % status + \
2172                                'being terminated by force.')
2173            else:
2174                # No status??? trouble
2175                self.state_lock.release()
2176                raise service_error(service_error.internal,
2177                        "Experiment has no status!?")
2178
2179            ids = []
2180            #  experimentID is a list of dicts that are self-describing
2181            #  identifiers.  This finds all the fedids and localnames - the
2182            #  keys of self.state - and puts them into ids.
2183            for id in fed_exp.get('experimentID', []):
2184                if id.has_key('fedid'): ids.append(id['fedid'])
2185                if id.has_key('localname'): ids.append(id['localname'])
2186
2187            # Collect the allocation/segment ids into a dict keyed by the fedid
2188            # of the allocation (or a monotonically increasing integer) that
2189            # contains a tuple of uri, aid (which is a dict...)
2190            for i, fed in enumerate(fed_exp.get('federant', [])):
2191                try:
2192                    uri = fed['uri']
2193                    aid = fed['allocID']
2194                    k = fed['allocID'].get('fedid', i)
2195                except KeyError, e:
2196                    continue
2197                tbparams[k] = (uri, aid)
2198            fed_exp['experimentStatus'] = 'terminating'
2199            if self.state_filename: self.write_state()
2200            self.state_lock.release()
2201
2202            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2203            # then completes, so we can't wait if nothing starts.  So, no
2204            # tbparams, no start.
2205            if len(tbparams) > 0:
2206                thread_pool = self.thread_pool(self.nthreads)
2207                for k in tbparams.keys():
2208                    # Create and start a thread to stop the segment
2209                    thread_pool.wait_for_slot()
2210                    uri, aid = tbparams[k]
2211                    t  = self.pooled_thread(\
2212                            target=self.terminate_segment(log=dealloc_log,
2213                                testbed=uri,
2214                                cert_file=self.cert_file, 
2215                                cert_pwd=self.cert_pwd,
2216                                trusted_certs=self.trusted_certs,
2217                                caller=self.call_TerminateSegment),
2218                            args=(uri, aid), name=k,
2219                            pdata=thread_pool, trace_file=self.trace_file)
2220                    t.start()
2221                # Wait for completions
2222                thread_pool.wait_for_all_done()
2223
2224            # release the allocations (failed experiments have done this
2225            # already, and starting experiments may be in odd states, so we
2226            # ignore errors releasing those allocations
2227            try: 
2228                for k in tbparams.keys():
2229                    # This releases access by uri
2230                    uri, aid = tbparams[k]
2231                    self.release_access(None, aid, uri=uri)
2232            except service_error, e:
2233                if status != 'failed' and not force:
2234                    raise e
2235
2236            # Remove the terminated experiment
2237            self.state_lock.acquire()
2238            for id in ids:
2239                if self.state.has_key(id): del self.state[id]
2240
2241            if self.state_filename: self.write_state()
2242            self.state_lock.release()
2243
2244            # Delete any synch points associated with this experiment.  All
2245            # synch points begin with the fedid of the experiment.
2246            fedid_keys = set(["fedid:%s" % f for f in ids \
2247                    if isinstance(f, fedid)])
2248            for k in self.synch_store.all_keys():
2249                try:
2250                    if len(k) > 45 and k[0:46] in fedid_keys:
2251                        self.synch_store.del_value(k)
2252                except synch_store.BadDeletionError:
2253                    pass
2254            self.write_store()
2255               
2256            return { 
2257                    'experiment': exp , 
2258                    'deallocationLog': "".join(dealloc_list),
2259                    }
2260        else:
2261            # Don't forget to release the lock
2262            self.state_lock.release()
2263            raise service_error(service_error.req, "No saved state")
2264
2265
2266    def GetValue(self, req, fid):
2267        """
2268        Get a value from the synchronized store
2269        """
2270        req = req.get('GetValueRequestBody', None)
2271        if not req:
2272            raise service_error(service_error.req,
2273                    "Bad request format (no GetValueRequestBody)")
2274       
2275        name = req['name']
2276        wait = req['wait']
2277        rv = { 'name': name }
2278
2279        if self.auth.check_attribute(fid, name):
2280            self.log.debug("[GetValue] asking for %s " % name)
2281            try:
2282                v = self.synch_store.get_value(name, wait)
2283            except synch_store.RevokedKeyError:
2284                # No more synch on this key
2285                raise service_error(service_error.federant, 
2286                        "Synch key %s revoked" % name)
2287            if v is not None:
2288                rv['value'] = v
2289            self.log.debug("[GetValue] got %s from %s" % (v, name))
2290            return rv
2291        else:
2292            raise service_error(service_error.access, "Access Denied")
2293       
2294
2295    def SetValue(self, req, fid):
2296        """
2297        Set a value in the synchronized store
2298        """
2299        req = req.get('SetValueRequestBody', None)
2300        if not req:
2301            raise service_error(service_error.req,
2302                    "Bad request format (no SetValueRequestBody)")
2303       
2304        name = req['name']
2305        v = req['value']
2306
2307        if self.auth.check_attribute(fid, name):
2308            try:
2309                self.synch_store.set_value(name, v)
2310                self.write_store()
2311                self.log.debug("[SetValue] set %s to %s" % (name, v))
2312            except synch_store.CollisionError:
2313                # Translate into a service_error
2314                raise service_error(service_error.req,
2315                        "Value already set: %s" %name)
2316            except synch_store.RevokedKeyError:
2317                # No more synch on this key
2318                raise service_error(service_error.federant, 
2319                        "Synch key %s revoked" % name)
2320            return { 'name': name, 'value': v }
2321        else:
2322            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.