source: fedd/federation/experiment_control.py @ 2fd8f8c

axis_examplecompt_changesinfo-opsversion-3.01version-3.02
Last change on this file since 2fd8f8c was 2fd8f8c, checked in by Ted Faber <faber@…>, 14 years ago

ftopo improvements

  • Property mode set to 100644
File size: 80.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22
23from urlparse import urlparse
24from urllib2 import urlopen
25
26from util import *
27from fedid import fedid, generate_fedid
28from remote_service import xmlrpc_handler, soap_handler, service_caller
29from service_error import service_error
30from synch_store import synch_store
31from experiment_partition import experiment_partition
32
33import topdl
34import list_log
35from ip_allocator import ip_allocator
36from ip_addr import ip_addr
37
38
39class nullHandler(logging.Handler):
40    def emit(self, record): pass
41
42fl = logging.getLogger("fedd.experiment_control")
43fl.addHandler(nullHandler())
44
45
46# Right now, no support for composition.
47class federated_service:
48    def __init__(self, name, exporter=None, importers=[], params={ }, reqs=[]):
49        self.name=name
50        self.exporter=exporter
51        self.importers=importers
52        self.params = params
53        self.reqs = reqs
54
55    def __str__(self):
56        return "name %s export %s import %s params %s reqs %s" % \
57                (self.name, self.exporter, self.importers, self.params,
58                        [ (r['name'], r['visibility']) for r in self.reqs] )
59
60class experiment_control_local:
61    """
62    Control of experiments that this system can directly access.
63
64    Includes experiment creation, termination and information dissemination.
65    Thred safe.
66    """
67
68    class ssh_cmd_timeout(RuntimeError): pass
69   
70    class thread_pool:
71        """
72        A class to keep track of a set of threads all invoked for the same
73        task.  Manages the mutual exclusion of the states.
74        """
75        def __init__(self, nthreads):
76            """
77            Start a pool.
78            """
79            self.changed = Condition()
80            self.started = 0
81            self.terminated = 0
82            self.nthreads = nthreads
83
84        def acquire(self):
85            """
86            Get the pool's lock.
87            """
88            self.changed.acquire()
89
90        def release(self):
91            """
92            Release the pool's lock.
93            """
94            self.changed.release()
95
96        def wait(self, timeout = None):
97            """
98            Wait for a pool thread to start or stop.
99            """
100            self.changed.wait(timeout)
101
102        def start(self):
103            """
104            Called by a pool thread to report starting.
105            """
106            self.changed.acquire()
107            self.started += 1
108            self.changed.notifyAll()
109            self.changed.release()
110
111        def terminate(self):
112            """
113            Called by a pool thread to report finishing.
114            """
115            self.changed.acquire()
116            self.terminated += 1
117            self.changed.notifyAll()
118            self.changed.release()
119
120        def clear(self):
121            """
122            Clear all pool data.
123            """
124            self.changed.acquire()
125            self.started = 0
126            self.terminated =0
127            self.changed.notifyAll()
128            self.changed.release()
129
130        def wait_for_slot(self):
131            """
132            Wait until we have a free slot to start another pooled thread
133            """
134            self.acquire()
135            while self.started - self.terminated >= self.nthreads:
136                self.wait()
137            self.release()
138
139        def wait_for_all_done(self, timeout=None):
140            """
141            Wait until all active threads finish (and at least one has
142            started).  If a timeout is given, return after waiting that long
143            for termination.  If all threads are done (and one has started in
144            the since the last clear()) return True, otherwise False.
145            """
146            if timeout:
147                deadline = time.time() + timeout
148            self.acquire()
149            while self.started == 0 or self.started > self.terminated:
150                self.wait(timeout)
151                if timeout:
152                    if time.time() > deadline:
153                        break
154                    timeout = deadline - time.time()
155            self.release()
156            return not (self.started == 0 or self.started > self.terminated)
157
158    class pooled_thread(Thread):
159        """
160        One of a set of threads dedicated to a specific task.  Uses the
161        thread_pool class above for coordination.
162        """
163        def __init__(self, group=None, target=None, name=None, args=(), 
164                kwargs={}, pdata=None, trace_file=None):
165            Thread.__init__(self, group, target, name, args, kwargs)
166            self.rv = None          # Return value of the ops in this thread
167            self.exception = None   # Exception that terminated this thread
168            self.target=target      # Target function to run on start()
169            self.args = args        # Args to pass to target
170            self.kwargs = kwargs    # Additional kw args
171            self.pdata = pdata      # thread_pool for this class
172            # Logger for this thread
173            self.log = logging.getLogger("fedd.experiment_control")
174       
175        def run(self):
176            """
177            Emulate Thread.run, except add pool data manipulation and error
178            logging.
179            """
180            if self.pdata:
181                self.pdata.start()
182
183            if self.target:
184                try:
185                    self.rv = self.target(*self.args, **self.kwargs)
186                except service_error, s:
187                    self.exception = s
188                    self.log.error("Thread exception: %s %s" % \
189                            (s.code_string(), s.desc))
190                except:
191                    self.exception = sys.exc_info()[1]
192                    self.log.error(("Unexpected thread exception: %s" +\
193                            "Trace %s") % (self.exception,\
194                                traceback.format_exc()))
195            if self.pdata:
196                self.pdata.terminate()
197
198    call_RequestAccess = service_caller('RequestAccess')
199    call_ReleaseAccess = service_caller('ReleaseAccess')
200    call_StartSegment = service_caller('StartSegment')
201    call_TerminateSegment = service_caller('TerminateSegment')
202    call_Ns2Topdl = service_caller('Ns2Topdl')
203
204    def __init__(self, config=None, auth=None):
205        """
206        Intialize the various attributes, most from the config object
207        """
208
209        def parse_tarfile_list(tf):
210            """
211            Parse a tarfile list from the configuration.  This is a set of
212            paths and tarfiles separated by spaces.
213            """
214            rv = [ ]
215            if tf is not None:
216                tl = tf.split()
217                while len(tl) > 1:
218                    p, t = tl[0:2]
219                    del tl[0:2]
220                    rv.append((p, t))
221            return rv
222
223        self.thread_with_rv = experiment_control_local.pooled_thread
224        self.thread_pool = experiment_control_local.thread_pool
225        self.list_log = list_log.list_log
226
227        self.cert_file = config.get("experiment_control", "cert_file")
228        if self.cert_file:
229            self.cert_pwd = config.get("experiment_control", "cert_pwd")
230        else:
231            self.cert_file = config.get("globals", "cert_file")
232            self.cert_pwd = config.get("globals", "cert_pwd")
233
234        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
235                or config.get("globals", "trusted_certs")
236
237        self.repodir = config.get("experiment_control", "repodir")
238        self.repo_url = config.get("experiment_control", "repo_url", 
239                "https://users.isi.deterlab.net:23235");
240
241        self.exp_stem = "fed-stem"
242        self.log = logging.getLogger("fedd.experiment_control")
243        set_log_level(config, "experiment_control", self.log)
244        self.muxmax = 2
245        self.nthreads = 10
246        self.randomize_experiments = False
247
248        self.splitter = None
249        self.ssh_keygen = "/usr/bin/ssh-keygen"
250        self.ssh_identity_file = None
251
252
253        self.debug = config.getboolean("experiment_control", "create_debug")
254        self.cleanup = not config.getboolean("experiment_control", 
255                "leave_tmpfiles")
256        self.state_filename = config.get("experiment_control", 
257                "experiment_state")
258        self.store_filename = config.get("experiment_control", 
259                "synch_store")
260        self.store_url = config.get("experiment_control", "store_url")
261        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
262        self.fedkit = parse_tarfile_list(\
263                config.get("experiment_control", "fedkit"))
264        self.gatewaykit = parse_tarfile_list(\
265                config.get("experiment_control", "gatewaykit"))
266        accessdb_file = config.get("experiment_control", "accessdb")
267
268        self.ssh_pubkey_file = config.get("experiment_control", 
269                "ssh_pubkey_file")
270        self.ssh_privkey_file = config.get("experiment_control",
271                "ssh_privkey_file")
272        dt = config.get("experiment_control", "direct_transit")
273        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
274        else: self.direct_transit = [ ]
275        # NB for internal master/slave ops, not experiment setup
276        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
277
278        self.overrides = set([])
279        ovr = config.get('experiment_control', 'overrides')
280        if ovr:
281            for o in ovr.split(","):
282                o = o.strip()
283                if o.startswith('fedid:'): o = o[len('fedid:'):]
284                self.overrides.add(fedid(hexstr=o))
285
286        self.state = { }
287        self.state_lock = Lock()
288        self.tclsh = "/usr/local/bin/otclsh"
289        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
290                config.get("experiment_control", "tcl_splitter",
291                        "/usr/testbed/lib/ns2ir/parse.tcl")
292        mapdb_file = config.get("experiment_control", "mapdb")
293        self.trace_file = sys.stderr
294
295        self.def_expstart = \
296                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
297                "/tmp/federate";
298        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
299                "FEDDIR/hosts";
300        self.def_gwstart = \
301                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
302                "/tmp/bridge.log";
303        self.def_mgwstart = \
304                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
305                "/tmp/bridge.log";
306        self.def_gwimage = "FBSD61-TUNNEL2";
307        self.def_gwtype = "pc";
308        self.local_access = { }
309
310        if auth:
311            self.auth = auth
312        else:
313            self.log.error(\
314                    "[access]: No authorizer initialized, creating local one.")
315            auth = authorizer()
316
317
318        if self.ssh_pubkey_file:
319            try:
320                f = open(self.ssh_pubkey_file, 'r')
321                self.ssh_pubkey = f.read()
322                f.close()
323            except EnvironmentError:
324                raise service_error(service_error.internal,
325                        "Cannot read sshpubkey")
326        else:
327            raise service_error(service_error.internal, 
328                    "No SSH public key file?")
329
330        if not self.ssh_privkey_file:
331            raise service_error(service_error.internal, 
332                    "No SSH public key file?")
333
334
335        if mapdb_file:
336            self.read_mapdb(mapdb_file)
337        else:
338            self.log.warn("[experiment_control] No testbed map, using defaults")
339            self.tbmap = { 
340                    'deter':'https://users.isi.deterlab.net:23235',
341                    'emulab':'https://users.isi.deterlab.net:23236',
342                    'ucb':'https://users.isi.deterlab.net:23237',
343                    }
344
345        if accessdb_file:
346                self.read_accessdb(accessdb_file)
347        else:
348            raise service_error(service_error.internal,
349                    "No accessdb specified in config")
350
351        # Grab saved state.  OK to do this w/o locking because it's read only
352        # and only one thread should be in existence that can see self.state at
353        # this point.
354        if self.state_filename:
355            self.read_state()
356
357        if self.store_filename:
358            self.read_store()
359        else:
360            self.log.warning("No saved synch store")
361            self.synch_store = synch_store
362
363        # Dispatch tables
364        self.soap_services = {\
365                'New': soap_handler('New', self.new_experiment),
366                'Create': soap_handler('Create', self.create_experiment),
367                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
368                'Vis': soap_handler('Vis', self.get_vis),
369                'Info': soap_handler('Info', self.get_info),
370                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
371                'Terminate': soap_handler('Terminate', 
372                    self.terminate_experiment),
373                'GetValue': soap_handler('GetValue', self.GetValue),
374                'SetValue': soap_handler('SetValue', self.SetValue),
375        }
376
377        self.xmlrpc_services = {\
378                'New': xmlrpc_handler('New', self.new_experiment),
379                'Create': xmlrpc_handler('Create', self.create_experiment),
380                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
381                'Vis': xmlrpc_handler('Vis', self.get_vis),
382                'Info': xmlrpc_handler('Info', self.get_info),
383                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
384                'Terminate': xmlrpc_handler('Terminate',
385                    self.terminate_experiment),
386                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
387                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
388        }
389
390    # Call while holding self.state_lock
391    def write_state(self):
392        """
393        Write a new copy of experiment state after copying the existing state
394        to a backup.
395
396        State format is a simple pickling of the state dictionary.
397        """
398        if os.access(self.state_filename, os.W_OK):
399            copy_file(self.state_filename, \
400                    "%s.bak" % self.state_filename)
401        try:
402            f = open(self.state_filename, 'w')
403            pickle.dump(self.state, f)
404        except EnvironmentError, e:
405            self.log.error("Can't write file %s: %s" % \
406                    (self.state_filename, e))
407        except pickle.PicklingError, e:
408            self.log.error("Pickling problem: %s" % e)
409        except TypeError, e:
410            self.log.error("Pickling problem (TypeError): %s" % e)
411
412    @staticmethod
413    def get_alloc_ids(state):
414        """
415        Pull the fedids of the identifiers of each allocation from the
416        state.  Again, a dict dive that's best isolated.
417
418        Used by read_store and read state
419        """
420
421        return [ f['allocID']['fedid'] 
422                for f in state.get('federant',[]) \
423                    if f.has_key('allocID') and \
424                        f['allocID'].has_key('fedid')]
425
426    # Call while holding self.state_lock
427    def read_state(self):
428        """
429        Read a new copy of experiment state.  Old state is overwritten.
430
431        State format is a simple pickling of the state dictionary.
432        """
433       
434        def get_experiment_id(state):
435            """
436            Pull the fedid experimentID out of the saved state.  This is kind
437            of a gross walk through the dict.
438            """
439
440            if state.has_key('experimentID'):
441                for e in state['experimentID']:
442                    if e.has_key('fedid'):
443                        return e['fedid']
444                else:
445                    return None
446            else:
447                return None
448
449        try:
450            f = open(self.state_filename, "r")
451            self.state = pickle.load(f)
452            self.log.debug("[read_state]: Read state from %s" % \
453                    self.state_filename)
454        except EnvironmentError, e:
455            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
456                    % (self.state_filename, e))
457        except pickle.UnpicklingError, e:
458            self.log.warning(("[read_state]: No saved state: " + \
459                    "Unpickling failed: %s") % e)
460       
461        for s in self.state.values():
462            try:
463
464                eid = get_experiment_id(s)
465                if eid : 
466                    # Give the owner rights to the experiment
467                    self.auth.set_attribute(s['owner'], eid)
468                    # And holders of the eid as well
469                    self.auth.set_attribute(eid, eid)
470                    # allow overrides to control experiments as well
471                    for o in self.overrides:
472                        self.auth.set_attribute(o, eid)
473                    # Set permissions to allow reading of the software repo, if
474                    # any, as well.
475                    for a in self.get_alloc_ids(s):
476                        self.auth.set_attribute(a, 'repo/%s' % eid)
477                else:
478                    raise KeyError("No experiment id")
479            except KeyError, e:
480                self.log.warning("[read_state]: State ownership or identity " +\
481                        "misformatted in %s: %s" % (self.state_filename, e))
482
483
484    def read_accessdb(self, accessdb_file):
485        """
486        Read the mapping from fedids that can create experiments to their name
487        in the 3-level access namespace.  All will be asserted from this
488        testbed and can include the local username and porject that will be
489        asserted on their behalf by this fedd.  Each fedid is also added to the
490        authorization system with the "create" attribute.
491        """
492        self.accessdb = {}
493        # These are the regexps for parsing the db
494        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
495        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
496                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
497        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
498                "\s*->\s*(" + name_expr + ")\s*$")
499        lineno = 0
500
501        # Parse the mappings and store in self.authdb, a dict of
502        # fedid -> (proj, user)
503        try:
504            f = open(accessdb_file, "r")
505            for line in f:
506                lineno += 1
507                line = line.strip()
508                if len(line) == 0 or line.startswith('#'):
509                    continue
510                m = project_line.match(line)
511                if m:
512                    fid = fedid(hexstr=m.group(1))
513                    project, user = m.group(2,3)
514                    if not self.accessdb.has_key(fid):
515                        self.accessdb[fid] = []
516                    self.accessdb[fid].append((project, user))
517                    continue
518
519                m = user_line.match(line)
520                if m:
521                    fid = fedid(hexstr=m.group(1))
522                    project = None
523                    user = m.group(2)
524                    if not self.accessdb.has_key(fid):
525                        self.accessdb[fid] = []
526                    self.accessdb[fid].append((project, user))
527                    continue
528                self.log.warn("[experiment_control] Error parsing access " +\
529                        "db %s at line %d" %  (accessdb_file, lineno))
530        except EnvironmentError:
531            raise service_error(service_error.internal,
532                    ("Error opening/reading %s as experiment " +\
533                            "control accessdb") %  accessdb_file)
534        f.close()
535
536        # Initialize the authorization attributes
537        for fid in self.accessdb.keys():
538            self.auth.set_attribute(fid, 'create')
539            self.auth.set_attribute(fid, 'new')
540
541    def read_mapdb(self, file):
542        """
543        Read a simple colon separated list of mappings for the
544        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
545        """
546
547        self.tbmap = { }
548        lineno =0
549        try:
550            f = open(file, "r")
551            for line in f:
552                lineno += 1
553                line = line.strip()
554                if line.startswith('#') or len(line) == 0:
555                    continue
556                try:
557                    label, url = line.split(':', 1)
558                    self.tbmap[label] = url
559                except ValueError, e:
560                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
561                            "map db: %s %s" % (lineno, line, e))
562        except EnvironmentError, e:
563            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
564                    "open %s: %s" % (file, e))
565        f.close()
566
567    def read_store(self):
568        try:
569            self.synch_store = synch_store()
570            self.synch_store.load(self.store_filename)
571            self.log.debug("[read_store]: Read store from %s" % \
572                    self.store_filename)
573        except EnvironmentError, e:
574            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
575                    % (self.state_filename, e))
576            self.synch_store = synch_store()
577
578        # Set the initial permissions on data in the store.  XXX: This ad hoc
579        # authorization attribute initialization is getting out of hand.
580        for k in self.synch_store.all_keys():
581            try:
582                if k.startswith('fedid:'):
583                    fid = fedid(hexstr=k[6:46])
584                    if self.state.has_key(fid):
585                        for a in self.get_alloc_ids(self.state[fid]):
586                            self.auth.set_attribute(a, k)
587            except ValueError, e:
588                self.log.warn("Cannot deduce permissions for %s" % k)
589
590
591    def write_store(self):
592        """
593        Write a new copy of synch_store after writing current state
594        to a backup.  We use the internal synch_store pickle method to avoid
595        incinsistent data.
596
597        State format is a simple pickling of the store.
598        """
599        if os.access(self.store_filename, os.W_OK):
600            copy_file(self.store_filename, \
601                    "%s.bak" % self.store_filename)
602        try:
603            self.synch_store.save(self.store_filename)
604        except EnvironmentError, e:
605            self.log.error("Can't write file %s: %s" % \
606                    (self.store_filename, e))
607        except TypeError, e:
608            self.log.error("Pickling problem (TypeError): %s" % e)
609
610       
611    def generate_ssh_keys(self, dest, type="rsa" ):
612        """
613        Generate a set of keys for the gateways to use to talk.
614
615        Keys are of type type and are stored in the required dest file.
616        """
617        valid_types = ("rsa", "dsa")
618        t = type.lower();
619        if t not in valid_types: raise ValueError
620        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
621
622        try:
623            trace = open("/dev/null", "w")
624        except EnvironmentError:
625            raise service_error(service_error.internal,
626                    "Cannot open /dev/null??");
627
628        # May raise CalledProcessError
629        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
630        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
631        if rv != 0:
632            raise service_error(service_error.internal, 
633                    "Cannot generate nonce ssh keys.  %s return code %d" \
634                            % (self.ssh_keygen, rv))
635
636    def gentopo(self, str):
637        """
638        Generate the topology dtat structure from the splitter's XML
639        representation of it.
640
641        The topology XML looks like:
642            <experiment>
643                <nodes>
644                    <node><vname></vname><ips>ip1:ip2</ips></node>
645                </nodes>
646                <lans>
647                    <lan>
648                        <vname></vname><vnode></vnode><ip></ip>
649                        <bandwidth></bandwidth><member>node:port</member>
650                    </lan>
651                </lans>
652        """
653        class topo_parse:
654            """
655            Parse the topology XML and create the dats structure.
656            """
657            def __init__(self):
658                # Typing of the subelements for data conversion
659                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
660                self.int_subelements = ( 'bandwidth',)
661                self.float_subelements = ( 'delay',)
662                # The final data structure
663                self.nodes = [ ]
664                self.lans =  [ ]
665                self.topo = { \
666                        'node': self.nodes,\
667                        'lan' : self.lans,\
668                    }
669                self.element = { }  # Current element being created
670                self.chars = ""     # Last text seen
671
672            def end_element(self, name):
673                # After each sub element the contents is added to the current
674                # element or to the appropriate list.
675                if name == 'node':
676                    self.nodes.append(self.element)
677                    self.element = { }
678                elif name == 'lan':
679                    self.lans.append(self.element)
680                    self.element = { }
681                elif name in self.str_subelements:
682                    self.element[name] = self.chars
683                    self.chars = ""
684                elif name in self.int_subelements:
685                    self.element[name] = int(self.chars)
686                    self.chars = ""
687                elif name in self.float_subelements:
688                    self.element[name] = float(self.chars)
689                    self.chars = ""
690
691            def found_chars(self, data):
692                self.chars += data.rstrip()
693
694
695        tp = topo_parse();
696        parser = xml.parsers.expat.ParserCreate()
697        parser.EndElementHandler = tp.end_element
698        parser.CharacterDataHandler = tp.found_chars
699
700        parser.Parse(str)
701
702        return tp.topo
703       
704
705    def genviz(self, topo):
706        """
707        Generate the visualization the virtual topology
708        """
709
710        neato = "/usr/local/bin/neato"
711        # These are used to parse neato output and to create the visualization
712        # file.
713        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
714        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
715                "%s</type></node>"
716
717        try:
718            # Node names
719            nodes = [ n['vname'] for n in topo['node'] ]
720            topo_lans = topo['lan']
721        except KeyError, e:
722            raise service_error(service_error.internal, "Bad topology: %s" %e)
723
724        lans = { }
725        links = { }
726
727        # Walk through the virtual topology, organizing the connections into
728        # 2-node connections (links) and more-than-2-node connections (lans).
729        # When a lan is created, it's added to the list of nodes (there's a
730        # node in the visualization for the lan).
731        for l in topo_lans:
732            if links.has_key(l['vname']):
733                if len(links[l['vname']]) < 2:
734                    links[l['vname']].append(l['vnode'])
735                else:
736                    nodes.append(l['vname'])
737                    lans[l['vname']] = links[l['vname']]
738                    del links[l['vname']]
739                    lans[l['vname']].append(l['vnode'])
740            elif lans.has_key(l['vname']):
741                lans[l['vname']].append(l['vnode'])
742            else:
743                links[l['vname']] = [ l['vnode'] ]
744
745
746        # Open up a temporary file for dot to turn into a visualization
747        try:
748            df, dotname = tempfile.mkstemp()
749            dotfile = os.fdopen(df, 'w')
750        except EnvironmentError:
751            raise service_error(service_error.internal,
752                    "Failed to open file in genviz")
753
754        try:
755            dnull = open('/dev/null', 'w')
756        except EnvironmentError:
757            service_error(service_error.internal,
758                    "Failed to open /dev/null in genviz")
759
760        # Generate a dot/neato input file from the links, nodes and lans
761        try:
762            print >>dotfile, "graph G {"
763            for n in nodes:
764                print >>dotfile, '\t"%s"' % n
765            for l in links.keys():
766                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
767            for l in lans.keys():
768                for n in lans[l]:
769                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
770            print >>dotfile, "}"
771            dotfile.close()
772        except TypeError:
773            raise service_error(service_error.internal,
774                    "Single endpoint link in vtopo")
775        except EnvironmentError:
776            raise service_error(service_error.internal, "Cannot write dot file")
777
778        # Use dot to create a visualization
779        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
780                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
781                close_fds=True)
782        dnull.close()
783
784        # Translate dot to vis format
785        vis_nodes = [ ]
786        vis = { 'node': vis_nodes }
787        for line in dot.stdout:
788            m = vis_re.match(line)
789            if m:
790                vn = m.group(1)
791                vis_node = {'name': vn, \
792                        'x': float(m.group(2)),\
793                        'y' : float(m.group(3)),\
794                    }
795                if vn in links.keys() or vn in lans.keys():
796                    vis_node['type'] = 'lan'
797                else:
798                    vis_node['type'] = 'node'
799                vis_nodes.append(vis_node)
800        rv = dot.wait()
801
802        os.remove(dotname)
803        if rv == 0 : return vis
804        else: return None
805
806    def get_access(self, tb, nodes, tbparam, access_user, masters):
807        """
808        Get access to testbed through fedd and set the parameters for that tb
809        """
810        def get_export_project(svcs):
811            """
812            Look through for the list of federated_service for this testbed
813            objects for a project_export service, and extract the project
814            parameter.
815            """
816
817            pe = [s for s in svcs if s.name=='project_export']
818            if len(pe) == 1:
819                return pe[0].params.get('project', None)
820            elif len(pe) == 0:
821                return None
822            else:
823                raise service_error(service_error.req,
824                        "More than one project export is not supported")
825
826        uri = self.tbmap.get(testbed_base(tb), None)
827        if not uri:
828            raise service_error(service_error.server_config, 
829                    "Unknown testbed: %s" % tb)
830
831        export_svcs = masters.get(tb,[])
832        import_svcs = [ s for m in masters.values() \
833                for s in m \
834                    if tb in s.importers ]
835
836        export_project = get_export_project(export_svcs)
837
838        # Tweak search order so that if there are entries in access_user that
839        # have a project matching the export project, we try them first
840        if export_project: 
841            access_sequence = [ (p, u) for p, u in access_user \
842                    if p == export_project] 
843            access_sequence.extend([(p, u) for p, u in access_user \
844                    if p != export_project]) 
845        else: 
846            access_sequence = access_user
847
848        for p, u in access_sequence: 
849            self.log.debug(("[get_access] Attempting access from (%s, %s) " + \
850                    "to %s") %  ((p or "None"), u, uri))
851
852            if p:
853                # Request with user and project specified
854                req = {\
855                        'destinationTestbed' : { 'uri' : uri },
856                        'credential': [ "project: %s" % p, "user: %s"  % u],
857                        'allocID' : { 'localname': 'test' },
858                    }
859            else:
860                # Request with only user specified
861                req = {\
862                        'destinationTestbed' : { 'uri' : uri },
863                        'credential': [ 'user: %s' % u ],
864                        'allocID' : { 'localname': 'test' },
865                    }
866
867            # Make the service request from the services we're importing and
868            # exporting.  Keep track of the export request ids so we can
869            # collect the resulting info from the access response.
870            e_keys = { }
871            if import_svcs or export_svcs:
872                req['service'] = [ ]
873
874                for i, s in enumerate(import_svcs):
875                    idx = 'import%d' % i
876                    sr = {'id': idx, 'name': s.name, 'visibility': 'import' }
877                    if s.params:
878                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
879                                for k, v in s.params.items()]
880                    req['service'].append(sr)
881
882                for i, s in enumerate(export_svcs):
883                    idx = 'export%d' % i
884                    e_keys[idx] = s
885                    sr = {'id': idx, 'name': s.name, 'visibility': 'export' }
886                    if s.params:
887                        sr['fedAttr'] = [ { 'attribute': k, 'value': v } 
888                                for k, v in s.params.items()]
889                    req['service'].append(sr)
890
891            # node resources if any
892            if nodes != None and len(nodes) > 0:
893                rnodes = [ ]
894                for n in nodes:
895                    rn = { }
896                    image, hw, count = n.split(":")
897                    if image: rn['image'] = [ image ]
898                    if hw: rn['hardware'] = [ hw ]
899                    if count and int(count) >0 : rn['count'] = int(count)
900                    rnodes.append(rn)
901                req['resources']= { }
902                req['resources']['node'] = rnodes
903
904            try:
905                if self.local_access.has_key(uri):
906                    # Local access call
907                    req = { 'RequestAccessRequestBody' : req }
908                    r = self.local_access[uri].RequestAccess(req, 
909                            fedid(file=self.cert_file))
910                    r = { 'RequestAccessResponseBody' : r }
911                else:
912                    r = self.call_RequestAccess(uri, req, 
913                            self.cert_file, self.cert_pwd, self.trusted_certs)
914            except service_error, e:
915                if e.code == service_error.access:
916                    self.log.debug("[get_access] Access denied")
917                    r = None
918                    continue
919                else:
920                    raise e
921
922            if r.has_key('RequestAccessResponseBody'):
923                # Through to here we have a valid response, not a fault.
924                # Access denied is a fault, so something better or worse than
925                # access denied has happened.
926                r = r['RequestAccessResponseBody']
927                self.log.debug("[get_access] Access granted")
928                break
929            else:
930                raise service_error(service_error.protocol,
931                        "Bad proxy response")
932       
933        if not r:
934            raise service_error(service_error.access, 
935                    "Access denied by %s (%s)" % (tb, uri))
936
937        tbparam[tb] = { 
938                "allocID" : r['allocID'],
939                "uri": uri,
940                }
941
942        # Collect the responses corresponding to the services this testbed
943        # exports.  These will be the service requests that we will include in
944        # the start segment requests (with appropriate visibility values) to
945        # import and export the segments.
946        for s in r.get('service', []):
947            id = s.get('id', None)
948            if id and id in e_keys:
949                e_keys[id].reqs.append(s)
950
951        # Add attributes to parameter space.  We don't allow attributes to
952        # overlay any parameters already installed.
953        for a in r.get('fedAttr', []):
954            try:
955                if a['attribute'] and \
956                        isinstance(a['attribute'], basestring)\
957                        and not tbparam[tb].has_key(a['attribute'].lower()):
958                    tbparam[tb][a['attribute'].lower()] = a['value']
959            except KeyError:
960                self.log.error("Bad attribute in response: %s" % a)
961
962    def release_access(self, tb, aid, uri=None):
963        """
964        Release access to testbed through fedd
965        """
966
967        if not uri:
968            uri = self.tbmap.get(tb, None)
969        if not uri:
970            raise service_error(service_error.server_config, 
971                    "Unknown testbed: %s" % tb)
972
973        if self.local_access.has_key(uri):
974            resp = self.local_access[uri].ReleaseAccess(\
975                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
976                    fedid(file=self.cert_file))
977            resp = { 'ReleaseAccessResponseBody': resp } 
978        else:
979            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
980                    self.cert_file, self.cert_pwd, self.trusted_certs)
981
982        # better error coding
983
984    def remote_ns2topdl(self, uri, desc):
985
986        req = {
987                'description' : { 'ns2description': desc },
988            }
989
990        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
991                self.trusted_certs)
992
993        if r.has_key('Ns2TopdlResponseBody'):
994            r = r['Ns2TopdlResponseBody']
995            ed = r.get('experimentdescription', None)
996            if ed.has_key('topdldescription'):
997                return topdl.Topology(**ed['topdldescription'])
998            else:
999                raise service_error(service_error.protocol, 
1000                        "Bad splitter response (no output)")
1001        else:
1002            raise service_error(service_error.protocol, "Bad splitter response")
1003
1004    class start_segment:
1005        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1006                cert_pwd=None, trusted_certs=None, caller=None,
1007                log_collector=None):
1008            self.log = log
1009            self.debug = debug
1010            self.cert_file = cert_file
1011            self.cert_pwd = cert_pwd
1012            self.trusted_certs = None
1013            self.caller = caller
1014            self.testbed = testbed
1015            self.log_collector = log_collector
1016            self.response = None
1017            self.node = { }
1018
1019        def make_map(self, resp):
1020            for e in resp.get('embedding', []):
1021                if 'toponame' in e and 'physname' in e:
1022                    self.node[e['toponame']] = e['physname'][0]
1023
1024        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
1025            req = {
1026                    'allocID': { 'fedid' : aid }, 
1027                    'segmentdescription': { 
1028                        'topdldescription': topo.to_dict(),
1029                    },
1030                }
1031
1032            if connInfo:
1033                req['connection'] = connInfo
1034
1035            import_svcs = [ s for m in masters.values() \
1036                    for s in m if self.testbed in s.importers]
1037
1038            if import_svcs or self.testbed in masters:
1039                req['service'] = []
1040
1041            for s in import_svcs:
1042                for r in s.reqs:
1043                    sr = copy.deepcopy(r)
1044                    sr['visibility'] = 'import';
1045                    req['service'].append(sr)
1046
1047            for s in masters.get(self.testbed, []):
1048                for r in s.reqs:
1049                    sr = copy.deepcopy(r)
1050                    sr['visibility'] = 'export';
1051                    req['service'].append(sr)
1052
1053            if attrs:
1054                req['fedAttr'] = attrs
1055
1056            try:
1057                self.log.debug("Calling StartSegment at %s " % uri)
1058                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1059                        self.trusted_certs)
1060                if r.has_key('StartSegmentResponseBody'):
1061                    lval = r['StartSegmentResponseBody'].get('allocationLog',
1062                            None)
1063                    if lval and self.log_collector:
1064                        for line in  lval.splitlines(True):
1065                            self.log_collector.write(line)
1066                    self.make_map(r['StartSegmentResponseBody'])
1067                    self.response = r
1068                else:
1069                    raise service_error(service_error.internal, 
1070                            "Bad response!?: %s" %r)
1071                return True
1072            except service_error, e:
1073                self.log.error("Start segment failed on %s: %s" % \
1074                        (self.testbed, e))
1075                return False
1076
1077
1078
1079    class terminate_segment:
1080        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
1081                cert_pwd=None, trusted_certs=None, caller=None):
1082            self.log = log
1083            self.debug = debug
1084            self.cert_file = cert_file
1085            self.cert_pwd = cert_pwd
1086            self.trusted_certs = None
1087            self.caller = caller
1088            self.testbed = testbed
1089
1090        def __call__(self, uri, aid ):
1091            req = {
1092                    'allocID': aid , 
1093                }
1094            try:
1095                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
1096                        self.trusted_certs)
1097                return True
1098            except service_error, e:
1099                self.log.error("Terminate segment failed on %s: %s" % \
1100                        (self.testbed, e))
1101                return False
1102   
1103
1104    def allocate_resources(self, allocated, masters, eid, expid, 
1105            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
1106            attrs=None, connInfo={}):
1107
1108        started = { }           # Testbeds where a sub-experiment started
1109                                # successfully
1110
1111        # XXX
1112        fail_soft = False
1113
1114        log = alloc_log or self.log
1115
1116        thread_pool = self.thread_pool(self.nthreads)
1117        threads = [ ]
1118        starters = [ ]
1119
1120        for tb in allocated.keys():
1121            # Create and start a thread to start the segment, and save it
1122            # to get the return value later
1123            tb_attrs = copy.copy(attrs)
1124            thread_pool.wait_for_slot()
1125            uri = tbparams[tb].get('uri', \
1126                    self.tbmap.get(testbed_base(tb), None))
1127            base, suffix = split_testbed(tb)
1128            if suffix:
1129                tb_attrs.append({'attribute': 'experiment_name', 
1130                    'value': "%s-%s" % (eid, suffix)})
1131            else:
1132                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
1133            if not uri:
1134                raise service_error(service_error.internal, 
1135                        "Unknown testbed %s !?" % tb)
1136
1137            if tbparams[tb].has_key('allocID') and \
1138                    tbparams[tb]['allocID'].has_key('fedid'):
1139                aid = tbparams[tb]['allocID']['fedid']
1140            else:
1141                raise service_error(service_error.internal, 
1142                        "No alloc id for testbed %s !?" % tb)
1143
1144            s = self.start_segment(log=log, debug=self.debug,
1145                    testbed=tb, cert_file=self.cert_file,
1146                    cert_pwd=self.cert_pwd, trusted_certs=self.trusted_certs,
1147                    caller=self.call_StartSegment,
1148                    log_collector=log_collector)
1149            starters.append(s)
1150            t  = self.pooled_thread(\
1151                    target=s, name=tb,
1152                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
1153                    pdata=thread_pool, trace_file=self.trace_file)
1154            threads.append(t)
1155            t.start()
1156
1157        # Wait until all finish (keep pinging the log, though)
1158        mins = 0
1159        revoked = False
1160        while not thread_pool.wait_for_all_done(60.0):
1161            mins += 1
1162            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
1163                    % mins)
1164            if not revoked and \
1165                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1166                # a testbed has failed.  Revoke this experiment's
1167                # synchronizarion values so that sub experiments will not
1168                # deadlock waiting for synchronization that will never happen
1169                self.log.info("A subexperiment has failed to swap in, " + \
1170                        "revoking synch keys")
1171                var_key = "fedid:%s" % expid
1172                for k in self.synch_store.all_keys():
1173                    if len(k) > 45 and k[0:46] == var_key:
1174                        self.synch_store.revoke_key(k)
1175                revoked = True
1176
1177        failed = [ t.getName() for t in threads if not t.rv ]
1178        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1179
1180        # If one failed clean up, unless fail_soft is set
1181        if failed:
1182            if not fail_soft:
1183                thread_pool.clear()
1184                for tb in succeeded:
1185                    # Create and start a thread to stop the segment
1186                    thread_pool.wait_for_slot()
1187                    uri = tbparams[tb]['uri']
1188                    t  = self.pooled_thread(\
1189                            target=self.terminate_segment(log=log,
1190                                testbed=tb,
1191                                cert_file=self.cert_file, 
1192                                cert_pwd=self.cert_pwd,
1193                                trusted_certs=self.trusted_certs,
1194                                caller=self.call_TerminateSegment),
1195                            args=(uri, tbparams[tb]['federant']['allocID']),
1196                            name=tb,
1197                            pdata=thread_pool, trace_file=self.trace_file)
1198                    t.start()
1199                # Wait until all finish (if any are being stopped)
1200                if succeeded:
1201                    thread_pool.wait_for_all_done()
1202
1203                # release the allocations
1204                for tb in tbparams.keys():
1205                    self.release_access(tb, tbparams[tb]['allocID'],
1206                            tbparams[tb].get('uri', None))
1207                # Remove the placeholder
1208                self.state_lock.acquire()
1209                self.state[eid]['experimentStatus'] = 'failed'
1210                if self.state_filename: self.write_state()
1211                self.state_lock.release()
1212
1213                log.error("Swap in failed on %s" % ",".join(failed))
1214                return
1215        else:
1216            # Walk through the successes and gather the virtual to physical
1217            # mapping.
1218            embedding = [ ]
1219            for s in starters:
1220                for k, v in s.node.items():
1221                    embedding.append({
1222                        'toponame': k, 
1223                        'physname': [ v],
1224                        'testbed': s.testbed
1225                        })
1226            log.info("[start_segment]: Experiment %s active" % eid)
1227
1228
1229        # Walk up tmpdir, deleting as we go
1230        if self.cleanup:
1231            log.debug("[start_experiment]: removing %s" % tmpdir)
1232            for path, dirs, files in os.walk(tmpdir, topdown=False):
1233                for f in files:
1234                    os.remove(os.path.join(path, f))
1235                for d in dirs:
1236                    os.rmdir(os.path.join(path, d))
1237            os.rmdir(tmpdir)
1238        else:
1239            log.debug("[start_experiment]: not removing %s" % tmpdir)
1240
1241        # Insert the experiment into our state and update the disk copy.
1242        self.state_lock.acquire()
1243        self.state[expid]['experimentStatus'] = 'active'
1244        self.state[eid] = self.state[expid]
1245        self.state[eid]['experimentdescription']['topdldescription'] = \
1246                top.to_dict()
1247        self.state[eid]['embedding'] = embedding
1248        if self.state_filename: self.write_state()
1249        self.state_lock.release()
1250        return
1251
1252
1253    def add_kit(self, e, kit):
1254        """
1255        Add a Software object created from the list of (install, location)
1256        tuples passed as kit  to the software attribute of an object e.  We
1257        do this enough to break out the code, but it's kind of a hack to
1258        avoid changing the old tuple rep.
1259        """
1260
1261        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1262
1263        if isinstance(e.software, list): e.software.extend(s)
1264        else: e.software = s
1265
1266
1267    def create_experiment_state(self, fid, req, expid, expcert,
1268            state='starting'):
1269        """
1270        Create the initial entry in the experiment's state.  The expid and
1271        expcert are the experiment's fedid and certifacte that represents that
1272        ID, which are installed in the experiment state.  If the request
1273        includes a suggested local name that is used if possible.  If the local
1274        name is already taken by an experiment owned by this user that has
1275        failed, it is overwritten.  Otherwise new letters are added until a
1276        valid localname is found.  The generated local name is returned.
1277        """
1278
1279        if req.has_key('experimentID') and \
1280                req['experimentID'].has_key('localname'):
1281            overwrite = False
1282            eid = req['experimentID']['localname']
1283            # If there's an old failed experiment here with the same local name
1284            # and accessible by this user, we'll overwrite it, otherwise we'll
1285            # fall through and do the collision avoidance.
1286            old_expid = self.get_experiment_fedid(eid)
1287            if old_expid and self.check_experiment_access(fid, old_expid):
1288                self.state_lock.acquire()
1289                status = self.state[eid].get('experimentStatus', None)
1290                if status and status == 'failed':
1291                    # remove the old access attribute
1292                    self.auth.unset_attribute(fid, old_expid)
1293                    overwrite = True
1294                    del self.state[eid]
1295                    del self.state[old_expid]
1296                self.state_lock.release()
1297            self.state_lock.acquire()
1298            while (self.state.has_key(eid) and not overwrite):
1299                eid += random.choice(string.ascii_letters)
1300            # Initial state
1301            self.state[eid] = {
1302                    'experimentID' : \
1303                            [ { 'localname' : eid }, {'fedid': expid } ],
1304                    'experimentStatus': state,
1305                    'experimentAccess': { 'X509' : expcert },
1306                    'owner': fid,
1307                    'log' : [],
1308                }
1309            self.state[expid] = self.state[eid]
1310            if self.state_filename: self.write_state()
1311            self.state_lock.release()
1312        else:
1313            eid = self.exp_stem
1314            for i in range(0,5):
1315                eid += random.choice(string.ascii_letters)
1316            self.state_lock.acquire()
1317            while (self.state.has_key(eid)):
1318                eid = self.exp_stem
1319                for i in range(0,5):
1320                    eid += random.choice(string.ascii_letters)
1321            # Initial state
1322            self.state[eid] = {
1323                    'experimentID' : \
1324                            [ { 'localname' : eid }, {'fedid': expid } ],
1325                    'experimentStatus': state,
1326                    'experimentAccess': { 'X509' : expcert },
1327                    'owner': fid,
1328                    'log' : [],
1329                }
1330            self.state[expid] = self.state[eid]
1331            if self.state_filename: self.write_state()
1332            self.state_lock.release()
1333
1334        return eid
1335
1336
1337    def allocate_ips_to_topo(self, top):
1338        """
1339        Add an ip4_address attribute to all the hosts in the topology, based on
1340        the shared substrates on which they sit.  An /etc/hosts file is also
1341        created and returned as a list of hostfiles entries.  We also return
1342        the allocator, because we may need to allocate IPs to portals
1343        (specifically DRAGON portals).
1344        """
1345        subs = sorted(top.substrates, 
1346                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1347                reverse=True)
1348        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1349        ifs = { }
1350        hosts = [ ]
1351
1352        for idx, s in enumerate(subs):
1353            net_size = len(s.interfaces)+2
1354
1355            a = ips.allocate(net_size)
1356            if a :
1357                base, num = a
1358                if num < net_size: 
1359                    raise service_error(service_error.internal,
1360                            "Allocator returned wrong number of IPs??")
1361            else:
1362                raise service_error(service_error.req, 
1363                        "Cannot allocate IP addresses")
1364            mask = ips.min_alloc
1365            while mask < net_size:
1366                mask *= 2
1367
1368            netmask = ((2**32-1) ^ (mask-1))
1369
1370            base += 1
1371            for i in s.interfaces:
1372                i.attribute.append(
1373                        topdl.Attribute('ip4_address', 
1374                            "%s" % ip_addr(base)))
1375                i.attribute.append(
1376                        topdl.Attribute('ip4_netmask', 
1377                            "%s" % ip_addr(int(netmask))))
1378
1379                hname = i.element.name
1380                if ifs.has_key(hname):
1381                    hosts.append("%s\t%s-%s %s-%d" % \
1382                            (ip_addr(base), hname, s.name, hname,
1383                                ifs[hname]))
1384                else:
1385                    ifs[hname] = 0
1386                    hosts.append("%s\t%s-%s %s-%d %s" % \
1387                            (ip_addr(base), hname, s.name, hname,
1388                                ifs[hname], hname))
1389
1390                ifs[hname] += 1
1391                base += 1
1392        return hosts, ips
1393
1394    def get_access_to_testbeds(self, testbeds, access_user, allocated, 
1395            tbparams, masters):
1396        """
1397        Request access to the various testbeds required for this instantiation
1398        (passed in as testbeds).  User, access_user, expoert_project and master
1399        are used to construct the correct requests.  Per-testbed parameters are
1400        returned in tbparams.
1401        """
1402        for tb in testbeds:
1403            self.get_access(tb, None, tbparams, access_user, masters)
1404            allocated[tb] = 1
1405
1406    def split_topology(self, top, topo, testbeds):
1407        """
1408        Create the sub-topologies that are needed for experiment instantiation.
1409        """
1410        for tb in testbeds:
1411            topo[tb] = top.clone()
1412            # copy in for loop allows deletions from the original
1413            for e in [ e for e in topo[tb].elements]:
1414                etb = e.get_attribute('testbed')
1415                # NB: elements without a testbed attribute won't appear in any
1416                # sub topologies. 
1417                if not etb or etb != tb:
1418                    for i in e.interface:
1419                        for s in i.subs:
1420                            try:
1421                                s.interfaces.remove(i)
1422                            except ValueError:
1423                                raise service_error(service_error.internal,
1424                                        "Can't remove interface??")
1425                    topo[tb].elements.remove(e)
1426            topo[tb].make_indices()
1427
1428    def wrangle_software(self, expid, top, topo, tbparams):
1429        """
1430        Copy software out to the repository directory, allocate permissions and
1431        rewrite the segment topologies to look for the software in local
1432        places.
1433        """
1434
1435        # Copy the rpms and tarfiles to a distribution directory from
1436        # which the federants can retrieve them
1437        linkpath = "%s/software" %  expid
1438        softdir ="%s/%s" % ( self.repodir, linkpath)
1439        softmap = { }
1440        # These are in a list of tuples format (each kit).  This comprehension
1441        # unwraps them into a single list of tuples that initilaizes the set of
1442        # tuples.
1443        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1444                for p, t in l ])
1445        pkgs.update([x.location for e in top.elements \
1446                for x in e.software])
1447        try:
1448            os.makedirs(softdir)
1449        except EnvironmentError, e:
1450            raise service_error(
1451                    "Cannot create software directory: %s" % e)
1452        # The actual copying.  Everything's converted into a url for copying.
1453        for pkg in pkgs:
1454            loc = pkg
1455
1456            scheme, host, path = urlparse(loc)[0:3]
1457            dest = os.path.basename(path)
1458            if not scheme:
1459                if not loc.startswith('/'):
1460                    loc = "/%s" % loc
1461                loc = "file://%s" %loc
1462            try:
1463                u = urlopen(loc)
1464            except Exception, e:
1465                raise service_error(service_error.req, 
1466                        "Cannot open %s: %s" % (loc, e))
1467            try:
1468                f = open("%s/%s" % (softdir, dest) , "w")
1469                self.log.debug("Writing %s/%s" % (softdir,dest) )
1470                data = u.read(4096)
1471                while data:
1472                    f.write(data)
1473                    data = u.read(4096)
1474                f.close()
1475                u.close()
1476            except Exception, e:
1477                raise service_error(service_error.internal,
1478                        "Could not copy %s: %s" % (loc, e))
1479            path = re.sub("/tmp", "", linkpath)
1480            # XXX
1481            softmap[pkg] = \
1482                    "%s/%s/%s" %\
1483                    ( self.repo_url, path, dest)
1484
1485            # Allow the individual segments to access the software.
1486            for tb in tbparams.keys():
1487                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1488                        "/%s/%s" % ( path, dest))
1489
1490        # Convert the software locations in the segments into the local
1491        # copies on this host
1492        for soft in [ s for tb in topo.values() \
1493                for e in tb.elements \
1494                    if getattr(e, 'software', False) \
1495                        for s in e.software ]:
1496            if softmap.has_key(soft.location):
1497                soft.location = softmap[soft.location]
1498
1499
1500    def new_experiment(self, req, fid):
1501        """
1502        The external interface to empty initial experiment creation called from
1503        the dispatcher.
1504
1505        Creates a working directory, splits the incoming description using the
1506        splitter script and parses out the avrious subsections using the
1507        lcasses above.  Once each sub-experiment is created, use pooled threads
1508        to instantiate them and start it all up.
1509        """
1510        if not self.auth.check_attribute(fid, 'new'):
1511            raise service_error(service_error.access, "New access denied")
1512
1513        try:
1514            tmpdir = tempfile.mkdtemp(prefix="split-")
1515        except EnvironmentError:
1516            raise service_error(service_error.internal, "Cannot create tmp dir")
1517
1518        try:
1519            access_user = self.accessdb[fid]
1520        except KeyError:
1521            raise service_error(service_error.internal,
1522                    "Access map and authorizer out of sync in " + \
1523                            "new_experiment for fedid %s"  % fid)
1524
1525        pid = "dummy"
1526        gid = "dummy"
1527
1528        req = req.get('NewRequestBody', None)
1529        if not req:
1530            raise service_error(service_error.req,
1531                    "Bad request format (no NewRequestBody)")
1532
1533        # Generate an ID for the experiment (slice) and a certificate that the
1534        # allocator can use to prove they own it.  We'll ship it back through
1535        # the encrypted connection.
1536        (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1537
1538        #now we're done with the tmpdir, and it should be empty
1539        if self.cleanup:
1540            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1541            os.rmdir(tmpdir)
1542        else:
1543            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1544
1545        eid = self.create_experiment_state(fid, req, expid, expcert, 
1546                state='empty')
1547
1548        # Let users touch the state
1549        self.auth.set_attribute(fid, expid)
1550        self.auth.set_attribute(expid, expid)
1551        # Override fedids can manipulate state as well
1552        for o in self.overrides:
1553            self.auth.set_attribute(o, expid)
1554
1555        rv = {
1556                'experimentID': [
1557                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1558                ],
1559                'experimentStatus': 'empty',
1560                'experimentAccess': { 'X509' : expcert }
1561            }
1562
1563        return rv
1564
1565    def create_experiment(self, req, fid):
1566        """
1567        The external interface to experiment creation called from the
1568        dispatcher.
1569
1570        Creates a working directory, splits the incoming description using the
1571        splitter script and parses out the various subsections using the
1572        classes above.  Once each sub-experiment is created, use pooled threads
1573        to instantiate them and start it all up.
1574        """
1575
1576        req = req.get('CreateRequestBody', None)
1577        if not req:
1578            raise service_error(service_error.req,
1579                    "Bad request format (no CreateRequestBody)")
1580
1581        # Get the experiment access
1582        exp = req.get('experimentID', None)
1583        if exp:
1584            if exp.has_key('fedid'):
1585                key = exp['fedid']
1586                expid = key
1587                eid = None
1588            elif exp.has_key('localname'):
1589                key = exp['localname']
1590                eid = key
1591                expid = None
1592            else:
1593                raise service_error(service_error.req, "Unknown lookup type")
1594        else:
1595            raise service_error(service_error.req, "No request?")
1596
1597        self.check_experiment_access(fid, key)
1598
1599        try:
1600            tmpdir = tempfile.mkdtemp(prefix="split-")
1601            os.mkdir(tmpdir+"/keys")
1602        except EnvironmentError:
1603            raise service_error(service_error.internal, "Cannot create tmp dir")
1604
1605        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1606        gw_secretkey_base = "fed.%s" % self.ssh_type
1607        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1608        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1609        tclfile = tmpdir + "/experiment.tcl"
1610        tbparams = { }
1611        try:
1612            access_user = self.accessdb[fid]
1613        except KeyError:
1614            raise service_error(service_error.internal,
1615                    "Access map and authorizer out of sync in " + \
1616                            "create_experiment for fedid %s"  % fid)
1617
1618        pid = "dummy"
1619        gid = "dummy"
1620
1621        # The tcl parser needs to read a file so put the content into that file
1622        descr=req.get('experimentdescription', None)
1623        if descr:
1624            file_content=descr.get('ns2description', None)
1625            if file_content:
1626                try:
1627                    f = open(tclfile, 'w')
1628                    f.write(file_content)
1629                    f.close()
1630                except EnvironmentError:
1631                    raise service_error(service_error.internal,
1632                            "Cannot write temp experiment description")
1633            else:
1634                raise service_error(service_error.req, 
1635                        "Only ns2descriptions supported")
1636        else:
1637            raise service_error(service_error.req, "No experiment description")
1638
1639        self.state_lock.acquire()
1640        if self.state.has_key(key):
1641            self.state[key]['experimentStatus'] = "starting"
1642            for e in self.state[key].get('experimentID',[]):
1643                if not expid and e.has_key('fedid'):
1644                    expid = e['fedid']
1645                elif not eid and e.has_key('localname'):
1646                    eid = e['localname']
1647        self.state_lock.release()
1648
1649        if not (eid and expid):
1650            raise service_error(service_error.internal, 
1651                    "Cannot find local experiment info!?")
1652
1653        try: 
1654            # This catches exceptions to clear the placeholder if necessary
1655            try:
1656                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1657            except ValueError:
1658                raise service_error(service_error.server_config, 
1659                        "Bad key type (%s)" % self.ssh_type)
1660
1661            # Copy the service request
1662            tb_services = [ s for s in req.get('service',[]) ]
1663            # Translate to topdl
1664            if self.splitter_url:
1665                self.log.debug("Calling remote topdl translator at %s" % \
1666                        self.splitter_url)
1667                top = self.remote_ns2topdl(self.splitter_url, file_content)
1668            else:
1669                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1670                    str(self.muxmax), '-m', 'dummy']
1671
1672                tclcmd.extend([pid, gid, eid, tclfile])
1673
1674                self.log.debug("running local splitter %s", " ".join(tclcmd))
1675                # This is just fantastic.  As a side effect the parser copies
1676                # tb_compat.tcl into the current directory, so that directory
1677                # must be writable by the fedd user.  Doing this in the
1678                # temporary subdir ensures this is the case.
1679                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1680                        cwd=tmpdir)
1681                split_data = tclparser.stdout
1682
1683                top = topdl.topology_from_xml(file=split_data, top="experiment")
1684
1685            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1686            # Find the testbeds to look up
1687            testbeds = set([ a.value for e in top.elements \
1688                    for a in e.attribute \
1689                        if a.attribute == 'testbed'])
1690
1691            masters = { }           # testbeds exporting services
1692            for s in tb_services:
1693                # If this is a project_export request with the importall field
1694                # set, fill it out.
1695
1696                if s.get('importall', False):
1697                    s['import'] = [ tb for tb in testbeds \
1698                            if tb not in s.get('export',[])]
1699                    del s['importall']
1700
1701                # Add the service to masters
1702                for tb in s.get('export', []):
1703                    if s.get('name', None):
1704                        if tb not in masters:
1705                            masters[tb] = [ ]
1706
1707                        params = { }
1708                        if 'fedAttr' in s:
1709                            for a in s['fedAttr']:
1710                                params[a.get('attribute', '')] = \
1711                                        a.get('value','')
1712
1713                        masters[tb].append(federated_service(name=s['name'],
1714                                exporter=tb, importers=s.get('import',[]),
1715                                params=params, reqs=[]))
1716                    else:
1717                        self.log.error('Testbed service does not have name " + \
1718                                "and importers')
1719
1720
1721            allocated = { }         # Testbeds we can access
1722            topo ={ }               # Sub topologies
1723            connInfo = { }          # Connection information
1724            self.get_access_to_testbeds(testbeds, access_user, allocated, 
1725                    tbparams, masters)
1726
1727            self.split_topology(top, topo, testbeds)
1728
1729            # Copy configuration files into the remote file store
1730            # The config urlpath
1731            configpath = "/%s/config" % expid
1732            # The config file system location
1733            configdir ="%s%s" % ( self.repodir, configpath)
1734            try:
1735                os.makedirs(configdir)
1736            except EnvironmentError, e:
1737                raise service_error(service_error.internal,
1738                        "Cannot create config directory: %s" % e)
1739            try:
1740                f = open("%s/hosts" % configdir, "w")
1741                f.write('\n'.join(hosts))
1742                f.close()
1743            except EnvironmentError, e:
1744                raise service_error(service_error.internal, 
1745                        "Cannot write hosts file: %s" % e)
1746            try:
1747                copy_file("%s" % gw_pubkey, "%s/%s" % \
1748                        (configdir, gw_pubkey_base))
1749                copy_file("%s" % gw_secretkey, "%s/%s" % \
1750                        (configdir, gw_secretkey_base))
1751            except EnvironmentError, e:
1752                raise service_error(service_error.internal, 
1753                        "Cannot copy keyfiles: %s" % e)
1754
1755            # Allow the individual testbeds to access the configuration files.
1756            for tb in tbparams.keys():
1757                asignee = tbparams[tb]['allocID']['fedid']
1758                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1759                    self.auth.set_attribute(asignee, "%s/%s" % (configpath, f))
1760
1761            part = experiment_partition(self.auth, self.store_url, self.tbmap,
1762                    self.muxmax, self.direct_transit)
1763            part.add_portals(top, topo, eid, masters, tbparams, ip_allocator,
1764                    connInfo, expid)
1765            # Now get access to the dynamic testbeds (those added above)
1766            for tb in [ t for t in topo if t not in allocated]:
1767                self.get_access(tb, None, tbparams, access_user, masters)
1768                allocated[tb] = 1
1769                store_keys = topo[tb].get_attribute('store_keys')
1770                # Give the testbed access to keys it exports or imports
1771                if store_keys:
1772                    for sk in store_keys.split(" "):
1773                        self.auth.set_attribute(\
1774                                tbparams[tb]['allocID']['fedid'], sk)
1775
1776            self.wrangle_software(expid, top, topo, tbparams)
1777
1778            vtopo = topdl.topology_to_vtopo(top)
1779            vis = self.genviz(vtopo)
1780
1781            # save federant information
1782            for k in allocated.keys():
1783                tbparams[k]['federant'] = {
1784                        'name': [ { 'localname' : eid} ],
1785                        'allocID' : tbparams[k]['allocID'],
1786                        'uri': tbparams[k]['uri'],
1787                    }
1788
1789            self.state_lock.acquire()
1790            self.state[eid]['vtopo'] = vtopo
1791            self.state[eid]['vis'] = vis
1792            self.state[eid]['experimentdescription'] = \
1793                    { 'topdldescription': top.to_dict() }
1794            self.state[eid]['federant'] = \
1795                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1796                        if tbparams[tb].has_key('federant') ]
1797            if self.state_filename: 
1798                self.write_state()
1799            self.state_lock.release()
1800        except service_error, e:
1801            # If something goes wrong in the parse (usually an access error)
1802            # clear the placeholder state.  From here on out the code delays
1803            # exceptions.  Failing at this point returns a fault to the remote
1804            # caller.
1805
1806            self.state_lock.acquire()
1807            del self.state[eid]
1808            del self.state[expid]
1809            if self.state_filename: self.write_state()
1810            self.state_lock.release()
1811            raise e
1812
1813
1814        # Start the background swapper and return the starting state.  From
1815        # here on out, the state will stick around a while.
1816
1817        # Let users touch the state
1818        self.auth.set_attribute(fid, expid)
1819        self.auth.set_attribute(expid, expid)
1820        # Override fedids can manipulate state as well
1821        for o in self.overrides:
1822            self.auth.set_attribute(o, expid)
1823
1824        # Create a logger that logs to the experiment's state object as well as
1825        # to the main log file.
1826        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1827        alloc_collector = self.list_log(self.state[eid]['log'])
1828        h = logging.StreamHandler(alloc_collector)
1829        # XXX: there should be a global one of these rather than repeating the
1830        # code.
1831        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1832                    '%d %b %y %H:%M:%S'))
1833        alloc_log.addHandler(h)
1834       
1835        attrs = [ 
1836                {
1837                    'attribute': 'ssh_pubkey', 
1838                    'value': '%s/%s/config/%s' % \
1839                            (self.repo_url, expid, gw_pubkey_base)
1840                },
1841                {
1842                    'attribute': 'ssh_secretkey', 
1843                    'value': '%s/%s/config/%s' % \
1844                            (self.repo_url, expid, gw_secretkey_base)
1845                },
1846                {
1847                    'attribute': 'hosts', 
1848                    'value': '%s/%s/config/hosts' % \
1849                            (self.repo_url, expid)
1850                },
1851            ]
1852
1853        # transit and disconnected testbeds may not have a connInfo entry.
1854        # Fill in the blanks.
1855        for t in allocated.keys():
1856            if not connInfo.has_key(t):
1857                connInfo[t] = { }
1858
1859        # Start a thread to do the resource allocation
1860        t  = Thread(target=self.allocate_resources,
1861                args=(allocated, masters, eid, expid, tbparams, 
1862                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1863                    connInfo),
1864                name=eid)
1865        t.start()
1866
1867        rv = {
1868                'experimentID': [
1869                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1870                ],
1871                'experimentStatus': 'starting',
1872            }
1873
1874        return rv
1875   
1876    def get_experiment_fedid(self, key):
1877        """
1878        find the fedid associated with the localname key in the state database.
1879        """
1880
1881        rv = None
1882        self.state_lock.acquire()
1883        if self.state.has_key(key):
1884            if isinstance(self.state[key], dict):
1885                try:
1886                    kl = [ f['fedid'] for f in \
1887                            self.state[key]['experimentID']\
1888                                if f.has_key('fedid') ]
1889                except KeyError:
1890                    self.state_lock.release()
1891                    raise service_error(service_error.internal, 
1892                            "No fedid for experiment %s when getting "+\
1893                                    "fedid(!?)" % key)
1894                if len(kl) == 1:
1895                    rv = kl[0]
1896                else:
1897                    self.state_lock.release()
1898                    raise service_error(service_error.internal, 
1899                            "multiple fedids for experiment %s when " +\
1900                                    "getting fedid(!?)" % key)
1901            else:
1902                self.state_lock.release()
1903                raise service_error(service_error.internal, 
1904                        "Unexpected state for %s" % key)
1905        self.state_lock.release()
1906        return rv
1907
1908    def check_experiment_access(self, fid, key):
1909        """
1910        Confirm that the fid has access to the experiment.  Though a request
1911        may be made in terms of a local name, the access attribute is always
1912        the experiment's fedid.
1913        """
1914        if not isinstance(key, fedid):
1915            key = self.get_experiment_fedid(key)
1916
1917        if self.auth.check_attribute(fid, key):
1918            return True
1919        else:
1920            raise service_error(service_error.access, "Access Denied")
1921
1922
1923    def get_handler(self, path, fid):
1924        self.log.info("Get handler %s %s" % (path, fid))
1925        if self.auth.check_attribute(fid, path):
1926            return ("%s/%s" % (self.repodir, path), "application/binary")
1927        else:
1928            return (None, None)
1929
1930    def get_vtopo(self, req, fid):
1931        """
1932        Return the stored virtual topology for this experiment
1933        """
1934        rv = None
1935        state = None
1936
1937        req = req.get('VtopoRequestBody', None)
1938        if not req:
1939            raise service_error(service_error.req,
1940                    "Bad request format (no VtopoRequestBody)")
1941        exp = req.get('experiment', None)
1942        if exp:
1943            if exp.has_key('fedid'):
1944                key = exp['fedid']
1945                keytype = "fedid"
1946            elif exp.has_key('localname'):
1947                key = exp['localname']
1948                keytype = "localname"
1949            else:
1950                raise service_error(service_error.req, "Unknown lookup type")
1951        else:
1952            raise service_error(service_error.req, "No request?")
1953
1954        self.check_experiment_access(fid, key)
1955
1956        self.state_lock.acquire()
1957        if self.state.has_key(key):
1958            if self.state[key].has_key('vtopo'):
1959                rv = { 'experiment' : {keytype: key },\
1960                        'vtopo': self.state[key]['vtopo'],\
1961                    }
1962            else:
1963                state = self.state[key]['experimentStatus']
1964        self.state_lock.release()
1965
1966        if rv: return rv
1967        else: 
1968            if state:
1969                raise service_error(service_error.partial, 
1970                        "Not ready: %s" % state)
1971            else:
1972                raise service_error(service_error.req, "No such experiment")
1973
1974    def get_vis(self, req, fid):
1975        """
1976        Return the stored visualization for this experiment
1977        """
1978        rv = None
1979        state = None
1980
1981        req = req.get('VisRequestBody', None)
1982        if not req:
1983            raise service_error(service_error.req,
1984                    "Bad request format (no VisRequestBody)")
1985        exp = req.get('experiment', None)
1986        if exp:
1987            if exp.has_key('fedid'):
1988                key = exp['fedid']
1989                keytype = "fedid"
1990            elif exp.has_key('localname'):
1991                key = exp['localname']
1992                keytype = "localname"
1993            else:
1994                raise service_error(service_error.req, "Unknown lookup type")
1995        else:
1996            raise service_error(service_error.req, "No request?")
1997
1998        self.check_experiment_access(fid, key)
1999
2000        self.state_lock.acquire()
2001        if self.state.has_key(key):
2002            if self.state[key].has_key('vis'):
2003                rv =  { 'experiment' : {keytype: key },\
2004                        'vis': self.state[key]['vis'],\
2005                        }
2006            else:
2007                state = self.state[key]['experimentStatus']
2008        self.state_lock.release()
2009
2010        if rv: return rv
2011        else:
2012            if state:
2013                raise service_error(service_error.partial, 
2014                        "Not ready: %s" % state)
2015            else:
2016                raise service_error(service_error.req, "No such experiment")
2017
2018    def clean_info_response(self, rv):
2019        """
2020        Remove the information in the experiment's state object that is not in
2021        the info response.
2022        """
2023        # Remove the owner info (should always be there, but...)
2024        if rv.has_key('owner'): del rv['owner']
2025
2026        # Convert the log into the allocationLog parameter and remove the
2027        # log entry (with defensive programming)
2028        if rv.has_key('log'):
2029            rv['allocationLog'] = "".join(rv['log'])
2030            del rv['log']
2031        else:
2032            rv['allocationLog'] = ""
2033
2034        if rv['experimentStatus'] != 'active':
2035            if rv.has_key('federant'): del rv['federant']
2036        else:
2037            # remove the allocationID and uri info from each federant
2038            for f in rv.get('federant', []):
2039                if f.has_key('allocID'): del f['allocID']
2040                if f.has_key('uri'): del f['uri']
2041
2042        return rv
2043
2044    def get_info(self, req, fid):
2045        """
2046        Return all the stored info about this experiment
2047        """
2048        rv = None
2049
2050        req = req.get('InfoRequestBody', None)
2051        if not req:
2052            raise service_error(service_error.req,
2053                    "Bad request format (no InfoRequestBody)")
2054        exp = req.get('experiment', None)
2055        if exp:
2056            if exp.has_key('fedid'):
2057                key = exp['fedid']
2058                keytype = "fedid"
2059            elif exp.has_key('localname'):
2060                key = exp['localname']
2061                keytype = "localname"
2062            else:
2063                raise service_error(service_error.req, "Unknown lookup type")
2064        else:
2065            raise service_error(service_error.req, "No request?")
2066
2067        self.check_experiment_access(fid, key)
2068
2069        # The state may be massaged by the service function that called
2070        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2071        # state.
2072        self.state_lock.acquire()
2073        if self.state.has_key(key):
2074            rv = copy.deepcopy(self.state[key])
2075        self.state_lock.release()
2076
2077        if rv:
2078            return self.clean_info_response(rv)
2079        else:
2080            raise service_error(service_error.req, "No such experiment")
2081
2082    def get_multi_info(self, req, fid):
2083        """
2084        Return all the stored info that this fedid can access
2085        """
2086        rv = { 'info': [ ] }
2087
2088        self.state_lock.acquire()
2089        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2090            try:
2091                self.check_experiment_access(fid, key)
2092            except service_error, e:
2093                if e.code == service_error.access:
2094                    continue
2095                else:
2096                    self.state_lock.release()
2097                    raise e
2098
2099            if self.state.has_key(key):
2100                e = copy.deepcopy(self.state[key])
2101                e = self.clean_info_response(e)
2102                rv['info'].append(e)
2103        self.state_lock.release()
2104        return rv
2105
2106    def terminate_experiment(self, req, fid):
2107        """
2108        Swap this experiment out on the federants and delete the shared
2109        information
2110        """
2111        tbparams = { }
2112        req = req.get('TerminateRequestBody', None)
2113        if not req:
2114            raise service_error(service_error.req,
2115                    "Bad request format (no TerminateRequestBody)")
2116        force = req.get('force', False)
2117        exp = req.get('experiment', None)
2118        if exp:
2119            if exp.has_key('fedid'):
2120                key = exp['fedid']
2121                keytype = "fedid"
2122            elif exp.has_key('localname'):
2123                key = exp['localname']
2124                keytype = "localname"
2125            else:
2126                raise service_error(service_error.req, "Unknown lookup type")
2127        else:
2128            raise service_error(service_error.req, "No request?")
2129
2130        self.check_experiment_access(fid, key)
2131
2132        dealloc_list = [ ]
2133
2134
2135        # Create a logger that logs to the dealloc_list as well as to the main
2136        # log file.
2137        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2138        h = logging.StreamHandler(self.list_log(dealloc_list))
2139        # XXX: there should be a global one of these rather than repeating the
2140        # code.
2141        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2142                    '%d %b %y %H:%M:%S'))
2143        dealloc_log.addHandler(h)
2144
2145        self.state_lock.acquire()
2146        fed_exp = self.state.get(key, None)
2147
2148        if fed_exp:
2149            # This branch of the conditional holds the lock to generate a
2150            # consistent temporary tbparams variable to deallocate experiments.
2151            # It releases the lock to do the deallocations and reacquires it to
2152            # remove the experiment state when the termination is complete.
2153
2154            # First make sure that the experiment creation is complete.
2155            status = fed_exp.get('experimentStatus', None)
2156
2157            if status:
2158                if status in ('starting', 'terminating'):
2159                    if not force:
2160                        self.state_lock.release()
2161                        raise service_error(service_error.partial, 
2162                                'Experiment still being created or destroyed')
2163                    else:
2164                        self.log.warning('Experiment in %s state ' % status + \
2165                                'being terminated by force.')
2166            else:
2167                # No status??? trouble
2168                self.state_lock.release()
2169                raise service_error(service_error.internal,
2170                        "Experiment has no status!?")
2171
2172            ids = []
2173            #  experimentID is a list of dicts that are self-describing
2174            #  identifiers.  This finds all the fedids and localnames - the
2175            #  keys of self.state - and puts them into ids.
2176            for id in fed_exp.get('experimentID', []):
2177                if id.has_key('fedid'): ids.append(id['fedid'])
2178                if id.has_key('localname'): ids.append(id['localname'])
2179
2180            # Collect the allocation/segment ids into a dict keyed by the fedid
2181            # of the allocation (or a monotonically increasing integer) that
2182            # contains a tuple of uri, aid (which is a dict...)
2183            for i, fed in enumerate(fed_exp.get('federant', [])):
2184                try:
2185                    uri = fed['uri']
2186                    aid = fed['allocID']
2187                    k = fed['allocID'].get('fedid', i)
2188                except KeyError, e:
2189                    continue
2190                tbparams[k] = (uri, aid)
2191            fed_exp['experimentStatus'] = 'terminating'
2192            if self.state_filename: self.write_state()
2193            self.state_lock.release()
2194
2195            # Stop everyone.  NB, wait_for_all waits until a thread starts and
2196            # then completes, so we can't wait if nothing starts.  So, no
2197            # tbparams, no start.
2198            if len(tbparams) > 0:
2199                thread_pool = self.thread_pool(self.nthreads)
2200                for k in tbparams.keys():
2201                    # Create and start a thread to stop the segment
2202                    thread_pool.wait_for_slot()
2203                    uri, aid = tbparams[k]
2204                    t  = self.pooled_thread(\
2205                            target=self.terminate_segment(log=dealloc_log,
2206                                testbed=uri,
2207                                cert_file=self.cert_file, 
2208                                cert_pwd=self.cert_pwd,
2209                                trusted_certs=self.trusted_certs,
2210                                caller=self.call_TerminateSegment),
2211                            args=(uri, aid), name=k,
2212                            pdata=thread_pool, trace_file=self.trace_file)
2213                    t.start()
2214                # Wait for completions
2215                thread_pool.wait_for_all_done()
2216
2217            # release the allocations (failed experiments have done this
2218            # already, and starting experiments may be in odd states, so we
2219            # ignore errors releasing those allocations
2220            try: 
2221                for k in tbparams.keys():
2222                    # This releases access by uri
2223                    uri, aid = tbparams[k]
2224                    self.release_access(None, aid, uri=uri)
2225            except service_error, e:
2226                if status != 'failed' and not force:
2227                    raise e
2228
2229            # Remove the terminated experiment
2230            self.state_lock.acquire()
2231            for id in ids:
2232                if self.state.has_key(id): del self.state[id]
2233
2234            if self.state_filename: self.write_state()
2235            self.state_lock.release()
2236
2237            # Delete any synch points associated with this experiment.  All
2238            # synch points begin with the fedid of the experiment.
2239            fedid_keys = set(["fedid:%s" % f for f in ids \
2240                    if isinstance(f, fedid)])
2241            for k in self.synch_store.all_keys():
2242                try:
2243                    if len(k) > 45 and k[0:46] in fedid_keys:
2244                        self.synch_store.del_value(k)
2245                except synch_store.BadDeletionError:
2246                    pass
2247            self.write_store()
2248               
2249            return { 
2250                    'experiment': exp , 
2251                    'deallocationLog': "".join(dealloc_list),
2252                    }
2253        else:
2254            # Don't forget to release the lock
2255            self.state_lock.release()
2256            raise service_error(service_error.req, "No saved state")
2257
2258
2259    def GetValue(self, req, fid):
2260        """
2261        Get a value from the synchronized store
2262        """
2263        req = req.get('GetValueRequestBody', None)
2264        if not req:
2265            raise service_error(service_error.req,
2266                    "Bad request format (no GetValueRequestBody)")
2267       
2268        name = req['name']
2269        wait = req['wait']
2270        rv = { 'name': name }
2271
2272        if self.auth.check_attribute(fid, name):
2273            self.log.debug("[GetValue] asking for %s " % name)
2274            try:
2275                v = self.synch_store.get_value(name, wait)
2276            except synch_store.RevokedKeyError:
2277                # No more synch on this key
2278                raise service_error(service_error.federant, 
2279                        "Synch key %s revoked" % name)
2280            if v is not None:
2281                rv['value'] = v
2282            self.log.debug("[GetValue] got %s from %s" % (v, name))
2283            return rv
2284        else:
2285            raise service_error(service_error.access, "Access Denied")
2286       
2287
2288    def SetValue(self, req, fid):
2289        """
2290        Set a value in the synchronized store
2291        """
2292        req = req.get('SetValueRequestBody', None)
2293        if not req:
2294            raise service_error(service_error.req,
2295                    "Bad request format (no SetValueRequestBody)")
2296       
2297        name = req['name']
2298        v = req['value']
2299
2300        if self.auth.check_attribute(fid, name):
2301            try:
2302                self.synch_store.set_value(name, v)
2303                self.write_store()
2304                self.log.debug("[SetValue] set %s to %s" % (name, v))
2305            except synch_store.CollisionError:
2306                # Translate into a service_error
2307                raise service_error(service_error.req,
2308                        "Value already set: %s" %name)
2309            except synch_store.RevokedKeyError:
2310                # No more synch on this key
2311                raise service_error(service_error.federant, 
2312                        "Synch key %s revoked" % name)
2313            return { 'name': name, 'value': v }
2314        else:
2315            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.