source: fedd/federation/experiment_control.py @ 9a8cd92

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 9a8cd92 was b4b19c7, checked in by Ted Faber <faber@…>, 15 years ago

Get topology information into the info operation, as annotations of a topology description. This required adding such information to the start segment replies as well

  • Property mode set to 100644
File size: 80.4 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55class experiment_control_local:
56    """
57    Control of experiments that this system can directly access.
58
59    Includes experiment creation, termination and information dissemination.
60    Thred safe.
61    """
62
63    class ssh_cmd_timeout(RuntimeError): pass
64   
65    class thread_pool:
66        """
67        A class to keep track of a set of threads all invoked for the same
68        task.  Manages the mutual exclusion of the states.
69        """
70        def __init__(self, nthreads):
71            """
72            Start a pool.
73            """
74            self.changed = Condition()
75            self.started = 0
76            self.terminated = 0
77            self.nthreads = nthreads
78
79        def acquire(self):
80            """
81            Get the pool's lock.
82            """
83            self.changed.acquire()
84
85        def release(self):
86            """
87            Release the pool's lock.
88            """
89            self.changed.release()
90
91        def wait(self, timeout = None):
92            """
93            Wait for a pool thread to start or stop.
94            """
95            self.changed.wait(timeout)
96
97        def start(self):
98            """
99            Called by a pool thread to report starting.
100            """
101            self.changed.acquire()
102            self.started += 1
103            self.changed.notifyAll()
104            self.changed.release()
105
106        def terminate(self):
107            """
108            Called by a pool thread to report finishing.
109            """
110            self.changed.acquire()
111            self.terminated += 1
112            self.changed.notifyAll()
113            self.changed.release()
114
115        def clear(self):
116            """
117            Clear all pool data.
118            """
119            self.changed.acquire()
120            self.started = 0
121            self.terminated =0
122            self.changed.notifyAll()
123            self.changed.release()
124
125        def wait_for_slot(self):
126            """
127            Wait until we have a free slot to start another pooled thread
128            """
129            self.acquire()
130            while self.started - self.terminated >= self.nthreads:
131                self.wait()
132            self.release()
133
134        def wait_for_all_done(self, timeout=None):
135            """
136            Wait until all active threads finish (and at least one has
137            started).  If a timeout is given, return after waiting that long
138            for termination.  If all threads are done (and one has started in
139            the since the last clear()) return True, otherwise False.
140            """
141            if timeout:
142                deadline = time.time() + timeout
143            self.acquire()
144            while self.started == 0 or self.started > self.terminated:
145                self.wait(timeout)
146                if timeout:
147                    if time.time() > deadline:
148                        break
149                    timeout = deadline - time.time()
150            self.release()
151            return not (self.started == 0 or self.started > self.terminated)
152
153    class pooled_thread(Thread):
154        """
155        One of a set of threads dedicated to a specific task.  Uses the
156        thread_pool class above for coordination.
157        """
158        def __init__(self, group=None, target=None, name=None, args=(), 
159                kwargs={}, pdata=None, trace_file=None):
160            Thread.__init__(self, group, target, name, args, kwargs)
161            self.rv = None          # Return value of the ops in this thread
162            self.exception = None   # Exception that terminated this thread
163            self.target=target      # Target function to run on start()
164            self.args = args        # Args to pass to target
165            self.kwargs = kwargs    # Additional kw args
166            self.pdata = pdata      # thread_pool for this class
167            # Logger for this thread
168            self.log = logging.getLogger("fedd.experiment_control")
169       
170        def run(self):
171            """
172            Emulate Thread.run, except add pool data manipulation and error
173            logging.
174            """
175            if self.pdata:
176                self.pdata.start()
177
178            if self.target:
179                try:
180                    self.rv = self.target(*self.args, **self.kwargs)
181                except service_error, s:
182                    self.exception = s
183                    self.log.error("Thread exception: %s %s" % \
184                            (s.code_string(), s.desc))
185                except:
186                    self.exception = sys.exc_info()[1]
187                    self.log.error(("Unexpected thread exception: %s" +\
188                            "Trace %s") % (self.exception,\
189                                traceback.format_exc()))
190            if self.pdata:
191                self.pdata.terminate()
192
193    call_RequestAccess = service_caller('RequestAccess')
194    call_ReleaseAccess = service_caller('ReleaseAccess')
195    call_StartSegment = service_caller('StartSegment')
196    call_TerminateSegment = service_caller('TerminateSegment')
197    call_Ns2Topdl = service_caller('Ns2Topdl')
198
199    def __init__(self, config=None, auth=None):
200        """
201        Intialize the various attributes, most from the config object
202        """
203
204        def parse_tarfile_list(tf):
205            """
206            Parse a tarfile list from the configuration.  This is a set of
207            paths and tarfiles separated by spaces.
208            """
209            rv = [ ]
210            if tf is not None:
211                tl = tf.split()
212                while len(tl) > 1:
213                    p, t = tl[0:2]
214                    del tl[0:2]
215                    rv.append((p, t))
216            return rv
217
218        self.thread_with_rv = experiment_control_local.pooled_thread
219        self.thread_pool = experiment_control_local.thread_pool
220        self.list_log = list_log.list_log
221
222        self.cert_file = config.get("experiment_control", "cert_file")
223        if self.cert_file:
224            self.cert_pwd = config.get("experiment_control", "cert_pwd")
225        else:
226            self.cert_file = config.get("globals", "cert_file")
227            self.cert_pwd = config.get("globals", "cert_pwd")
228
229        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
230                or config.get("globals", "trusted_certs")
231
232        self.repodir = config.get("experiment_control", "repodir")
233        self.repo_url = config.get("experiment_control", "repo_url", 
234                "https://users.isi.deterlab.net:23235");
235
236        self.exp_stem = "fed-stem"
237        self.log = logging.getLogger("fedd.experiment_control")
238        set_log_level(config, "experiment_control", self.log)
239        self.muxmax = 2
240        self.nthreads = 10
241        self.randomize_experiments = False
242
243        self.splitter = None
244        self.ssh_keygen = "/usr/bin/ssh-keygen"
245        self.ssh_identity_file = None
246
247
248        self.debug = config.getboolean("experiment_control", "create_debug")
249        self.cleanup = not config.getboolean("experiment_control", 
250                "leave_tmpfiles")
251        self.state_filename = config.get("experiment_control", 
252                "experiment_state")
253        self.store_filename = config.get("experiment_control", 
254                "synch_store")
255        self.store_url = config.get("experiment_control", "store_url")
256        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
257        self.fedkit = parse_tarfile_list(\
258                config.get("experiment_control", "fedkit"))
259        self.gatewaykit = parse_tarfile_list(\
260                config.get("experiment_control", "gatewaykit"))
261        accessdb_file = config.get("experiment_control", "accessdb")
262
263        self.ssh_pubkey_file = config.get("experiment_control", 
264                "ssh_pubkey_file")
265        self.ssh_privkey_file = config.get("experiment_control",
266                "ssh_privkey_file")
267        # NB for internal master/slave ops, not experiment setup
268        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
269
270        self.overrides = set([])
271        ovr = config.get('experiment_control', 'overrides')
272        if ovr:
273            for o in ovr.split(","):
274                o = o.strip()
275                if o.startswith('fedid:'): o = o[len('fedid:'):]
276                self.overrides.add(fedid(hexstr=o))
277
278        self.state = { }
279        self.state_lock = Lock()
280        self.tclsh = "/usr/local/bin/otclsh"
281        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
282                config.get("experiment_control", "tcl_splitter",
283                        "/usr/testbed/lib/ns2ir/parse.tcl")
284        mapdb_file = config.get("experiment_control", "mapdb")
285        self.trace_file = sys.stderr
286
287        self.def_expstart = \
288                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
289                "/tmp/federate";
290        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
291                "FEDDIR/hosts";
292        self.def_gwstart = \
293                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
294                "/tmp/bridge.log";
295        self.def_mgwstart = \
296                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
297                "/tmp/bridge.log";
298        self.def_gwimage = "FBSD61-TUNNEL2";
299        self.def_gwtype = "pc";
300        self.local_access = { }
301
302        if auth:
303            self.auth = auth
304        else:
305            self.log.error(\
306                    "[access]: No authorizer initialized, creating local one.")
307            auth = authorizer()
308
309
310        if self.ssh_pubkey_file:
311            try:
312                f = open(self.ssh_pubkey_file, 'r')
313                self.ssh_pubkey = f.read()
314                f.close()
315            except IOError:
316                raise service_error(service_error.internal,
317                        "Cannot read sshpubkey")
318        else:
319            raise service_error(service_error.internal, 
320                    "No SSH public key file?")
321
322        if not self.ssh_privkey_file:
323            raise service_error(service_error.internal, 
324                    "No SSH public key file?")
325
326
327        if mapdb_file:
328            self.read_mapdb(mapdb_file)
329        else:
330            self.log.warn("[experiment_control] No testbed map, using defaults")
331            self.tbmap = { 
332                    'deter':'https://users.isi.deterlab.net:23235',
333                    'emulab':'https://users.isi.deterlab.net:23236',
334                    'ucb':'https://users.isi.deterlab.net:23237',
335                    }
336
337        if accessdb_file:
338                self.read_accessdb(accessdb_file)
339        else:
340            raise service_error(service_error.internal,
341                    "No accessdb specified in config")
342
343        # Grab saved state.  OK to do this w/o locking because it's read only
344        # and only one thread should be in existence that can see self.state at
345        # this point.
346        if self.state_filename:
347            self.read_state()
348
349        if self.store_filename:
350            self.read_store()
351        else:
352            self.log.warning("No saved synch store")
353            self.synch_store = synch_store
354
355        # Dispatch tables
356        self.soap_services = {\
357                'New': soap_handler('New', self.new_experiment),
358                'Create': soap_handler('Create', self.create_experiment),
359                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
360                'Vis': soap_handler('Vis', self.get_vis),
361                'Info': soap_handler('Info', self.get_info),
362                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
363                'Terminate': soap_handler('Terminate', 
364                    self.terminate_experiment),
365                'GetValue': soap_handler('GetValue', self.GetValue),
366                'SetValue': soap_handler('SetValue', self.SetValue),
367        }
368
369        self.xmlrpc_services = {\
370                'New': xmlrpc_handler('New', self.new_experiment),
371                'Create': xmlrpc_handler('Create', self.create_experiment),
372                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
373                'Vis': xmlrpc_handler('Vis', self.get_vis),
374                'Info': xmlrpc_handler('Info', self.get_info),
375                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
376                'Terminate': xmlrpc_handler('Terminate',
377                    self.terminate_experiment),
378                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
379                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
380        }
381
382    # Call while holding self.state_lock
383    def write_state(self):
384        """
385        Write a new copy of experiment state after copying the existing state
386        to a backup.
387
388        State format is a simple pickling of the state dictionary.
389        """
390        if os.access(self.state_filename, os.W_OK):
391            copy_file(self.state_filename, \
392                    "%s.bak" % self.state_filename)
393        try:
394            f = open(self.state_filename, 'w')
395            pickle.dump(self.state, f)
396        except IOError, e:
397            self.log.error("Can't write file %s: %s" % \
398                    (self.state_filename, e))
399        except pickle.PicklingError, e:
400            self.log.error("Pickling problem: %s" % e)
401        except TypeError, e:
402            self.log.error("Pickling problem (TypeError): %s" % e)
403
404    @staticmethod
405    def get_alloc_ids(state):
406        """
407        Pull the fedids of the identifiers of each allocation from the
408        state.  Again, a dict dive that's best isolated.
409
410        Used by read_store and read state
411        """
412
413        return [ f['allocID']['fedid'] 
414                for f in state.get('federant',[]) \
415                    if f.has_key('allocID') and \
416                        f['allocID'].has_key('fedid')]
417
418    # Call while holding self.state_lock
419    def read_state(self):
420        """
421        Read a new copy of experiment state.  Old state is overwritten.
422
423        State format is a simple pickling of the state dictionary.
424        """
425       
426        def get_experiment_id(state):
427            """
428            Pull the fedid experimentID out of the saved state.  This is kind
429            of a gross walk through the dict.
430            """
431
432            if state.has_key('experimentID'):
433                for e in state['experimentID']:
434                    if e.has_key('fedid'):
435                        return e['fedid']
436                else:
437                    return None
438            else:
439                return None
440
441        try:
442            f = open(self.state_filename, "r")
443            self.state = pickle.load(f)
444            self.log.debug("[read_state]: Read state from %s" % \
445                    self.state_filename)
446        except IOError, e:
447            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
448                    % (self.state_filename, e))
449        except pickle.UnpicklingError, e:
450            self.log.warning(("[read_state]: No saved state: " + \
451                    "Unpickling failed: %s") % e)
452       
453        for s in self.state.values():
454            try:
455
456                eid = get_experiment_id(s)
457                if eid : 
458                    # Give the owner rights to the experiment
459                    self.auth.set_attribute(s['owner'], eid)
460                    # And holders of the eid as well
461                    self.auth.set_attribute(eid, eid)
462                    # allow overrides to control experiments as well
463                    for o in self.overrides:
464                        self.auth.set_attribute(o, eid)
465                    # Set permissions to allow reading of the software repo, if
466                    # any, as well.
467                    for a in self.get_alloc_ids(s):
468                        self.auth.set_attribute(a, 'repo/%s' % eid)
469                else:
470                    raise KeyError("No experiment id")
471            except KeyError, e:
472                self.log.warning("[read_state]: State ownership or identity " +\
473                        "misformatted in %s: %s" % (self.state_filename, e))
474
475
476    def read_accessdb(self, accessdb_file):
477        """
478        Read the mapping from fedids that can create experiments to their name
479        in the 3-level access namespace.  All will be asserted from this
480        testbed and can include the local username and porject that will be
481        asserted on their behalf by this fedd.  Each fedid is also added to the
482        authorization system with the "create" attribute.
483        """
484        self.accessdb = {}
485        # These are the regexps for parsing the db
486        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
487        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
488                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
489        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
490                "\s*->\s*(" + name_expr + ")\s*$")
491        lineno = 0
492
493        # Parse the mappings and store in self.authdb, a dict of
494        # fedid -> (proj, user)
495        try:
496            f = open(accessdb_file, "r")
497            for line in f:
498                lineno += 1
499                line = line.strip()
500                if len(line) == 0 or line.startswith('#'):
501                    continue
502                m = project_line.match(line)
503                if m:
504                    fid = fedid(hexstr=m.group(1))
505                    project, user = m.group(2,3)
506                    if not self.accessdb.has_key(fid):
507                        self.accessdb[fid] = []
508                    self.accessdb[fid].append((project, user))
509                    continue
510
511                m = user_line.match(line)
512                if m:
513                    fid = fedid(hexstr=m.group(1))
514                    project = None
515                    user = m.group(2)
516                    if not self.accessdb.has_key(fid):
517                        self.accessdb[fid] = []
518                    self.accessdb[fid].append((project, user))
519                    continue
520                self.log.warn("[experiment_control] Error parsing access " +\
521                        "db %s at line %d" %  (accessdb_file, lineno))
522        except IOError:
523            raise service_error(service_error.internal,
524                    "Error opening/reading %s as experiment " +\
525                            "control accessdb" %  accessdb_file)
526        f.close()
527
528        # Initialize the authorization attributes
529        for fid in self.accessdb.keys():
530            self.auth.set_attribute(fid, 'create')
531            self.auth.set_attribute(fid, 'new')
532
533    def read_mapdb(self, file):
534        """
535        Read a simple colon separated list of mappings for the
536        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
537        """
538
539        self.tbmap = { }
540        lineno =0
541        try:
542            f = open(file, "r")
543            for line in f:
544                lineno += 1
545                line = line.strip()
546                if line.startswith('#') or len(line) == 0:
547                    continue
548                try:
549                    label, url = line.split(':', 1)
550                    self.tbmap[label] = url
551                except ValueError, e:
552                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
553                            "map db: %s %s" % (lineno, line, e))
554        except IOError, e:
555            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
556                    "open %s: %s" % (file, e))
557        f.close()
558
559    def read_store(self):
560        try:
561            self.synch_store = synch_store()
562            self.synch_store.load(self.store_filename)
563            self.log.debug("[read_store]: Read store from %s" % \
564                    self.store_filename)
565        except IOError, e:
566            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
567                    % (self.state_filename, e))
568            self.synch_store = synch_store()
569
570        # Set the initial permissions on data in the store.  XXX: This ad hoc
571        # authorization attribute initialization is getting out of hand.
572        for k in self.synch_store.all_keys():
573            try:
574                if k.startswith('fedid:'):
575                    fid = fedid(hexstr=k[6:46])
576                    if self.state.has_key(fid):
577                        for a in self.get_alloc_ids(self.state[fid]):
578                            self.auth.set_attribute(a, k)
579            except ValueError, e:
580                self.log.warn("Cannot deduce permissions for %s" % k)
581
582
583    def write_store(self):
584        """
585        Write a new copy of synch_store after writing current state
586        to a backup.  We use the internal synch_store pickle method to avoid
587        incinsistent data.
588
589        State format is a simple pickling of the store.
590        """
591        if os.access(self.store_filename, os.W_OK):
592            copy_file(self.store_filename, \
593                    "%s.bak" % self.store_filename)
594        try:
595            self.synch_store.save(self.store_filename)
596        except IOError, e:
597            self.log.error("Can't write file %s: %s" % \
598                    (self.store_filename, e))
599        except TypeError, e:
600            self.log.error("Pickling problem (TypeError): %s" % e)
601
602       
603    def generate_ssh_keys(self, dest, type="rsa" ):
604        """
605        Generate a set of keys for the gateways to use to talk.
606
607        Keys are of type type and are stored in the required dest file.
608        """
609        valid_types = ("rsa", "dsa")
610        t = type.lower();
611        if t not in valid_types: raise ValueError
612        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
613
614        try:
615            trace = open("/dev/null", "w")
616        except IOError:
617            raise service_error(service_error.internal,
618                    "Cannot open /dev/null??");
619
620        # May raise CalledProcessError
621        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
622        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
623        if rv != 0:
624            raise service_error(service_error.internal, 
625                    "Cannot generate nonce ssh keys.  %s return code %d" \
626                            % (self.ssh_keygen, rv))
627
628    def gentopo(self, str):
629        """
630        Generate the topology dtat structure from the splitter's XML
631        representation of it.
632
633        The topology XML looks like:
634            <experiment>
635                <nodes>
636                    <node><vname></vname><ips>ip1:ip2</ips></node>
637                </nodes>
638                <lans>
639                    <lan>
640                        <vname></vname><vnode></vnode><ip></ip>
641                        <bandwidth></bandwidth><member>node:port</member>
642                    </lan>
643                </lans>
644        """
645        class topo_parse:
646            """
647            Parse the topology XML and create the dats structure.
648            """
649            def __init__(self):
650                # Typing of the subelements for data conversion
651                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
652                self.int_subelements = ( 'bandwidth',)
653                self.float_subelements = ( 'delay',)
654                # The final data structure
655                self.nodes = [ ]
656                self.lans =  [ ]
657                self.topo = { \
658                        'node': self.nodes,\
659                        'lan' : self.lans,\
660                    }
661                self.element = { }  # Current element being created
662                self.chars = ""     # Last text seen
663
664            def end_element(self, name):
665                # After each sub element the contents is added to the current
666                # element or to the appropriate list.
667                if name == 'node':
668                    self.nodes.append(self.element)
669                    self.element = { }
670                elif name == 'lan':
671                    self.lans.append(self.element)
672                    self.element = { }
673                elif name in self.str_subelements:
674                    self.element[name] = self.chars
675                    self.chars = ""
676                elif name in self.int_subelements:
677                    self.element[name] = int(self.chars)
678                    self.chars = ""
679                elif name in self.float_subelements:
680                    self.element[name] = float(self.chars)
681                    self.chars = ""
682
683            def found_chars(self, data):
684                self.chars += data.rstrip()
685
686
687        tp = topo_parse();
688        parser = xml.parsers.expat.ParserCreate()
689        parser.EndElementHandler = tp.end_element
690        parser.CharacterDataHandler = tp.found_chars
691
692        parser.Parse(str)
693
694        return tp.topo
695       
696
697    def genviz(self, topo):
698        """
699        Generate the visualization the virtual topology
700        """
701
702        neato = "/usr/local/bin/neato"
703        # These are used to parse neato output and to create the visualization
704        # file.
705        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
706        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
707                "%s</type></node>"
708
709        try:
710            # Node names
711            nodes = [ n['vname'] for n in topo['node'] ]
712            topo_lans = topo['lan']
713        except KeyError, e:
714            raise service_error(service_error.internal, "Bad topology: %s" %e)
715
716        lans = { }
717        links = { }
718
719        # Walk through the virtual topology, organizing the connections into
720        # 2-node connections (links) and more-than-2-node connections (lans).
721        # When a lan is created, it's added to the list of nodes (there's a
722        # node in the visualization for the lan).
723        for l in topo_lans:
724            if links.has_key(l['vname']):
725                if len(links[l['vname']]) < 2:
726                    links[l['vname']].append(l['vnode'])
727                else:
728                    nodes.append(l['vname'])
729                    lans[l['vname']] = links[l['vname']]
730                    del links[l['vname']]
731                    lans[l['vname']].append(l['vnode'])
732            elif lans.has_key(l['vname']):
733                lans[l['vname']].append(l['vnode'])
734            else:
735                links[l['vname']] = [ l['vnode'] ]
736
737
738        # Open up a temporary file for dot to turn into a visualization
739        try:
740            df, dotname = tempfile.mkstemp()
741            dotfile = os.fdopen(df, 'w')
742        except IOError:
743            raise service_error(service_error.internal,
744                    "Failed to open file in genviz")
745
746        try:
747            dnull = open('/dev/null', 'w')
748        except IOError:
749            service_error(service_error.internal,
750                    "Failed to open /dev/null in genviz")
751
752        # Generate a dot/neato input file from the links, nodes and lans
753        try:
754            print >>dotfile, "graph G {"
755            for n in nodes:
756                print >>dotfile, '\t"%s"' % n
757            for l in links.keys():
758                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
759            for l in lans.keys():
760                for n in lans[l]:
761                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
762            print >>dotfile, "}"
763            dotfile.close()
764        except TypeError:
765            raise service_error(service_error.internal,
766                    "Single endpoint link in vtopo")
767        except IOError:
768            raise service_error(service_error.internal, "Cannot write dot file")
769
770        # Use dot to create a visualization
771        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
772                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
773                close_fds=True)
774        dnull.close()
775
776        # Translate dot to vis format
777        vis_nodes = [ ]
778        vis = { 'node': vis_nodes }
779        for line in dot.stdout:
780            m = vis_re.match(line)
781            if m:
782                vn = m.group(1)
783                vis_node = {'name': vn, \
784                        'x': float(m.group(2)),\
785                        'y' : float(m.group(3)),\
786                    }
787                if vn in links.keys() or vn in lans.keys():
788                    vis_node['type'] = 'lan'
789                else:
790                    vis_node['type'] = 'node'
791                vis_nodes.append(vis_node)
792        rv = dot.wait()
793
794        os.remove(dotname)
795        if rv == 0 : return vis
796        else: return None
797
798    def get_access(self, tb, nodes, tbparam, access_user, masters):
799        """
800        Get access to testbed through fedd and set the parameters for that tb
801        """
802        def get_export_project(svcs):
803            """
804            Look through for the list of federated_service for this testbed
805            objects for a project_export service, and extract the project
806            parameter.
807            """
808
809            pe = [s for s in svcs if s.name=='project_export']
810            if len(pe) == 1:
811                return pe[0].params.get('project', None)
812            elif len(pe) == 0:
813                return None
814            else:
815                raise service_error(service_error.req,
816                        "More than one project export is not supported")
817
818        uri = self.tbmap.get(tb, None)
819        if not uri:
820            raise service_error(service_error.server_config, 
821                    "Unknown testbed: %s" % tb)
822
823        export_svcs = masters.get(tb,[])
824        import_svcs = [ s for m in masters.values() \
825                for s in m \
826                    if tb in s.importers ]
827
828        export_project = get_export_project(export_svcs)
829
830        # Tweak search order so that if there are entries in access_user that
831        # have a project matching the export project, we try them first
832        if export_project: 
833            access_sequence = [ (p, u) for p, u in access_user \
834                    if p == export_project] 
835            access_sequence.extend([(p, u) for p, u in access_user \
836                    if p != export_project]) 
837        else: 
838            access_sequence = access_user
839
840        for p, u in access_sequence: 
841            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
842                    "to %s") %  ((p or "None"), u, uri))
843
844            if p:
845                # Request with user and project specified
846                req = {\
847                        'destinationTestbed' : { 'uri' : uri },
848                        'credential': [ "project: %s" % p, "user: %s"  % u],
849                        'allocID' : { 'localname': 'test' },
850                    }
851            else:
852                # Request with only user specified
853                req = {\
854                        'destinationTestbed' : { 'uri' : uri },
855                        'credential': [ 'user: %s' % u ],
856                        'allocID' : { 'localname': 'test' },
857                    }
858
859            # Make the service request from the services we're importing and
860            # exporting.  Keep track of the export request ids so we can
861            # collect the resulting info from the access response.
862            e_keys = { }
863            if import_svcs or export_svcs:
864                req['service'] = [ ]
865
866                for i, s in enumerate(import_svcs):
867                    idx = 'import%d' % i
868                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
869                    if s.params:
870                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
871                                for k, v in s.params.items()]
872                    req['service'].append(sr)
873
874                for i, s in enumerate(export_svcs):
875                    idx = 'export%d' % i
876                    e_keys[idx] = s
877                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
878                    if s.params:
879                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
880                                for k, v in s.params.items()]
881                    req['service'].append(sr)
882
883            # node resources if any
884            if nodes != None and len(nodes) > 0:
885                rnodes = [ ]
886                for n in nodes:
887                    rn = { }
888                    image, hw, count = n.split(":")
889                    if image: rn['image'] = [ image ]
890                    if hw: rn['hardware'] = [ hw ]
891                    if count and int(count) >0 : rn['count'] = int(count)
892                    rnodes.append(rn)
893                req['resources']= { }
894                req['resources']['node'] = rnodes
895
896            try:
897                if self.local_access.has_key(uri):
898                    # Local access call
899                    req = { 'RequestAccessRequestBody' : req }
900                    r = self.local_access[uri].RequestAccess(req, 
901                            fedid(file=self.cert_file))
902                    r = { 'RequestAccessResponseBody' : r }
903                else:
904                    r = self.call_RequestAccess(uri, req, 
905                            self.cert_file, self.cert_pwd, self.trusted_certs)
906            except service_error, e:
907                if e.code == service_error.access:
908                    self.log.debug("[get_access] Access denied")
909                    r = None
910                    continue
911                else:
912                    raise e
913
914            if r.has_key('RequestAccessResponseBody'):
915                # Through to here we have a valid response, not a fault.
916                # Access denied is a fault, so something better or worse than
917                # access denied has happened.
918                r = r['RequestAccessResponseBody']
919                self.log.debug("[get_access] Access granted")
920                break
921            else:
922                raise service_error(service_error.protocol,
923                        "Bad proxy response")
924       
925        if not r:
926            raise service_error(service_error.access, 
927                    "Access denied by %s (%s)" % (tb, uri))
928
929        tbparam[tb] = { 
930                "allocID" : r['allocID'],
931                "uri": uri,
932                }
933
934        # Collect the responses corresponding to the services this testbed
935        # exports.  These will be the service requests that we will include in
936        # the start segment requests (with appropriate visibility values) to
937        # import and export the segments.
938        for s in r.get('service', []):
939            id = s.get('id', None)
940            if id and id in e_keys:
941                e_keys[id].reqs.append(s)
942
943        # Add attributes to parameter space.  We don't allow attributes to
944        # overlay any parameters already installed.
945        for a in r.get('fedAttr', []):
946            try:
947                if a['attribute'] and \
948                        isinstance(a['attribute'], basestring)\
949                        and not tbparam[tb].has_key(a['attribute'].lower()):
950                    tbparam[tb][a['attribute'].lower()] = a['value']
951            except KeyError:
952                self.log.error("Bad attribute in response: %s" % a)
953
954    def release_access(self, tb, aid, uri=None):
955        """
956        Release access to testbed through fedd
957        """
958
959        if not uri:
960            uri = self.tbmap.get(tb, None)
961        if not uri:
962            raise service_error(service_error.server_config, 
963                    "Unknown testbed: %s" % tb)
964
965        if self.local_access.has_key(uri):
966            resp = self.local_access[uri].ReleaseAccess(\
967                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
968                    fedid(file=self.cert_file))
969            resp = { 'ReleaseAccessResponseBody': resp } 
970        else:
971            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
972                    self.cert_file, self.cert_pwd, self.trusted_certs)
973
974        # better error coding
975
976    def remote_ns2topdl(self, uri, desc):
977
978        req = {
979                'description' : { 'ns2description': desc },
980            }
981
982        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
983                self.trusted_certs)
984
985        if r.has_key('Ns2TopdlResponseBody'):
986            r = r['Ns2TopdlResponseBody']
987            ed = r.get('experimentdescription', None)
988            if ed.has_key('topdldescription'):
989                return topdl.Topology(**ed['topdldescription'])
990            else:
991                raise service_error(service_error.protocol, 
992                        "Bad splitter response (no output)")
993        else:
994            raise service_error(service_error.protocol, "Bad splitter response")
995
996    class start_segment:
997        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
998                cert_pwd=None, trusted_certs=None, caller=None,
999                log_collector=None):
1000            self.log = log
1001            self.debug = debug
1002            self.cert_file = cert_file
1003            self.cert_pwd = cert_pwd
1004            self.trusted_certs = None
1005            self.caller = caller
1006            self.testbed = testbed
1007            self.log_collector = log_collector
1008            self.response = None
1009            self.node = { }
1010
1011        def make_map(self, resp):
1012            if 'segmentdescription' in resp and \
1013                    'topdldescription' in resp['segmentdescription']:
1014                top = topdl.Topology(\
1015                        **resp['segmentdescription']['topdldescription'])
1016                for e in [e for e in top.elements \
1017                        if isinstance(e, topdl.Computer)]:
1018                    hn = e.get_attribute('hostname')
1019                    if hn:
1020                        for n in e.name:
1021                            self.node[n] = hn
1022
1023        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1024            req = {
1025                    'allocID': { 'fedid' : aid }, 
1026                    'segmentdescription': { 
1027                        'topdldescription': topo.to_dict(),
1028                    },
1029                }
1030
1031            if connInfo:
1032                req['connection'] = connInfo
1033
1034            import_svcs = [ s for m in masters.values() \
1035                    for s in m if self.testbed in s.importers]
1036
1037            if import_svcs or self.testbed in masters:
1038                req['service'] = []
1039
1040            for s in import_svcs:
1041                for r in s.reqs:
1042                    sr = copy.deepcopy(r)
1043                    sr['visibility'] = 'import';
1044                    req['service'].append(sr)
1045
1046            for s in masters.get(self.testbed, []):
1047                for r in s.reqs:
1048                    sr = copy.deepcopy(r)
1049                    sr['visibility'] = 'export';
1050                    req['service'].append(sr)
1051
1052            if attrs:
1053                req['fedAttr'] = attrs
1054
1055            try:
1056                self.log.debug("Calling StartSegment at %s " % uri)
1057                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1058                        self.trusted_certs)
1059                if r.has_key('StartSegmentResponseBody'):
1060                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1061                            None)
1062                    if lval and self.log_collector:
1063                        for line in  lval.splitlines(True):
1064                            self.log_collector.write(line)
1065                    self.make_map(r['StartSegmentResponseBody'])
1066                    self.response = r
1067                else:
1068                    raise service_error(service_error.internal, 
1069                            "Bad response!?: %s" %r)
1070                return True
1071            except service_error, e:
1072                self.log.error("Start segment failed on %s: %s" % \
1073                        (self.testbed, e))
1074                return False
1075
1076
1077
1078    class terminate_segment:
1079        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1080                cert_pwd=None, trusted_certs=None, caller=None):
1081            self.log = log
1082            self.debug = debug
1083            self.cert_file = cert_file
1084            self.cert_pwd = cert_pwd
1085            self.trusted_certs = None
1086            self.caller = caller
1087            self.testbed = testbed
1088
1089        def __call__(self, uri, aid ):
1090            req = {
1091                    'allocID': aid , 
1092                }
1093            try:
1094                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1095                        self.trusted_certs)
1096                return True
1097            except service_error, e:
1098                self.log.error("Terminate segment failed on %s: %s" % \
1099                        (self.testbed, e))
1100                return False
1101   
1102
1103    def allocate_resources(self, allocated, masters, eid, expid, 
1104            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1105            attrs=None, connInfo={}):
1106
1107        started = { }           # Testbeds where a sub-experiment started
1108                                # successfully
1109
1110        # XXX
1111        fail_soft = False
1112
1113        log = alloc_log or self.log
1114
1115        thread_pool = self.thread_pool(self.nthreads)
1116        threads = [ ]
1117        starters = [ ]
1118
1119        for tb in allocated.keys():
1120            # Create and start a thread to start the segment, and save it
1121            # to get the return value later
1122            thread_pool.wait_for_slot()
1123            uri = tbparams[tb].get('uri', self.tbmap.get(tb, None))
1124            if not uri:
1125                raise service_error(service_error.internal, 
1126                        "Unknown testbed %s !?" % tb)
1127
1128            if tbparams[tb].has_key('allocID') and \
1129                    tbparams[tb]['allocID'].has_key('fedid'):
1130                aid = tbparams[tb]['allocID']['fedid']
1131            else:
1132                raise service_error(service_error.internal, 
1133                        "No alloc id for testbed %s !?" % tb)
1134
1135            s = self.start_segment(log=log, debug=self.debug,
1136                    testbed=tb, cert_file=self.cert_file,
1137                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1138                    caller=self.call_StartSegment,
1139                    log_collector=log_collector)
1140            starters.append(s)
1141            t  = self.pooled_thread(\
1142                    target=s, name=tb,
1143                    args=(uri, aid, topo[tb], masters, attrs, connInfo[tb]),
1144                    pdata=thread_pool, trace_file=self.trace_file)
1145            threads.append(t)
1146            t.start()
1147
1148        # Wait until all finish (keep pinging the log, though)
1149        mins = 0
1150        revoked = False
1151        while not thread_pool.wait_for_all_done(60.0):
1152            mins += 1
1153            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1154                    % mins)
1155            if not revoked and \
1156                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1157                # a testbed has failed.  Revoke this experiment's
1158                # synchronizarion values so that sub experiments will not
1159                # deadlock waiting for synchronization that will never happen
1160                self.log.info("A subexperiment has failed to swap in, " + \
1161                        "revoking synch keys")
1162                var_key = "fedid:%s" % expid
1163                for k in self.synch_store.all_keys():
1164                    if len(k) > 45 and k[0:46] == var_key:
1165                        self.synch_store.revoke_key(k)
1166                revoked = True
1167
1168        failed = [ t.getName() for t in threads if not t.rv ]
1169        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1170
1171        # If one failed clean up, unless fail_soft is set
1172        if failed:
1173            if not fail_soft:
1174                thread_pool.clear()
1175                for tb in succeeded:
1176                    # Create and start a thread to stop the segment
1177                    thread_pool.wait_for_slot()
1178                    uri = tbparams[tb]['uri']
1179                    t  = self.pooled_thread(\
1180                            target=self.terminate_segment(log=log,
1181                                testbed=tb,
1182                                cert_file=self.cert_file, 
1183                                cert_pwd=self.cert_pwd,
1184                                trusted_certs=self.trusted_certs,
1185                                caller=self.call_TerminateSegment),
1186                            args=(uri, tbparams[tb]['federant']['allocID']),
1187                            name=tb,
1188                            pdata=thread_pool, trace_file=self.trace_file)
1189                    t.start()
1190                # Wait until all finish (if any are being stopped)
1191                if succeeded:
1192                    thread_pool.wait_for_all_done()
1193
1194                # release the allocations
1195                for tb in tbparams.keys():
1196                    self.release_access(tb, tbparams[tb]['allocID'],
1197                            tbparams[tb].get('uri', None))
1198                # Remove the placeholder
1199                self.state_lock.acquire()
1200                self.state[eid]['experimentStatus'] = 'failed'
1201                if self.state_filename: self.write_state()
1202                self.state_lock.release()
1203
1204                log.error("Swap in failed on %s" % ",".join(failed))
1205                return
1206        else:
1207            # Walk through the successes and gather the virtual to physical
1208            # mapping.
1209            node = { }
1210            for s in starters:
1211                node.update(s.node)
1212            # Assigng the mapping as a hostname attribute
1213            for e in [ e for e in top.elements \
1214                    if isinstance(e, topdl.Computer)]:
1215                for n in e.name:
1216                    if n in node:
1217                        e.set_attribute('hostname', node[n])
1218            log.info("[start_segment]: Experiment %s active" % eid)
1219
1220
1221        # Walk up tmpdir, deleting as we go
1222        if self.cleanup:
1223            log.debug("[start_experiment]: removing %s" % tmpdir)
1224            for path, dirs, files in os.walk(tmpdir, topdown=False):
1225                for f in files:
1226                    os.remove(os.path.join(path, f))
1227                for d in dirs:
1228                    os.rmdir(os.path.join(path, d))
1229            os.rmdir(tmpdir)
1230        else:
1231            log.debug("[start_experiment]: not removing %s" % tmpdir)
1232
1233        # Insert the experiment into our state and update the disk copy.
1234        self.state_lock.acquire()
1235        self.state[expid]['experimentStatus'] = 'active'
1236        self.state[eid] = self.state[expid]
1237        self.state[eid]['experimentdescription']['topdldescription'] = \
1238                top.to_dict()
1239        if self.state_filename: self.write_state()
1240        self.state_lock.release()
1241        return
1242
1243
1244    def add_kit(self, e, kit):
1245        """
1246        Add a Software object created from the list of (install, location)
1247        tuples passed as kit  to the software attribute of an object e.  We
1248        do this enough to break out the code, but it's kind of a hack to
1249        avoid changing the old tuple rep.
1250        """
1251
1252        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1253
1254        if isinstance(e.software, list): e.software.extend(s)
1255        else: e.software = s
1256
1257
1258    def create_experiment_state(self, fid, req, expid, expcert,
1259            state='starting'):
1260        """
1261        Create the initial entry in the experiment's state.  The expid and
1262        expcert are the experiment's fedid and certifacte that represents that
1263        ID, which are installed in the experiment state.  If the request
1264        includes a suggested local name that is used if possible.  If the local
1265        name is already taken by an experiment owned by this user that has
1266        failed, it is overwritten.  Otherwise new letters are added until a
1267        valid localname is found.  The generated local name is returned.
1268        """
1269
1270        if req.has_key('experimentID') and \
1271                req['experimentID'].has_key('localname'):
1272            overwrite = False
1273            eid = req['experimentID']['localname']
1274            # If there's an old failed experiment here with the same local name
1275            # and accessible by this user, we'll overwrite it, otherwise we'll
1276            # fall through and do the collision avoidance.
1277            old_expid = self.get_experiment_fedid(eid)
1278            if old_expid and self.check_experiment_access(fid, old_expid):
1279                self.state_lock.acquire()
1280                status = self.state[eid].get('experimentStatus', None)
1281                if status and status == 'failed':
1282                    # remove the old access attribute
1283                    self.auth.unset_attribute(fid, old_expid)
1284                    overwrite = True
1285                    del self.state[eid]
1286                    del self.state[old_expid]
1287                self.state_lock.release()
1288            self.state_lock.acquire()
1289            while (self.state.has_key(eid) and not overwrite):
1290                eid += random.choice(string.ascii_letters)
1291            # Initial state
1292            self.state[eid] = {
1293                    'experimentID' : \
1294                            [ { 'localname' : eid }, {'fedid': expid } ],
1295                    'experimentStatus': state,
1296                    'experimentAccess': { 'X509' : expcert },
1297                    'owner': fid,
1298                    'log' : [],
1299                }
1300            self.state[expid] = self.state[eid]
1301            if self.state_filename: self.write_state()
1302            self.state_lock.release()
1303        else:
1304            eid = self.exp_stem
1305            for i in range(0,5):
1306                eid += random.choice(string.ascii_letters)
1307            self.state_lock.acquire()
1308            while (self.state.has_key(eid)):
1309                eid = self.exp_stem
1310                for i in range(0,5):
1311                    eid += random.choice(string.ascii_letters)
1312            # Initial state
1313            self.state[eid] = {
1314                    'experimentID' : \
1315                            [ { 'localname' : eid }, {'fedid': expid } ],
1316                    'experimentStatus': state,
1317                    'experimentAccess': { 'X509' : expcert },
1318                    'owner': fid,
1319                    'log' : [],
1320                }
1321            self.state[expid] = self.state[eid]
1322            if self.state_filename: self.write_state()
1323            self.state_lock.release()
1324
1325        return eid
1326
1327
1328    def allocate_ips_to_topo(self, top):
1329        """
1330        Add an ip4_address attribute to all the hosts in the topology, based on
1331        the shared substrates on which they sit.  An /etc/hosts file is also
1332        created and returned as a list of hostfiles entries.  We also return
1333        the allocator, because we may need to allocate IPs to portals
1334        (specifically DRAGON portals).
1335        """
1336        subs = sorted(top.substrates, 
1337                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1338                reverse=True)
1339        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1340        ifs = { }
1341        hosts = [ ]
1342
1343        for idx, s in enumerate(subs):
1344            net_size = len(s.interfaces)+2
1345
1346            a = ips.allocate(net_size)
1347            if a :
1348                base, num = a
1349                if num < net_size: 
1350                    raise service_error(service_error.internal,
1351                            "Allocator returned wrong number of IPs??")
1352            else:
1353                raise service_error(service_error.req, 
1354                        "Cannot allocate IP addresses")
1355            mask = ips.min_alloc
1356            while mask < net_size:
1357                mask *= 2
1358
1359            netmask = ((2**32-1) ^ (mask-1))
1360
1361            base += 1
1362            for i in s.interfaces:
1363                i.attribute.append(
1364                        topdl.Attribute('ip4_address', 
1365                            "%s" % ip_addr(base)))
1366                i.attribute.append(
1367                        topdl.Attribute('ip4_netmask', 
1368                            "%s" % ip_addr(int(netmask))))
1369
1370                hname = i.element.name[0]
1371                if ifs.has_key(hname):
1372                    hosts.append("%s\t%s-%s %s-%d" % \
1373                            (ip_addr(base), hname, s.name, hname,
1374                                ifs[hname]))
1375                else:
1376                    ifs[hname] = 0
1377                    hosts.append("%s\t%s-%s %s-%d %s" % \
1378                            (ip_addr(base), hname, s.name, hname,
1379                                ifs[hname], hname))
1380
1381                ifs[hname] += 1
1382                base += 1
1383        return hosts, ips
1384
1385    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1386            tbparams, masters):
1387        """
1388        Request access to the various testbeds required for this instantiation
1389        (passed in as testbeds).  User, access_user, expoert_project and master
1390        are used to construct the correct requests.  Per-testbed parameters are
1391        returned in tbparams.
1392        """
1393        for tb in testbeds:
1394            self.get_access(tb, None, tbparams, access_user, masters)
1395            allocated[tb] = 1
1396
1397    def split_topology(self, top, topo, testbeds):
1398        """
1399        Create the sub-topologies that are needed for experiment instantiation.
1400        """
1401        for tb in testbeds:
1402            topo[tb] = top.clone()
1403            # copy in for loop allows deletions from the original
1404            for e in [ e for e in topo[tb].elements]:
1405                etb = e.get_attribute('testbed')
1406                # NB: elements without a testbed attribute won't appear in any
1407                # sub topologies. 
1408                if not etb or etb != tb:
1409                    for i in e.interface:
1410                        for s in i.subs:
1411                            try:
1412                                s.interfaces.remove(i)
1413                            except ValueError:
1414                                raise service_error(service_error.internal,
1415                                        "Can't remove interface??")
1416                    topo[tb].elements.remove(e)
1417            topo[tb].make_indices()
1418
1419    def wrangle_software(self, expid, top, topo, tbparams):
1420        """
1421        Copy software out to the repository directory, allocate permissions and
1422        rewrite the segment topologies to look for the software in local
1423        places.
1424        """
1425
1426        # Copy the rpms and tarfiles to a distribution directory from
1427        # which the federants can retrieve them
1428        linkpath = "%s/software" %  expid
1429        softdir ="%s/%s" % ( self.repodir, linkpath)
1430        softmap = { }
1431        # These are in a list of tuples format (each kit).  This comprehension
1432        # unwraps them into a single list of tuples that initilaizes the set of
1433        # tuples.
1434        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1435                for p, t in l ])
1436        pkgs.update([x.location for e in top.elements \
1437                for x in e.software])
1438        try:
1439            os.makedirs(softdir)
1440        except IOError, e:
1441            raise service_error(
1442                    "Cannot create software directory: %s" % e)
1443        # The actual copying.  Everything's converted into a url for copying.
1444        for pkg in pkgs:
1445            loc = pkg
1446
1447            scheme, host, path = urlparse(loc)[0:3]
1448            dest = os.path.basename(path)
1449            if not scheme:
1450                if not loc.startswith('/'):
1451                    loc = "/%s" % loc
1452                loc = "file://%s" %loc
1453            try:
1454                u = urlopen(loc)
1455            except Exception, e:
1456                raise service_error(service_error.req, 
1457                        "Cannot open %s: %s" % (loc, e))
1458            try:
1459                f = open("%s/%s" % (softdir, dest) , "w")
1460                self.log.debug("Writing %s/%s" % (softdir,dest) )
1461                data = u.read(4096)
1462                while data:
1463                    f.write(data)
1464                    data = u.read(4096)
1465                f.close()
1466                u.close()
1467            except Exception, e:
1468                raise service_error(service_error.internal,
1469                        "Could not copy %s: %s" % (loc, e))
1470            path = re.sub("/tmp", "", linkpath)
1471            # XXX
1472            softmap[pkg] = \
1473                    "%s/%s/%s" %\
1474                    ( self.repo_url, path, dest)
1475
1476            # Allow the individual segments to access the software.
1477            for tb in tbparams.keys():
1478                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1479                        "/%s/%s" % ( path, dest))
1480
1481        # Convert the software locations in the segments into the local
1482        # copies on this host
1483        for soft in [ s for tb in topo.values() \
1484                for e in tb.elements \
1485                    if getattr(e, 'software', False) \
1486                        for s in e.software ]:
1487            if softmap.has_key(soft.location):
1488                soft.location = softmap[soft.location]
1489
1490
1491    def new_experiment(self, req, fid):
1492        """
1493        The external interface to empty initial experiment creation called from
1494        the dispatcher.
1495
1496        Creates a working directory, splits the incoming description using the
1497        splitter script and parses out the avrious subsections using the
1498        lcasses above.  Once each sub-experiment is created, use pooled threads
1499        to instantiate them and start it all up.
1500        """
1501        if not self.auth.check_attribute(fid, 'new'):
1502            raise service_error(service_error.access, "New access denied")
1503
1504        try:
1505            tmpdir = tempfile.mkdtemp(prefix="split-")
1506        except IOError:
1507            raise service_error(service_error.internal, "Cannot create tmp dir")
1508
1509        try:
1510            access_user = self.accessdb[fid]
1511        except KeyError:
1512            raise service_error(service_error.internal,
1513                    "Access map and authorizer out of sync in " + \
1514                            "new_experiment for fedid %s"  % fid)
1515
1516        pid = "dummy"
1517        gid = "dummy"
1518
1519        req = req.get('NewRequestBody', None)
1520        if not req:
1521            raise service_error(service_error.req,
1522                    "Bad request format (no NewRequestBody)")
1523
1524        # Generate an ID for the experiment (slice) and a certificate that the
1525        # allocator can use to prove they own it.  We'll ship it back through
1526        # the encrypted connection.
1527        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1528
1529        #now we're done with the tmpdir, and it should be empty
1530        if self.cleanup:
1531            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1532            os.rmdir(tmpdir)
1533        else:
1534            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1535
1536        eid = self.create_experiment_state(fid, req, expid, expcert, 
1537                state='empty')
1538
1539        # Let users touch the state
1540        self.auth.set_attribute(fid, expid)
1541        self.auth.set_attribute(expid, expid)
1542        # Override fedids can manipulate state as well
1543        for o in self.overrides:
1544            self.auth.set_attribute(o, expid)
1545
1546        rv = {
1547                'experimentID': [
1548                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1549                ],
1550                'experimentStatus': 'empty',
1551                'experimentAccess': { 'X509' : expcert }
1552            }
1553
1554        return rv
1555
1556    def create_experiment(self, req, fid):
1557        """
1558        The external interface to experiment creation called from the
1559        dispatcher.
1560
1561        Creates a working directory, splits the incoming description using the
1562        splitter script and parses out the various subsections using the
1563        classes above.  Once each sub-experiment is created, use pooled threads
1564        to instantiate them and start it all up.
1565        """
1566
1567        req = req.get('CreateRequestBody', None)
1568        if not req:
1569            raise service_error(service_error.req,
1570                    "Bad request format (no CreateRequestBody)")
1571
1572        # Get the experiment access
1573        exp = req.get('experimentID', None)
1574        if exp:
1575            if exp.has_key('fedid'):
1576                key = exp['fedid']
1577                expid = key
1578                eid = None
1579            elif exp.has_key('localname'):
1580                key = exp['localname']
1581                eid = key
1582                expid = None
1583            else:
1584                raise service_error(service_error.req, "Unknown lookup type")
1585        else:
1586            raise service_error(service_error.req, "No request?")
1587
1588        self.check_experiment_access(fid, key)
1589
1590        try:
1591            tmpdir = tempfile.mkdtemp(prefix="split-")
1592            os.mkdir(tmpdir+"/keys")
1593        except IOError:
1594            raise service_error(service_error.internal, "Cannot create tmp dir")
1595
1596        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1597        gw_secretkey_base = "fed.%s" % self.ssh_type
1598        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1599        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1600        tclfile = tmpdir + "/experiment.tcl"
1601        tbparams = { }
1602        try:
1603            access_user = self.accessdb[fid]
1604        except KeyError:
1605            raise service_error(service_error.internal,
1606                    "Access map and authorizer out of sync in " + \
1607                            "create_experiment for fedid %s"  % fid)
1608
1609        pid = "dummy"
1610        gid = "dummy"
1611
1612        # The tcl parser needs to read a file so put the content into that file
1613        descr=req.get('experimentdescription', None)
1614        if descr:
1615            file_content=descr.get('ns2description', None)
1616            if file_content:
1617                try:
1618                    f = open(tclfile, 'w')
1619                    f.write(file_content)
1620                    f.close()
1621                except IOError:
1622                    raise service_error(service_error.internal,
1623                            "Cannot write temp experiment description")
1624            else:
1625                raise service_error(service_error.req, 
1626                        "Only ns2descriptions supported")
1627        else:
1628            raise service_error(service_error.req, "No experiment description")
1629
1630        self.state_lock.acquire()
1631        if self.state.has_key(key):
1632            self.state[key]['experimentStatus'] = "starting"
1633            for e in self.state[key].get('experimentID',[]):
1634                if not expid and e.has_key('fedid'):
1635                    expid = e['fedid']
1636                elif not eid and e.has_key('localname'):
1637                    eid = e['localname']
1638        self.state_lock.release()
1639
1640        if not (eid and expid):
1641            raise service_error(service_error.internal, 
1642                    "Cannot find local experiment info!?")
1643
1644        try: 
1645            # This catches exceptions to clear the placeholder if necessary
1646            try:
1647                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1648            except ValueError:
1649                raise service_error(service_error.server_config, 
1650                        "Bad key type (%s)" % self.ssh_type)
1651
1652            # Copy the service request
1653            tb_services = [ s for s in req.get('service',[]) ]
1654            # Translate to topdl
1655            if self.splitter_url:
1656                self.log.debug("Calling remote topdl translator at %s" % \
1657                        self.splitter_url)
1658                top = self.remote_ns2topdl(self.splitter_url, file_content)
1659            else:
1660                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1661                    str(self.muxmax), '-m', 'dummy']
1662
1663                tclcmd.extend([pid, gid, eid, tclfile])
1664
1665                self.log.debug("running local splitter %s", " ".join(tclcmd))
1666                # This is just fantastic.  As a side effect the parser copies
1667                # tb_compat.tcl into the current directory, so that directory
1668                # must be writable by the fedd user.  Doing this in the
1669                # temporary subdir ensures this is the case.
1670                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1671                        cwd=tmpdir)
1672                split_data = tclparser.stdout
1673
1674                top = topdl.topology_from_xml(file=split_data, top="experiment")
1675
1676            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1677             # Find the testbeds to look up
1678            testbeds = set([ a.value for e in top.elements \
1679                    for a in e.attribute \
1680                        if a.attribute == 'testbed'])
1681
1682            masters = { }           # testbeds exporting services
1683            for s in tb_services:
1684                # If this is a project_export request with the imports field
1685                # blank, fill it in.
1686                if s.get('name', '') == 'project_export':
1687                    if 'import' not in s or len(s['import']) == 0:
1688                        s['import'] = [ tb for tb in testbeds \
1689                                if tb not in s.get('export',[])]
1690                # Add the service to masters
1691                for tb in s.get('export', []):
1692                    if s.get('name', None):
1693                        if tb not in masters:
1694                            masters[tb] = [ ]
1695
1696                        params = { }
1697                        if 'fedAttr' in s:
1698                            for a in s['fedAttr']:
1699                                params[a.get('attribute', '')] = \
1700                                        a.get('value','')
1701
1702                        masters[tb].append(federated_service(name=s['name'],
1703                                exporter=tb, importers=s.get('import',[]),
1704                                params=params))
1705                    else:
1706                        self.log.error('Testbed service does not have name " + \
1707                                "and importers')
1708
1709
1710            allocated = { }         # Testbeds we can access
1711            topo ={ }               # Sub topologies
1712            connInfo = { }          # Connection information
1713            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1714                    tbparams, masters)
1715
1716            self.split_topology(top, topo, testbeds)
1717
1718            # Copy configuration files into the remote file store
1719            # The config urlpath
1720            configpath = "/%s/config" % expid
1721            # The config file system location
1722            configdir ="%s%s" % ( self.repodir, configpath)
1723            try:
1724                os.makedirs(configdir)
1725            except IOError, e:
1726                raise service_error(
1727                        "Cannot create config directory: %s" % e)
1728            try:
1729                f = open("%s/hosts" % configdir, "w")
1730                f.write('\n'.join(hosts))
1731                f.close()
1732            except IOError, e:
1733                raise service_error(service_error.internal, 
1734                        "Cannot write hosts file: %s" % e)
1735            try:
1736                copy_file("%s" % gw_pubkey, "%s/%s" % \
1737                        (configdir, gw_pubkey_base))
1738                copy_file("%s" % gw_secretkey, "%s/%s" % \
1739                        (configdir, gw_secretkey_base))
1740            except IOError, e:
1741                raise service_error(service_error.internal, 
1742                        "Cannot copy keyfiles: %s" % e)
1743
1744            # Allow the individual testbeds to access the configuration files.
1745            for tb in tbparams.keys():
1746                asignee = tbparams[tb]['allocID']['fedid']
1747                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1748                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1749
1750            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1751                    self.muxmax)
1752            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1753                    connInfo, expid)
1754            # Now get access to the dynamic testbeds
1755            for k, t in topo.items():
1756                if not t.get_attribute('dynamic'):
1757                    continue
1758                tb = t.get_attribute('testbed')
1759                if tb: 
1760                    self.get_access(tb, None, tbparams, export_project, 
1761                            access_user, masters)
1762                    tbparams[k] = tbparams[tb]
1763                    del tbparams[tb]
1764                    allocated[k] = 1
1765                    store_keys = t.get_attribute('store_keys')
1766                    # Give the testbed access to keys it exports or imports
1767                    if store_keys:
1768                        for sk in store_keys.split(" "):
1769                            self.auth.set_attribute(\
1770                                    tbparams[k]['allocID']['fedid'], sk)
1771                else:
1772                    raise service_error(service_error.internal, 
1773                            "Dynamic allocation from no testbed!?")
1774
1775            self.wrangle_software(expid, top, topo, tbparams)
1776
1777            vtopo = topdl.topology_to_vtopo(top)
1778            vis = self.genviz(vtopo)
1779
1780            # save federant information
1781            for k in allocated.keys():
1782                tbparams[k]['federant'] = {
1783                        'name': [ { 'localname' : eid} ],
1784                        'allocID' : tbparams[k]['allocID'],
1785                        'uri': tbparams[k]['uri'],
1786                    }
1787
1788            self.state_lock.acquire()
1789            self.state[eid]['vtopo'] = vtopo
1790            self.state[eid]['vis'] = vis
1791            self.state[eid]['experimentdescription'] = \
1792                    { 'topdldescription': top.clone() }
1793            self.state[expid]['federant'] = \
1794                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1795                        if tbparams[tb].has_key('federant') ]
1796            if self.state_filename: 
1797                self.write_state()
1798            self.state_lock.release()
1799        except service_error, e:
1800            # If something goes wrong in the parse (usually an access error)
1801            # clear the placeholder state.  From here on out the code delays
1802            # exceptions.  Failing at this point returns a fault to the remote
1803            # caller.
1804
1805            self.state_lock.acquire()
1806            del self.state[eid]
1807            del self.state[expid]
1808            if self.state_filename: self.write_state()
1809            self.state_lock.release()
1810            raise e
1811
1812
1813        # Start the background swapper and return the starting state.  From
1814        # here on out, the state will stick around a while.
1815
1816        # Let users touch the state
1817        self.auth.set_attribute(fid, expid)
1818        self.auth.set_attribute(expid, expid)
1819        # Override fedids can manipulate state as well
1820        for o in self.overrides:
1821            self.auth.set_attribute(o, expid)
1822
1823        # Create a logger that logs to the experiment's state object as well as
1824        # to the main log file.
1825        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1826        alloc_collector = self.list_log(self.state[eid]['log'])
1827        h = logging.StreamHandler(alloc_collector)
1828        # XXX: there should be a global one of these rather than repeating the
1829        # code.
1830        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1831                    '%d %b %y %H:%M:%S'))
1832        alloc_log.addHandler(h)
1833       
1834        attrs = [ 
1835                {
1836                    'attribute': 'ssh_pubkey', 
1837                    'value': '%s/%s/config/%s' % \
1838                            (self.repo_url, expid, gw_pubkey_base)
1839                },
1840                {
1841                    'attribute': 'ssh_secretkey', 
1842                    'value': '%s/%s/config/%s' % \
1843                            (self.repo_url, expid, gw_secretkey_base)
1844                },
1845                {
1846                    'attribute': 'hosts', 
1847                    'value': '%s/%s/config/hosts' % \
1848                            (self.repo_url, expid)
1849                },
1850                {
1851                    'attribute': 'experiment_name',
1852                    'value': eid,
1853                },
1854            ]
1855
1856        # transit and disconnected testbeds may not have a connInfo entry.
1857        # Fill in the blanks.
1858        for t in allocated.keys():
1859            if not connInfo.has_key(t):
1860                connInfo[t] = { }
1861
1862        # Start a thread to do the resource allocation
1863        t  = Thread(target=self.allocate_resources,
1864                args=(allocated, masters, eid, expid, tbparams, 
1865                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1866                    connInfo),
1867                name=eid)
1868        t.start()
1869
1870        rv = {
1871                'experimentID': [
1872                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1873                ],
1874                'experimentStatus': 'starting',
1875            }
1876
1877        return rv
1878   
1879    def get_experiment_fedid(self, key):
1880        """
1881        find the fedid associated with the localname key in the state database.
1882        """
1883
1884        rv = None
1885        self.state_lock.acquire()
1886        if self.state.has_key(key):
1887            if isinstance(self.state[key], dict):
1888                try:
1889                    kl = [ f['fedid'] for f in \
1890                            self.state[key]['experimentID']\
1891                                if f.has_key('fedid') ]
1892                except KeyError:
1893                    self.state_lock.release()
1894                    raise service_error(service_error.internal, 
1895                            "No fedid for experiment %s when getting "+\
1896                                    "fedid(!?)" % key)
1897                if len(kl) == 1:
1898                    rv = kl[0]
1899                else:
1900                    self.state_lock.release()
1901                    raise service_error(service_error.internal, 
1902                            "multiple fedids for experiment %s when " +\
1903                                    "getting fedid(!?)" % key)
1904            else:
1905                self.state_lock.release()
1906                raise service_error(service_error.internal, 
1907                        "Unexpected state for %s" % key)
1908        self.state_lock.release()
1909        return rv
1910
1911    def check_experiment_access(self, fid, key):
1912        """
1913        Confirm that the fid has access to the experiment.  Though a request
1914        may be made in terms of a local name, the access attribute is always
1915        the experiment's fedid.
1916        """
1917        if not isinstance(key, fedid):
1918            key = self.get_experiment_fedid(key)
1919
1920        if self.auth.check_attribute(fid, key):
1921            return True
1922        else:
1923            raise service_error(service_error.access, "Access Denied")
1924
1925
1926    def get_handler(self, path, fid):
1927        self.log.info("Get handler %s %s" % (path, fid))
1928        if self.auth.check_attribute(fid, path):
1929            return ("%s/%s" % (self.repodir, path), "application/binary")
1930        else:
1931            return (None, None)
1932
1933    def get_vtopo(self, req, fid):
1934        """
1935        Return the stored virtual topology for this experiment
1936        """
1937        rv = None
1938        state = None
1939
1940        req = req.get('VtopoRequestBody', None)
1941        if not req:
1942            raise service_error(service_error.req,
1943                    "Bad request format (no VtopoRequestBody)")
1944        exp = req.get('experiment', None)
1945        if exp:
1946            if exp.has_key('fedid'):
1947                key = exp['fedid']
1948                keytype = "fedid"
1949            elif exp.has_key('localname'):
1950                key = exp['localname']
1951                keytype = "localname"
1952            else:
1953                raise service_error(service_error.req, "Unknown lookup type")
1954        else:
1955            raise service_error(service_error.req, "No request?")
1956
1957        self.check_experiment_access(fid, key)
1958
1959        self.state_lock.acquire()
1960        if self.state.has_key(key):
1961            if self.state[key].has_key('vtopo'):
1962                rv = { 'experiment' : {keytype: key },\
1963                        'vtopo': self.state[key]['vtopo'],\
1964                    }
1965            else:
1966                state = self.state[key]['experimentStatus']
1967        self.state_lock.release()
1968
1969        if rv: return rv
1970        else: 
1971            if state:
1972                raise service_error(service_error.partial, 
1973                        "Not ready: %s" % state)
1974            else:
1975                raise service_error(service_error.req, "No such experiment")
1976
1977    def get_vis(self, req, fid):
1978        """
1979        Return the stored visualization for this experiment
1980        """
1981        rv = None
1982        state = None
1983
1984        req = req.get('VisRequestBody', None)
1985        if not req:
1986            raise service_error(service_error.req,
1987                    "Bad request format (no VisRequestBody)")
1988        exp = req.get('experiment', None)
1989        if exp:
1990            if exp.has_key('fedid'):
1991                key = exp['fedid']
1992                keytype = "fedid"
1993            elif exp.has_key('localname'):
1994                key = exp['localname']
1995                keytype = "localname"
1996            else:
1997                raise service_error(service_error.req, "Unknown lookup type")
1998        else:
1999            raise service_error(service_error.req, "No request?")
2000
2001        self.check_experiment_access(fid, key)
2002
2003        self.state_lock.acquire()
2004        if self.state.has_key(key):
2005            if self.state[key].has_key('vis'):
2006                rv =  { 'experiment' : {keytype: key },\
2007                        'vis': self.state[key]['vis'],\
2008                        }
2009            else:
2010                state = self.state[key]['experimentStatus']
2011        self.state_lock.release()
2012
2013        if rv: return rv
2014        else:
2015            if state:
2016                raise service_error(service_error.partial, 
2017                        "Not ready: %s" % state)
2018            else:
2019                raise service_error(service_error.req, "No such experiment")
2020
2021    def clean_info_response(self, rv):
2022        """
2023        Remove the information in the experiment's state object that is not in
2024        the info response.
2025        """
2026        # Remove the owner info (should always be there, but...)
2027        if rv.has_key('owner'): del rv['owner']
2028
2029        # Convert the log into the allocationLog parameter and remove the
2030        # log entry (with defensive programming)
2031        if rv.has_key('log'):
2032            rv['allocationLog'] = "".join(rv['log'])
2033            del rv['log']
2034        else:
2035            rv['allocationLog'] = ""
2036
2037        if rv['experimentStatus'] != 'active':
2038            if rv.has_key('federant'): del rv['federant']
2039        else:
2040            # remove the allocationID and uri info from each federant
2041            for f in rv.get('federant', []):
2042                if f.has_key('allocID'): del f['allocID']
2043                if f.has_key('uri'): del f['uri']
2044
2045        return rv
2046
2047    def get_info(self, req, fid):
2048        """
2049        Return all the stored info about this experiment
2050        """
2051        rv = None
2052
2053        req = req.get('InfoRequestBody', None)
2054        if not req:
2055            raise service_error(service_error.req,
2056                    "Bad request format (no InfoRequestBody)")
2057        exp = req.get('experiment', None)
2058        if exp:
2059            if exp.has_key('fedid'):
2060                key = exp['fedid']
2061                keytype = "fedid"
2062            elif exp.has_key('localname'):
2063                key = exp['localname']
2064                keytype = "localname"
2065            else:
2066                raise service_error(service_error.req, "Unknown lookup type")
2067        else:
2068            raise service_error(service_error.req, "No request?")
2069
2070        self.check_experiment_access(fid, key)
2071
2072        # The state may be massaged by the service function that called
2073        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2074        # state.
2075        self.state_lock.acquire()
2076        if self.state.has_key(key):
2077            rv = copy.deepcopy(self.state[key])
2078        self.state_lock.release()
2079
2080        if rv:
2081            return self.clean_info_response(rv)
2082        else:
2083            raise service_error(service_error.req, "No such experiment")
2084
2085    def get_multi_info(self, req, fid):
2086        """
2087        Return all the stored info that this fedid can access
2088        """
2089        rv = { 'info': [ ] }
2090
2091        self.state_lock.acquire()
2092        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2093            try:
2094                self.check_experiment_access(fid, key)
2095            except service_error, e:
2096                if e.code == service_error.access:
2097                    continue
2098                else:
2099                    self.state_lock.release()
2100                    raise e
2101
2102            if self.state.has_key(key):
2103                e = copy.deepcopy(self.state[key])
2104                e = self.clean_info_response(e)
2105                rv['info'].append(e)
2106        self.state_lock.release()
2107        return rv
2108
2109    def terminate_experiment(self, req, fid):
2110        """
2111        Swap this experiment out on the federants and delete the shared
2112        information
2113        """
2114        tbparams = { }
2115        req = req.get('TerminateRequestBody', None)
2116        if not req:
2117            raise service_error(service_error.req,
2118                    "Bad request format (no TerminateRequestBody)")
2119        force = req.get('force', False)
2120        exp = req.get('experiment', None)
2121        if exp:
2122            if exp.has_key('fedid'):
2123                key = exp['fedid']
2124                keytype = "fedid"
2125            elif exp.has_key('localname'):
2126                key = exp['localname']
2127                keytype = "localname"
2128            else:
2129                raise service_error(service_error.req, "Unknown lookup type")
2130        else:
2131            raise service_error(service_error.req, "No request?")
2132
2133        self.check_experiment_access(fid, key)
2134
2135        dealloc_list = [ ]
2136
2137
2138        # Create a logger that logs to the dealloc_list as well as to the main
2139        # log file.
2140        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2141        h = logging.StreamHandler(self.list_log(dealloc_list))
2142        # XXX: there should be a global one of these rather than repeating the
2143        # code.
2144        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2145                    '%d %b %y %H:%M:%S'))
2146        dealloc_log.addHandler(h)
2147
2148        self.state_lock.acquire()
2149        fed_exp = self.state.get(key, None)
2150
2151        if fed_exp:
2152            # This branch of the conditional holds the lock to generate a
2153            # consistent temporary tbparams variable to deallocate experiments.
2154            # It releases the lock to do the deallocations and reacquires it to
2155            # remove the experiment state when the termination is complete.
2156
2157            # First make sure that the experiment creation is complete.
2158            status = fed_exp.get('experimentStatus', None)
2159
2160            if status:
2161                if status in ('starting', 'terminating'):
2162                    if not force:
2163                        self.state_lock.release()
2164                        raise service_error(service_error.partial, 
2165                                'Experiment still being created or destroyed')
2166                    else:
2167                        self.log.warning('Experiment in %s state ' % status + \
2168                                'being terminated by force.')
2169            else:
2170                # No status??? trouble
2171                self.state_lock.release()
2172                raise service_error(service_error.internal,
2173                        "Experiment has no status!?")
2174
2175            ids = []
2176            #  experimentID is a list of dicts that are self-describing
2177            #  identifiers.  This finds all the fedids and localnames - the
2178            #  keys of self.state - and puts them into ids.
2179            for id in fed_exp.get('experimentID', []):
2180                if id.has_key('fedid'): ids.append(id['fedid'])
2181                if id.has_key('localname'): ids.append(id['localname'])
2182
2183            # Collect the allocation/segment ids into a dict keyed by the fedid
2184            # of the allocation (or a monotonically increasing integer) that
2185            # contains a tuple of uri, aid (which is a dict...)
2186            for i, fed in enumerate(fed_exp.get('federant', [])):
2187                try:
2188                    uri = fed['uri']
2189                    aid = fed['allocID']
2190                    k = fed['allocID'].get('fedid', i)
2191                except KeyError, e:
2192                    continue
2193                tbparams[k] = (uri, aid)
2194            fed_exp['experimentStatus'] = 'terminating'
2195            if self.state_filename: self.write_state()
2196            self.state_lock.release()
2197
2198            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2199            # then completes, so we can't wait if nothing starts.  So, no
2200            # tbparams, no start.
2201            if len(tbparams) > 0:
2202                thread_pool = self.thread_pool(self.nthreads)
2203                for k in tbparams.keys():
2204                    # Create and start a thread to stop the segment
2205                    thread_pool.wait_for_slot()
2206                    uri, aid = tbparams[k]
2207                    t  = self.pooled_thread(\
2208                            target=self.terminate_segment(log=dealloc_log,
2209                                testbed=uri,
2210                                cert_file=self.cert_file, 
2211                                cert_pwd=self.cert_pwd,
2212                                trusted_certs=self.trusted_certs,
2213                                caller=self.call_TerminateSegment),
2214                            args=(uri, aid), name=k,
2215                            pdata=thread_pool, trace_file=self.trace_file)
2216                    t.start()
2217                # Wait for completions
2218                thread_pool.wait_for_all_done()
2219
2220            # release the allocations (failed experiments have done this
2221            # already, and starting experiments may be in odd states, so we
2222            # ignore errors releasing those allocations
2223            try: 
2224                for k in tbparams.keys():
2225                    # This releases access by uri
2226                    uri, aid = tbparams[k]
2227                    self.release_access(None, aid, uri=uri)
2228            except service_error, e:
2229                if status != 'failed' and not force:
2230                    raise e
2231
2232            # Remove the terminated experiment
2233            self.state_lock.acquire()
2234            for id in ids:
2235                if self.state.has_key(id): del self.state[id]
2236
2237            if self.state_filename: self.write_state()
2238            self.state_lock.release()
2239
2240            # Delete any synch points associated with this experiment.  All
2241            # synch points begin with the fedid of the experiment.
2242            fedid_keys = set(["fedid:%s" % f for f in ids \
2243                    if isinstance(f, fedid)])
2244            for k in self.synch_store.all_keys():
2245                try:
2246                    if len(k) > 45 and k[0:46] in fedid_keys:
2247                        self.synch_store.del_value(k)
2248                except synch_store.BadDeletionError:
2249                    pass
2250            self.write_store()
2251               
2252            return { 
2253                    'experiment': exp , 
2254                    'deallocationLog': "".join(dealloc_list),
2255                    }
2256        else:
2257            # Don't forget to release the lock
2258            self.state_lock.release()
2259            raise service_error(service_error.req, "No saved state")
2260
2261
2262    def GetValue(self, req, fid):
2263        """
2264        Get a value from the synchronized store
2265        """
2266        req = req.get('GetValueRequestBody', None)
2267        if not req:
2268            raise service_error(service_error.req,
2269                    "Bad request format (no GetValueRequestBody)")
2270       
2271        name = req['name']
2272        wait = req['wait']
2273        rv = { 'name': name }
2274
2275        if self.auth.check_attribute(fid, name):
2276            try:
2277                v = self.synch_store.get_value(name, wait)
2278            except synch_store.RevokedKeyError:
2279                # No more synch on this key
2280                raise service_error(service_error.federant, 
2281                        "Synch key %s revoked" % name)
2282            if v is not None:
2283                rv['value'] = v
2284            self.log.debug("[GetValue] got %s from %s" % (v, name))
2285            return rv
2286        else:
2287            raise service_error(service_error.access, "Access Denied")
2288       
2289
2290    def SetValue(self, req, fid):
2291        """
2292        Set a value in the synchronized store
2293        """
2294        req = req.get('SetValueRequestBody', None)
2295        if not req:
2296            raise service_error(service_error.req,
2297                    "Bad request format (no SetValueRequestBody)")
2298       
2299        name = req['name']
2300        v = req['value']
2301
2302        if self.auth.check_attribute(fid, name):
2303            try:
2304                self.synch_store.set_value(name, v)
2305                self.write_store()
2306                self.log.debug("[SetValue] set %s to %s" % (name, v))
2307            except synch_store.CollisionError:
2308                # Translate into a service_error
2309                raise service_error(service_error.req,
2310                        "Value already set: %s" %name)
2311            except synch_store.RevokedKeyError:
2312                # No more synch on this key
2313                raise service_error(service_error.federant, 
2314                        "Synch key %s revoked" % name)
2315            return { 'name': name, 'value': v }
2316        else:
2317            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.