source: fedd/federation/experiment_control.py @ cd60510

axis_examplecompt_changesinfo-ops
Last change on this file since cd60510 was a7c0bcb, checked in by Ted Faber <faber@…>, 14 years ago

Allow users to pass topdl to fedd_create.py. fedd_create autodetects and composes the proper message. The experiment controller now just takes the topology as given.

  • Property mode set to 100644
File size: 81.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205            self.get_access = self.legacy_get_access
206        elif self.auth_type == 'abac':
207            self.auth = abac_authorizer(load=self.auth_dir)
208        else:
209            raise service_error(service_error.internal, 
210                    "Unknown auth_type: %s" % self.auth_type)
211
212
213        if self.ssh_pubkey_file:
214            try:
215                f = open(self.ssh_pubkey_file, 'r')
216                self.ssh_pubkey = f.read()
217                f.close()
218            except EnvironmentError:
219                raise service_error(service_error.internal,
220                        "Cannot read sshpubkey")
221        else:
222            raise service_error(service_error.internal, 
223                    "No SSH public key file?")
224
225        if not self.ssh_privkey_file:
226            raise service_error(service_error.internal, 
227                    "No SSH public key file?")
228
229
230        if mapdb_file:
231            self.read_mapdb(mapdb_file)
232        else:
233            self.log.warn("[experiment_control] No testbed map, using defaults")
234            self.tbmap = { 
235                    'deter':'https://users.isi.deterlab.net:23235',
236                    'emulab':'https://users.isi.deterlab.net:23236',
237                    'ucb':'https://users.isi.deterlab.net:23237',
238                    }
239
240        if accessdb_file:
241                self.read_accessdb(accessdb_file)
242        else:
243            raise service_error(service_error.internal,
244                    "No accessdb specified in config")
245
246        # Grab saved state.  OK to do this w/o locking because it's read only
247        # and only one thread should be in existence that can see self.state at
248        # this point.
249        if self.state_filename:
250            self.read_state()
251
252        if self.store_filename:
253            self.read_store()
254        else:
255            self.log.warning("No saved synch store")
256            self.synch_store = synch_store
257
258        # Dispatch tables
259        self.soap_services = {\
260                'New': soap_handler('New', self.new_experiment),
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
266                'Terminate': soap_handler('Terminate', 
267                    self.terminate_experiment),
268                'GetValue': soap_handler('GetValue', self.GetValue),
269                'SetValue': soap_handler('SetValue', self.SetValue),
270        }
271
272        self.xmlrpc_services = {\
273                'New': xmlrpc_handler('New', self.new_experiment),
274                'Create': xmlrpc_handler('Create', self.create_experiment),
275                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
276                'Vis': xmlrpc_handler('Vis', self.get_vis),
277                'Info': xmlrpc_handler('Info', self.get_info),
278                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
279                'Terminate': xmlrpc_handler('Terminate',
280                    self.terminate_experiment),
281                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
282                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
283        }
284
285    # Call while holding self.state_lock
286    def write_state(self):
287        """
288        Write a new copy of experiment state after copying the existing state
289        to a backup.
290
291        State format is a simple pickling of the state dictionary.
292        """
293        if os.access(self.state_filename, os.W_OK):
294            copy_file(self.state_filename, \
295                    "%s.bak" % self.state_filename)
296        try:
297            f = open(self.state_filename, 'w')
298            pickle.dump(self.state, f)
299        except EnvironmentError, e:
300            self.log.error("Can't write file %s: %s" % \
301                    (self.state_filename, e))
302        except pickle.PicklingError, e:
303            self.log.error("Pickling problem: %s" % e)
304        except TypeError, e:
305            self.log.error("Pickling problem (TypeError): %s" % e)
306
307    @staticmethod
308    def get_alloc_ids(state):
309        """
310        Pull the fedids of the identifiers of each allocation from the
311        state.  Again, a dict dive that's best isolated.
312
313        Used by read_store and read state
314        """
315
316        return [ f['allocID']['fedid'] 
317                for f in state.get('federant',[]) \
318                    if f.has_key('allocID') and \
319                        f['allocID'].has_key('fedid')]
320
321    # Call while holding self.state_lock
322    def read_state(self):
323        """
324        Read a new copy of experiment state.  Old state is overwritten.
325
326        State format is a simple pickling of the state dictionary.
327        """
328       
329        def get_experiment_id(state):
330            """
331            Pull the fedid experimentID out of the saved state.  This is kind
332            of a gross walk through the dict.
333            """
334
335            if state.has_key('experimentID'):
336                for e in state['experimentID']:
337                    if e.has_key('fedid'):
338                        return e['fedid']
339                else:
340                    return None
341            else:
342                return None
343
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except EnvironmentError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355       
356        for s in self.state.values():
357            try:
358
359                eid = get_experiment_id(s)
360                if eid : 
361                    if self.auth_type == 'legacy':
362                        # XXX: legacy
363                        # Give the owner rights to the experiment
364                        self.auth.set_attribute(s['owner'], eid)
365                        # And holders of the eid as well
366                        self.auth.set_attribute(eid, eid)
367                        # allow overrides to control experiments as well
368                        for o in self.overrides:
369                            self.auth.set_attribute(o, eid)
370                        # Set permissions to allow reading of the software
371                        # repo, if any, as well.
372                        for a in self.get_alloc_ids(s):
373                            self.auth.set_attribute(a, 'repo/%s' % eid)
374                else:
375                    raise KeyError("No experiment id")
376            except KeyError, e:
377                self.log.warning("[read_state]: State ownership or identity " +\
378                        "misformatted in %s: %s" % (self.state_filename, e))
379
380
381    def read_accessdb(self, accessdb_file):
382        """
383        Read the mapping from fedids that can create experiments to their name
384        in the 3-level access namespace.  All will be asserted from this
385        testbed and can include the local username and porject that will be
386        asserted on their behalf by this fedd.  Each fedid is also added to the
387        authorization system with the "create" attribute.
388        """
389        self.accessdb = {}
390        # These are the regexps for parsing the db
391        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
392        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
394        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
395                "\s*->\s*(" + name_expr + ")\s*$")
396        lineno = 0
397
398        # Parse the mappings and store in self.authdb, a dict of
399        # fedid -> (proj, user)
400        try:
401            f = open(accessdb_file, "r")
402            for line in f:
403                lineno += 1
404                line = line.strip()
405                if len(line) == 0 or line.startswith('#'):
406                    continue
407                m = project_line.match(line)
408                if m:
409                    fid = fedid(hexstr=m.group(1))
410                    project, user = m.group(2,3)
411                    if not self.accessdb.has_key(fid):
412                        self.accessdb[fid] = []
413                    self.accessdb[fid].append((project, user))
414                    continue
415
416                m = user_line.match(line)
417                if m:
418                    fid = fedid(hexstr=m.group(1))
419                    project = None
420                    user = m.group(2)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425                self.log.warn("[experiment_control] Error parsing access " +\
426                        "db %s at line %d" %  (accessdb_file, lineno))
427        except EnvironmentError:
428            raise service_error(service_error.internal,
429                    ("Error opening/reading %s as experiment " +\
430                            "control accessdb") %  accessdb_file)
431        f.close()
432
433        # Initialize the authorization attributes
434        # XXX: legacy
435        if self.auth_type == 'legacy':
436            for fid in self.accessdb.keys():
437                self.auth.set_attribute(fid, 'create')
438                self.auth.set_attribute(fid, 'new')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except EnvironmentError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def read_store(self):
467        try:
468            self.synch_store = synch_store()
469            self.synch_store.load(self.store_filename)
470            self.log.debug("[read_store]: Read store from %s" % \
471                    self.store_filename)
472        except EnvironmentError, e:
473            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
474                    % (self.state_filename, e))
475            self.synch_store = synch_store()
476
477        # Set the initial permissions on data in the store.  XXX: This ad hoc
478        # authorization attribute initialization is getting out of hand.
479        # XXX: legacy
480        if self.auth_type == 'legacy':
481            for k in self.synch_store.all_keys():
482                try:
483                    if k.startswith('fedid:'):
484                        fid = fedid(hexstr=k[6:46])
485                        if self.state.has_key(fid):
486                            for a in self.get_alloc_ids(self.state[fid]):
487                                self.auth.set_attribute(a, k)
488                except ValueError, e:
489                    self.log.warn("Cannot deduce permissions for %s" % k)
490
491
492    def write_store(self):
493        """
494        Write a new copy of synch_store after writing current state
495        to a backup.  We use the internal synch_store pickle method to avoid
496        incinsistent data.
497
498        State format is a simple pickling of the store.
499        """
500        if os.access(self.store_filename, os.W_OK):
501            copy_file(self.store_filename, \
502                    "%s.bak" % self.store_filename)
503        try:
504            self.synch_store.save(self.store_filename)
505        except EnvironmentError, e:
506            self.log.error("Can't write file %s: %s" % \
507                    (self.store_filename, e))
508        except TypeError, e:
509            self.log.error("Pickling problem (TypeError): %s" % e)
510
511
512    def remove_dirs(self, dir):
513        """
514        Remove the directory tree and all files rooted at dir.  Log any errors,
515        but continue.
516        """
517        self.log.debug("[removedirs]: removing %s" % dir)
518        try:
519            for path, dirs, files in os.walk(dir, topdown=False):
520                for f in files:
521                    os.remove(os.path.join(path, f))
522                for d in dirs:
523                    os.rmdir(os.path.join(path, d))
524            os.rmdir(dir)
525        except EnvironmentError, e:
526            self.log.error("Error deleting directory tree in %s" % e);
527
528    @staticmethod
529    def make_temp_certfile(expcert, tmpdir):
530        """
531        make a protected copy of the access certificate so the experiment
532        controller can act as the experiment principal.  mkstemp is the most
533        secure way to do that. The directory should be created by
534        mkdtemp.  Return the filename.
535        """
536        if expcert and tmpdir:
537            try:
538                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
539                f = os.fdopen(certf, 'w')
540                print >> f, expcert
541                f.close()
542            except EnvironmentError, e:
543                raise service_error(service_error.internal, 
544                        "Cannot create temp cert file?")
545            return certfn
546        else:
547            return None
548
549       
550    def generate_ssh_keys(self, dest, type="rsa" ):
551        """
552        Generate a set of keys for the gateways to use to talk.
553
554        Keys are of type type and are stored in the required dest file.
555        """
556        valid_types = ("rsa", "dsa")
557        t = type.lower();
558        if t not in valid_types: raise ValueError
559        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
560
561        try:
562            trace = open("/dev/null", "w")
563        except EnvironmentError:
564            raise service_error(service_error.internal,
565                    "Cannot open /dev/null??");
566
567        # May raise CalledProcessError
568        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
569        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
570        if rv != 0:
571            raise service_error(service_error.internal, 
572                    "Cannot generate nonce ssh keys.  %s return code %d" \
573                            % (self.ssh_keygen, rv))
574
575    def gentopo(self, str):
576        """
577        Generate the topology data structure from the splitter's XML
578        representation of it.
579
580        The topology XML looks like:
581            <experiment>
582                <nodes>
583                    <node><vname></vname><ips>ip1:ip2</ips></node>
584                </nodes>
585                <lans>
586                    <lan>
587                        <vname></vname><vnode></vnode><ip></ip>
588                        <bandwidth></bandwidth><member>node:port</member>
589                    </lan>
590                </lans>
591        """
592        class topo_parse:
593            """
594            Parse the topology XML and create the dats structure.
595            """
596            def __init__(self):
597                # Typing of the subelements for data conversion
598                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
599                self.int_subelements = ( 'bandwidth',)
600                self.float_subelements = ( 'delay',)
601                # The final data structure
602                self.nodes = [ ]
603                self.lans =  [ ]
604                self.topo = { \
605                        'node': self.nodes,\
606                        'lan' : self.lans,\
607                    }
608                self.element = { }  # Current element being created
609                self.chars = ""     # Last text seen
610
611            def end_element(self, name):
612                # After each sub element the contents is added to the current
613                # element or to the appropriate list.
614                if name == 'node':
615                    self.nodes.append(self.element)
616                    self.element = { }
617                elif name == 'lan':
618                    self.lans.append(self.element)
619                    self.element = { }
620                elif name in self.str_subelements:
621                    self.element[name] = self.chars
622                    self.chars = ""
623                elif name in self.int_subelements:
624                    self.element[name] = int(self.chars)
625                    self.chars = ""
626                elif name in self.float_subelements:
627                    self.element[name] = float(self.chars)
628                    self.chars = ""
629
630            def found_chars(self, data):
631                self.chars += data.rstrip()
632
633
634        tp = topo_parse();
635        parser = xml.parsers.expat.ParserCreate()
636        parser.EndElementHandler = tp.end_element
637        parser.CharacterDataHandler = tp.found_chars
638
639        parser.Parse(str)
640
641        return tp.topo
642       
643
644    def genviz(self, topo):
645        """
646        Generate the visualization the virtual topology
647        """
648
649        neato = "/usr/local/bin/neato"
650        # These are used to parse neato output and to create the visualization
651        # file.
652        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
653        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
654                "%s</type></node>"
655
656        try:
657            # Node names
658            nodes = [ n['vname'] for n in topo['node'] ]
659            topo_lans = topo['lan']
660        except KeyError, e:
661            raise service_error(service_error.internal, "Bad topology: %s" %e)
662
663        lans = { }
664        links = { }
665
666        # Walk through the virtual topology, organizing the connections into
667        # 2-node connections (links) and more-than-2-node connections (lans).
668        # When a lan is created, it's added to the list of nodes (there's a
669        # node in the visualization for the lan).
670        for l in topo_lans:
671            if links.has_key(l['vname']):
672                if len(links[l['vname']]) < 2:
673                    links[l['vname']].append(l['vnode'])
674                else:
675                    nodes.append(l['vname'])
676                    lans[l['vname']] = links[l['vname']]
677                    del links[l['vname']]
678                    lans[l['vname']].append(l['vnode'])
679            elif lans.has_key(l['vname']):
680                lans[l['vname']].append(l['vnode'])
681            else:
682                links[l['vname']] = [ l['vnode'] ]
683
684
685        # Open up a temporary file for dot to turn into a visualization
686        try:
687            df, dotname = tempfile.mkstemp()
688            dotfile = os.fdopen(df, 'w')
689        except EnvironmentError:
690            raise service_error(service_error.internal,
691                    "Failed to open file in genviz")
692
693        try:
694            dnull = open('/dev/null', 'w')
695        except EnvironmentError:
696            service_error(service_error.internal,
697                    "Failed to open /dev/null in genviz")
698
699        # Generate a dot/neato input file from the links, nodes and lans
700        try:
701            print >>dotfile, "graph G {"
702            for n in nodes:
703                print >>dotfile, '\t"%s"' % n
704            for l in links.keys():
705                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
706            for l in lans.keys():
707                for n in lans[l]:
708                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
709            print >>dotfile, "}"
710            dotfile.close()
711        except TypeError:
712            raise service_error(service_error.internal,
713                    "Single endpoint link in vtopo")
714        except EnvironmentError:
715            raise service_error(service_error.internal, "Cannot write dot file")
716
717        # Use dot to create a visualization
718        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
719                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
720                close_fds=True)
721        dnull.close()
722
723        # Translate dot to vis format
724        vis_nodes = [ ]
725        vis = { 'node': vis_nodes }
726        for line in dot.stdout:
727            m = vis_re.match(line)
728            if m:
729                vn = m.group(1)
730                vis_node = {'name': vn, \
731                        'x': float(m.group(2)),\
732                        'y' : float(m.group(3)),\
733                    }
734                if vn in links.keys() or vn in lans.keys():
735                    vis_node['type'] = 'lan'
736                else:
737                    vis_node['type'] = 'node'
738                vis_nodes.append(vis_node)
739        rv = dot.wait()
740
741        os.remove(dotname)
742        if rv == 0 : return vis
743        else: return None
744
745
746    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
747            cert_pwd=None):
748        """
749        Release access to testbed through fedd
750        """
751
752        if not uri and tbmap:
753            uri = tbmap.get(tb, None)
754        if not uri:
755            raise service_error(service_error.server_config, 
756                    "Unknown testbed: %s" % tb)
757
758        if self.local_access.has_key(uri):
759            resp = self.local_access[uri].ReleaseAccess(\
760                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
761                    fedid(file=cert_file))
762            resp = { 'ReleaseAccessResponseBody': resp } 
763        else:
764            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
765                    cert_file, cert_pwd, self.trusted_certs)
766
767        # better error coding
768
769    def remote_ns2topdl(self, uri, desc):
770
771        req = {
772                'description' : { 'ns2description': desc },
773            }
774
775        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
776                self.trusted_certs)
777
778        if r.has_key('Ns2TopdlResponseBody'):
779            r = r['Ns2TopdlResponseBody']
780            ed = r.get('experimentdescription', None)
781            if ed.has_key('topdldescription'):
782                return topdl.Topology(**ed['topdldescription'])
783            else:
784                raise service_error(service_error.protocol, 
785                        "Bad splitter response (no output)")
786        else:
787            raise service_error(service_error.protocol, "Bad splitter response")
788
789    class start_segment:
790        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
791                cert_pwd=None, trusted_certs=None, caller=None,
792                log_collector=None):
793            self.log = log
794            self.debug = debug
795            self.cert_file = cert_file
796            self.cert_pwd = cert_pwd
797            self.trusted_certs = None
798            self.caller = caller
799            self.testbed = testbed
800            self.log_collector = log_collector
801            self.response = None
802            self.node = { }
803
804        def make_map(self, resp):
805            for e in resp.get('embedding', []):
806                if 'toponame' in e and 'physname' in e:
807                    self.node[e['toponame']] = e['physname'][0]
808
809        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
810            req = {
811                    'allocID': { 'fedid' : aid }, 
812                    'segmentdescription': { 
813                        'topdldescription': topo.to_dict(),
814                    },
815                }
816
817            if connInfo:
818                req['connection'] = connInfo
819
820            import_svcs = [ s for m in masters.values() \
821                    for s in m if self.testbed in s.importers]
822
823            if import_svcs or self.testbed in masters:
824                req['service'] = []
825
826            for s in import_svcs:
827                for r in s.reqs:
828                    sr = copy.deepcopy(r)
829                    sr['visibility'] = 'import';
830                    req['service'].append(sr)
831
832            for s in masters.get(self.testbed, []):
833                for r in s.reqs:
834                    sr = copy.deepcopy(r)
835                    sr['visibility'] = 'export';
836                    req['service'].append(sr)
837
838            if attrs:
839                req['fedAttr'] = attrs
840
841            try:
842                self.log.debug("Calling StartSegment at %s " % uri)
843                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
844                        self.trusted_certs)
845                if r.has_key('StartSegmentResponseBody'):
846                    lval = r['StartSegmentResponseBody'].get('allocationLog',
847                            None)
848                    if lval and self.log_collector:
849                        for line in  lval.splitlines(True):
850                            self.log_collector.write(line)
851                    self.make_map(r['StartSegmentResponseBody'])
852                    self.response = r
853                else:
854                    raise service_error(service_error.internal, 
855                            "Bad response!?: %s" %r)
856                return True
857            except service_error, e:
858                self.log.error("Start segment failed on %s: %s" % \
859                        (self.testbed, e))
860                return False
861
862
863
864    class terminate_segment:
865        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
866                cert_pwd=None, trusted_certs=None, caller=None):
867            self.log = log
868            self.debug = debug
869            self.cert_file = cert_file
870            self.cert_pwd = cert_pwd
871            self.trusted_certs = None
872            self.caller = caller
873            self.testbed = testbed
874
875        def __call__(self, uri, aid ):
876            req = {
877                    'allocID': aid , 
878                }
879            try:
880                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
881                        self.trusted_certs)
882                return True
883            except service_error, e:
884                self.log.error("Terminate segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888    def allocate_resources(self, allocated, masters, eid, expid, 
889            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
890            attrs=None, connInfo={}, tbmap=None, expcert=None):
891
892        started = { }           # Testbeds where a sub-experiment started
893                                # successfully
894
895        # XXX
896        fail_soft = False
897
898        if tbmap is None: tbmap = { }
899
900        log = alloc_log or self.log
901
902        tp = thread_pool(self.nthreads)
903        threads = [ ]
904        starters = [ ]
905
906        if expcert:
907            cert = expcert
908            pw = None
909        else:
910            cert = self.cert_file
911            pw = self.cert_pwd
912
913        for tb in allocated.keys():
914            # Create and start a thread to start the segment, and save it
915            # to get the return value later
916            tb_attrs = copy.copy(attrs)
917            tp.wait_for_slot()
918            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
919            base, suffix = split_testbed(tb)
920            if suffix:
921                tb_attrs.append({'attribute': 'experiment_name', 
922                    'value': "%s-%s" % (eid, suffix)})
923            else:
924                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
925            if not uri:
926                raise service_error(service_error.internal, 
927                        "Unknown testbed %s !?" % tb)
928
929            if tbparams[tb].has_key('allocID') and \
930                    tbparams[tb]['allocID'].has_key('fedid'):
931                aid = tbparams[tb]['allocID']['fedid']
932            else:
933                raise service_error(service_error.internal, 
934                        "No alloc id for testbed %s !?" % tb)
935
936            s = self.start_segment(log=log, debug=self.debug,
937                    testbed=tb, cert_file=cert,
938                    cert_pwd=pw, trusted_certs=self.trusted_certs,
939                    caller=self.call_StartSegment,
940                    log_collector=log_collector)
941            starters.append(s)
942            t  = pooled_thread(\
943                    target=s, name=tb,
944                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
945                    pdata=tp, trace_file=self.trace_file)
946            threads.append(t)
947            t.start()
948
949        # Wait until all finish (keep pinging the log, though)
950        mins = 0
951        revoked = False
952        while not tp.wait_for_all_done(60.0):
953            mins += 1
954            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
955                    % mins)
956            if not revoked and \
957                    len([ t.getName() for t in threads if t.rv == False]) > 0:
958                # a testbed has failed.  Revoke this experiment's
959                # synchronizarion values so that sub experiments will not
960                # deadlock waiting for synchronization that will never happen
961                self.log.info("A subexperiment has failed to swap in, " + \
962                        "revoking synch keys")
963                var_key = "fedid:%s" % expid
964                for k in self.synch_store.all_keys():
965                    if len(k) > 45 and k[0:46] == var_key:
966                        self.synch_store.revoke_key(k)
967                revoked = True
968
969        failed = [ t.getName() for t in threads if not t.rv ]
970        succeeded = [tb for tb in allocated.keys() if tb not in failed]
971
972        # If one failed clean up, unless fail_soft is set
973        if failed:
974            if not fail_soft:
975                tp.clear()
976                for tb in succeeded:
977                    # Create and start a thread to stop the segment
978                    tp.wait_for_slot()
979                    uri = tbparams[tb]['uri']
980                    t  = pooled_thread(\
981                            target=self.terminate_segment(log=log,
982                                testbed=tb,
983                                cert_file=cert, 
984                                cert_pwd=pw,
985                                trusted_certs=self.trusted_certs,
986                                caller=self.call_TerminateSegment),
987                            args=(uri, tbparams[tb]['federant']['allocID']),
988                            name=tb,
989                            pdata=tp, trace_file=self.trace_file)
990                    t.start()
991                # Wait until all finish (if any are being stopped)
992                if succeeded:
993                    tp.wait_for_all_done()
994
995                # release the allocations
996                for tb in tbparams.keys():
997                    try:
998                        self.release_access(tb, tbparams[tb]['allocID'], 
999                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1000                                cert_file=cert, cert_pwd=pw)
1001                    except service_error, e:
1002                        self.log.warn("Error releasing access: %s" % e.desc)
1003                # Remove the placeholder
1004                self.state_lock.acquire()
1005                self.state[eid]['experimentStatus'] = 'failed'
1006                if self.state_filename: self.write_state()
1007                self.state_lock.release()
1008                # Remove the repo dir
1009                self.remove_dirs("%s/%s" %(self.repodir, expid))
1010                # Walk up tmpdir, deleting as we go
1011                if self.cleanup:
1012                    self.remove_dirs(tmpdir)
1013                else:
1014                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1015
1016
1017                log.error("Swap in failed on %s" % ",".join(failed))
1018                return
1019        else:
1020            # Walk through the successes and gather the virtual to physical
1021            # mapping.
1022            embedding = [ ]
1023            for s in starters:
1024                for k, v in s.node.items():
1025                    embedding.append({
1026                        'toponame': k, 
1027                        'physname': [ v],
1028                        'testbed': s.testbed
1029                        })
1030            log.info("[start_segment]: Experiment %s active" % eid)
1031
1032
1033        # Walk up tmpdir, deleting as we go
1034        if self.cleanup:
1035            self.remove_dirs(tmpdir)
1036        else:
1037            log.debug("[start_experiment]: not removing %s" % tmpdir)
1038
1039        # Insert the experiment into our state and update the disk copy.
1040        self.state_lock.acquire()
1041        self.state[expid]['experimentStatus'] = 'active'
1042        self.state[eid] = self.state[expid]
1043        self.state[eid]['experimentdescription']['topdldescription'] = \
1044                top.to_dict()
1045        self.state[eid]['embedding'] = embedding
1046        if self.state_filename: self.write_state()
1047        self.state_lock.release()
1048        return
1049
1050
1051    def add_kit(self, e, kit):
1052        """
1053        Add a Software object created from the list of (install, location)
1054        tuples passed as kit  to the software attribute of an object e.  We
1055        do this enough to break out the code, but it's kind of a hack to
1056        avoid changing the old tuple rep.
1057        """
1058
1059        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1060
1061        if isinstance(e.software, list): e.software.extend(s)
1062        else: e.software = s
1063
1064
1065    def create_experiment_state(self, fid, req, expid, expcert,
1066            state='starting'):
1067        """
1068        Create the initial entry in the experiment's state.  The expid and
1069        expcert are the experiment's fedid and certifacte that represents that
1070        ID, which are installed in the experiment state.  If the request
1071        includes a suggested local name that is used if possible.  If the local
1072        name is already taken by an experiment owned by this user that has
1073        failed, it is overwritten.  Otherwise new letters are added until a
1074        valid localname is found.  The generated local name is returned.
1075        """
1076
1077        if req.has_key('experimentID') and \
1078                req['experimentID'].has_key('localname'):
1079            overwrite = False
1080            eid = req['experimentID']['localname']
1081            # If there's an old failed experiment here with the same local name
1082            # and accessible by this user, we'll overwrite it, otherwise we'll
1083            # fall through and do the collision avoidance.
1084            old_expid = self.get_experiment_fedid(eid)
1085            if old_expid and self.check_experiment_access(fid, old_expid):
1086                self.state_lock.acquire()
1087                status = self.state[eid].get('experimentStatus', None)
1088                if status and status == 'failed':
1089                    # remove the old access attribute
1090                    self.auth.unset_attribute(fid, old_expid)
1091                    self.auth.save()
1092                    overwrite = True
1093                    del self.state[eid]
1094                    del self.state[old_expid]
1095                self.state_lock.release()
1096            self.state_lock.acquire()
1097            while (self.state.has_key(eid) and not overwrite):
1098                eid += random.choice(string.ascii_letters)
1099            # Initial state
1100            self.state[eid] = {
1101                    'experimentID' : \
1102                            [ { 'localname' : eid }, {'fedid': expid } ],
1103                    'experimentStatus': state,
1104                    'experimentAccess': { 'X509' : expcert },
1105                    'owner': fid,
1106                    'log' : [],
1107                }
1108            self.state[expid] = self.state[eid]
1109            if self.state_filename: self.write_state()
1110            self.state_lock.release()
1111        else:
1112            eid = self.exp_stem
1113            for i in range(0,5):
1114                eid += random.choice(string.ascii_letters)
1115            self.state_lock.acquire()
1116            while (self.state.has_key(eid)):
1117                eid = self.exp_stem
1118                for i in range(0,5):
1119                    eid += random.choice(string.ascii_letters)
1120            # Initial state
1121            self.state[eid] = {
1122                    'experimentID' : \
1123                            [ { 'localname' : eid }, {'fedid': expid } ],
1124                    'experimentStatus': state,
1125                    'experimentAccess': { 'X509' : expcert },
1126                    'owner': fid,
1127                    'log' : [],
1128                }
1129            self.state[expid] = self.state[eid]
1130            if self.state_filename: self.write_state()
1131            self.state_lock.release()
1132
1133        return eid
1134
1135
1136    def allocate_ips_to_topo(self, top):
1137        """
1138        Add an ip4_address attribute to all the hosts in the topology, based on
1139        the shared substrates on which they sit.  An /etc/hosts file is also
1140        created and returned as a list of hostfiles entries.  We also return
1141        the allocator, because we may need to allocate IPs to portals
1142        (specifically DRAGON portals).
1143        """
1144        subs = sorted(top.substrates, 
1145                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1146                reverse=True)
1147        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1148        ifs = { }
1149        hosts = [ ]
1150
1151        for idx, s in enumerate(subs):
1152            net_size = len(s.interfaces)+2
1153
1154            a = ips.allocate(net_size)
1155            if a :
1156                base, num = a
1157                if num < net_size: 
1158                    raise service_error(service_error.internal,
1159                            "Allocator returned wrong number of IPs??")
1160            else:
1161                raise service_error(service_error.req, 
1162                        "Cannot allocate IP addresses")
1163            mask = ips.min_alloc
1164            while mask < net_size:
1165                mask *= 2
1166
1167            netmask = ((2**32-1) ^ (mask-1))
1168
1169            base += 1
1170            for i in s.interfaces:
1171                i.attribute.append(
1172                        topdl.Attribute('ip4_address', 
1173                            "%s" % ip_addr(base)))
1174                i.attribute.append(
1175                        topdl.Attribute('ip4_netmask', 
1176                            "%s" % ip_addr(int(netmask))))
1177
1178                hname = i.element.name
1179                if ifs.has_key(hname):
1180                    hosts.append("%s\t%s-%s %s-%d" % \
1181                            (ip_addr(base), hname, s.name, hname,
1182                                ifs[hname]))
1183                else:
1184                    ifs[hname] = 0
1185                    hosts.append("%s\t%s-%s %s-%d %s" % \
1186                            (ip_addr(base), hname, s.name, hname,
1187                                ifs[hname], hname))
1188
1189                ifs[hname] += 1
1190                base += 1
1191        return hosts, ips
1192
1193    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1194            tbparam, masters, tbmap, expid=None, expcert=None):
1195        for tb in testbeds:
1196            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1197                    expcert)
1198            allocated[tb] = 1
1199
1200    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1201            expcert=None):
1202        """
1203        Get access to testbed through fedd and set the parameters for that tb
1204        """
1205        def get_export_project(svcs):
1206            """
1207            Look through for the list of federated_service for this testbed
1208            objects for a project_export service, and extract the project
1209            parameter.
1210            """
1211
1212            pe = [s for s in svcs if s.name=='project_export']
1213            if len(pe) == 1:
1214                return pe[0].params.get('project', None)
1215            elif len(pe) == 0:
1216                return None
1217            else:
1218                raise service_error(service_error.req,
1219                        "More than one project export is not supported")
1220
1221        def add_services(svcs, type, slist):
1222            """
1223            Add the given services to slist.  type is import or export.
1224            """
1225            for i, s in enumerate(svcs):
1226                idx = '%s%d' % (type, i)
1227                sr = {'id': idx, 'name': s.name, 'visibility': type }
1228                if s.params:
1229                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1230                            for k, v in s.params.items()]
1231                slist.append(sr)
1232
1233        uri = tbmap.get(testbed_base(tb), None)
1234        if not uri:
1235            raise service_error(service_error.server_config, 
1236                    "Unknown testbed: %s" % tb)
1237
1238        export_svcs = masters.get(tb,[])
1239        import_svcs = [ s for m in masters.values() \
1240                for s in m \
1241                    if tb in s.importers ]
1242
1243        export_project = get_export_project(export_svcs)
1244        # Compose the credential list so that IDs come before attributes
1245        creds = set()
1246        keys = set()
1247        certs = self.auth.get_creds_for_principal(fid)
1248        if expid:
1249            certs.update(self.auth.get_creds_for_principal(expid))
1250        for c in certs:
1251            keys.add(c.issuer_cert())
1252            creds.add(c.attribute_cert())
1253        creds = list(keys) + list(creds)
1254
1255        if expcert: cert, pw = expcert, None
1256        else: cert, pw = self.cert_file, self.cert_pw
1257
1258        # Request credentials
1259        req = {
1260                'abac_credential': creds,
1261            }
1262        # Make the service request from the services we're importing and
1263        # exporting.  Keep track of the export request ids so we can
1264        # collect the resulting info from the access response.
1265        e_keys = { }
1266        if import_svcs or export_svcs:
1267            slist = []
1268            add_services(import_svcs, 'import', slist)
1269            add_services(export_svcs, 'export', slist)
1270            req['service'] = slist
1271
1272        if self.local_access.has_key(uri):
1273            # Local access call
1274            req = { 'RequestAccessRequestBody' : req }
1275            r = self.local_access[uri].RequestAccess(req, 
1276                    fedid(file=self.cert_file))
1277            r = { 'RequestAccessResponseBody' : r }
1278        else:
1279            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1280
1281        if r.has_key('RequestAccessResponseBody'):
1282            # Through to here we have a valid response, not a fault.
1283            # Access denied is a fault, so something better or worse than
1284            # access denied has happened.
1285            r = r['RequestAccessResponseBody']
1286            self.log.debug("[get_access] Access granted")
1287        else:
1288            raise service_error(service_error.protocol,
1289                        "Bad proxy response")
1290       
1291        tbparam[tb] = { 
1292                "allocID" : r['allocID'],
1293                "uri": uri,
1294                }
1295
1296        # Collect the responses corresponding to the services this testbed
1297        # exports.  These will be the service requests that we will include in
1298        # the start segment requests (with appropriate visibility values) to
1299        # import and export the segments.
1300        for s in r.get('service', []):
1301            id = s.get('id', None)
1302            if id and id in e_keys:
1303                e_keys[id].reqs.append(s)
1304
1305        # Add attributes to parameter space.  We don't allow attributes to
1306        # overlay any parameters already installed.
1307        for a in r.get('fedAttr', []):
1308            try:
1309                if a['attribute'] and \
1310                        isinstance(a['attribute'], basestring)\
1311                        and not tbparam[tb].has_key(a['attribute'].lower()):
1312                    tbparam[tb][a['attribute'].lower()] = a['value']
1313            except KeyError:
1314                self.log.error("Bad attribute in response: %s" % a)
1315
1316
1317    def split_topology(self, top, topo, testbeds):
1318        """
1319        Create the sub-topologies that are needed for experiment instantiation.
1320        """
1321        for tb in testbeds:
1322            topo[tb] = top.clone()
1323            # copy in for loop allows deletions from the original
1324            for e in [ e for e in topo[tb].elements]:
1325                etb = e.get_attribute('testbed')
1326                # NB: elements without a testbed attribute won't appear in any
1327                # sub topologies. 
1328                if not etb or etb != tb:
1329                    for i in e.interface:
1330                        for s in i.subs:
1331                            try:
1332                                s.interfaces.remove(i)
1333                            except ValueError:
1334                                raise service_error(service_error.internal,
1335                                        "Can't remove interface??")
1336                    topo[tb].elements.remove(e)
1337            topo[tb].make_indices()
1338
1339    def wrangle_software(self, expid, top, topo, tbparams):
1340        """
1341        Copy software out to the repository directory, allocate permissions and
1342        rewrite the segment topologies to look for the software in local
1343        places.
1344        """
1345
1346        # Copy the rpms and tarfiles to a distribution directory from
1347        # which the federants can retrieve them
1348        linkpath = "%s/software" %  expid
1349        softdir ="%s/%s" % ( self.repodir, linkpath)
1350        softmap = { }
1351        # These are in a list of tuples format (each kit).  This comprehension
1352        # unwraps them into a single list of tuples that initilaizes the set of
1353        # tuples.
1354        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1355                for p, t in l ])
1356        pkgs.update([x.location for e in top.elements \
1357                for x in e.software])
1358        try:
1359            os.makedirs(softdir)
1360        except EnvironmentError, e:
1361            raise service_error(
1362                    "Cannot create software directory: %s" % e)
1363        # The actual copying.  Everything's converted into a url for copying.
1364        for pkg in pkgs:
1365            loc = pkg
1366
1367            scheme, host, path = urlparse(loc)[0:3]
1368            dest = os.path.basename(path)
1369            if not scheme:
1370                if not loc.startswith('/'):
1371                    loc = "/%s" % loc
1372                loc = "file://%s" %loc
1373            try:
1374                u = urlopen(loc)
1375            except Exception, e:
1376                raise service_error(service_error.req, 
1377                        "Cannot open %s: %s" % (loc, e))
1378            try:
1379                f = open("%s/%s" % (softdir, dest) , "w")
1380                self.log.debug("Writing %s/%s" % (softdir,dest) )
1381                data = u.read(4096)
1382                while data:
1383                    f.write(data)
1384                    data = u.read(4096)
1385                f.close()
1386                u.close()
1387            except Exception, e:
1388                raise service_error(service_error.internal,
1389                        "Could not copy %s: %s" % (loc, e))
1390            path = re.sub("/tmp", "", linkpath)
1391            # XXX
1392            softmap[pkg] = \
1393                    "%s/%s/%s" %\
1394                    ( self.repo_url, path, dest)
1395
1396            # Allow the individual segments to access the software.
1397            for tb in tbparams.keys():
1398                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1399                        "/%s/%s" % ( path, dest))
1400            self.auth.save()
1401
1402        # Convert the software locations in the segments into the local
1403        # copies on this host
1404        for soft in [ s for tb in topo.values() \
1405                for e in tb.elements \
1406                    if getattr(e, 'software', False) \
1407                        for s in e.software ]:
1408            if softmap.has_key(soft.location):
1409                soft.location = softmap[soft.location]
1410
1411
1412    def new_experiment(self, req, fid):
1413        """
1414        The external interface to empty initial experiment creation called from
1415        the dispatcher.
1416
1417        Creates a working directory, splits the incoming description using the
1418        splitter script and parses out the avrious subsections using the
1419        lcasses above.  Once each sub-experiment is created, use pooled threads
1420        to instantiate them and start it all up.
1421        """
1422        req = req.get('NewRequestBody', None)
1423        if not req:
1424            raise service_error(service_error.req,
1425                    "Bad request format (no NewRequestBody)")
1426
1427        if self.auth.import_credentials(data_list=req.get('credential', [])):
1428            self.auth.save()
1429
1430        if not self.auth.check_attribute(fid, 'new'):
1431            raise service_error(service_error.access, "New access denied")
1432
1433        try:
1434            tmpdir = tempfile.mkdtemp(prefix="split-")
1435        except EnvironmentError:
1436            raise service_error(service_error.internal, "Cannot create tmp dir")
1437
1438        try:
1439            access_user = self.accessdb[fid]
1440        except KeyError:
1441            raise service_error(service_error.internal,
1442                    "Access map and authorizer out of sync in " + \
1443                            "new_experiment for fedid %s"  % fid)
1444
1445        # Generate an ID for the experiment (slice) and a certificate that the
1446        # allocator can use to prove they own it.  We'll ship it back through
1447        # the encrypted connection.  If the requester supplied one, use it.
1448        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1449            expcert = req['experimentAccess']['X509']
1450            expid = fedid(certstr=expcert)
1451            self.state_lock.acquire()
1452            if expid in self.state:
1453                self.state_lock.release()
1454                raise service_error(service_error.req, 
1455                        'fedid %s identifies an existing experiment' % expid)
1456            self.state_lock.release()
1457        else:
1458            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1459
1460        #now we're done with the tmpdir, and it should be empty
1461        if self.cleanup:
1462            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1463            os.rmdir(tmpdir)
1464        else:
1465            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1466
1467        eid = self.create_experiment_state(fid, req, expid, expcert, 
1468                state='empty')
1469
1470        # Let users touch the state
1471        self.auth.set_attribute(fid, expid)
1472        self.auth.set_attribute(expid, expid)
1473        # Override fedids can manipulate state as well
1474        for o in self.overrides:
1475            self.auth.set_attribute(o, expid)
1476        self.auth.save()
1477
1478        rv = {
1479                'experimentID': [
1480                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1481                ],
1482                'experimentStatus': 'empty',
1483                'experimentAccess': { 'X509' : expcert }
1484            }
1485
1486        return rv
1487
1488    # create_experiment sub-functions
1489
1490    @staticmethod
1491    def get_experiment_key(req, field='experimentID'):
1492        """
1493        Parse the experiment identifiers out of the request (the request body
1494        tag has been removed).  Specifically this pulls either the fedid or the
1495        localname out of the experimentID field.  A fedid is preferred.  If
1496        neither is present or the request does not contain the fields,
1497        service_errors are raised.
1498        """
1499        # Get the experiment access
1500        exp = req.get(field, None)
1501        if exp:
1502            if exp.has_key('fedid'):
1503                key = exp['fedid']
1504            elif exp.has_key('localname'):
1505                key = exp['localname']
1506            else:
1507                raise service_error(service_error.req, "Unknown lookup type")
1508        else:
1509            raise service_error(service_error.req, "No request?")
1510
1511        return key
1512
1513    def get_experiment_ids_and_start(self, key, tmpdir):
1514        """
1515        Get the experiment name, id and access certificate from the state, and
1516        set the experiment state to 'starting'.  returns a triple (fedid,
1517        localname, access_cert_file). The access_cert_file is a copy of the
1518        contents of the access certificate, created in the tempdir with
1519        restricted permissions.  If things are confused, raise an exception.
1520        """
1521
1522        expid = eid = None
1523        self.state_lock.acquire()
1524        if self.state.has_key(key):
1525            self.state[key]['experimentStatus'] = "starting"
1526            for e in self.state[key].get('experimentID',[]):
1527                if not expid and e.has_key('fedid'):
1528                    expid = e['fedid']
1529                elif not eid and e.has_key('localname'):
1530                    eid = e['localname']
1531            if 'experimentAccess' in self.state[key] and \
1532                    'X509' in self.state[key]['experimentAccess']:
1533                expcert = self.state[key]['experimentAccess']['X509']
1534            else:
1535                expcert = None
1536        self.state_lock.release()
1537
1538        # make a protected copy of the access certificate so the experiment
1539        # controller can act as the experiment principal.
1540        if expcert:
1541            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1542            if not expcert_file:
1543                raise service_error(service_error.internal, 
1544                        "Cannot create temp cert file?")
1545        else:
1546            expcert_file = None
1547
1548        return (eid, expid, expcert_file)
1549
1550    def get_topology(self, req, tmpdir):
1551        """
1552        Get the ns2 content and put it into a file for parsing.  Call the local
1553        or remote parser and return the topdl.Topology.  Errors result in
1554        exceptions.  req is the request and tmpdir is a work directory.
1555        """
1556
1557        # The tcl parser needs to read a file so put the content into that file
1558        descr=req.get('experimentdescription', None)
1559        if descr:
1560            if 'ns2description' in descr:
1561                file_content=descr['ns2description']
1562            elif 'topdldescription' in descr:
1563                return topdl.Topology(**descr['topdldescription'])
1564            else:
1565                raise service_error(service_error.req, 
1566                        'Unknown experiment description type')
1567        else:
1568            raise service_error(service_error.req, "No experiment description")
1569
1570
1571        if self.splitter_url:
1572            self.log.debug("Calling remote topdl translator at %s" % \
1573                    self.splitter_url)
1574            top = self.remote_ns2topdl(self.splitter_url, file_content)
1575        else:
1576            tclfile = os.path.join(tmpdir, "experiment.tcl")
1577            if file_content:
1578                try:
1579                    f = open(tclfile, 'w')
1580                    f.write(file_content)
1581                    f.close()
1582                except EnvironmentError:
1583                    raise service_error(service_error.internal,
1584                            "Cannot write temp experiment description")
1585            else:
1586                raise service_error(service_error.req, 
1587                        "Only ns2descriptions supported")
1588            pid = "dummy"
1589            gid = "dummy"
1590            eid = "dummy"
1591
1592            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1593                str(self.muxmax), '-m', 'dummy']
1594
1595            tclcmd.extend([pid, gid, eid, tclfile])
1596
1597            self.log.debug("running local splitter %s", " ".join(tclcmd))
1598            # This is just fantastic.  As a side effect the parser copies
1599            # tb_compat.tcl into the current directory, so that directory
1600            # must be writable by the fedd user.  Doing this in the
1601            # temporary subdir ensures this is the case.
1602            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1603                    cwd=tmpdir)
1604            split_data = tclparser.stdout
1605
1606            top = topdl.topology_from_xml(file=split_data, top="experiment")
1607            os.remove(tclfile)
1608
1609        return top
1610
1611    def get_testbed_services(self, req):
1612        """
1613        Parse the services section of the request into into two dicts mapping
1614        testbed to lists of federated_service objects.  The first lists all
1615        exporters of services, and the second all exporters of services that
1616        need control portals int the experiment.
1617        """
1618        masters = { }
1619        pmasters = { }
1620        for s in req.get('service', []):
1621            # If this is a service request with the importall field
1622            # set, fill it out.
1623
1624            if s.get('importall', False):
1625                s['import'] = [ tb for tb in testbeds \
1626                        if tb not in s.get('export',[])]
1627                del s['importall']
1628
1629            # Add the service to masters
1630            for tb in s.get('export', []):
1631                if s.get('name', None):
1632
1633                    params = { }
1634                    for a in s.get('fedAttr', []):
1635                        params[a.get('attribute', '')] = a.get('value','')
1636
1637                    fser = federated_service(name=s['name'],
1638                            exporter=tb, importers=s.get('import',[]),
1639                            params=params)
1640                    if fser.name == 'hide_hosts' \
1641                            and 'hosts' not in fser.params:
1642                        fser.params['hosts'] = \
1643                                ",".join(tb_hosts.get(fser.exporter, []))
1644                    if tb in masters: masters[tb].append(fser)
1645                    else: masters[tb] = [fser]
1646
1647                    if fser.portal:
1648                        if tb not in pmasters: pmasters[tb] = [ fser ]
1649                        else: pmasters[tb].append(fser)
1650                else:
1651                    self.log.error('Testbed service does not have name " + \
1652                            "and importers')
1653        return masters, pmasters
1654
1655    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1656        """
1657        Create the ssh keys necessary for interconnecting the potral nodes and
1658        the global hosts file for letting each segment know about the IP
1659        addresses in play.  Save these into the repo.  Add attributes to the
1660        autorizer allowing access controllers to download them and return a set
1661        of attributes that inform the segments where to find this stuff.  Mau
1662        raise service_errors in if there are problems.
1663        """
1664        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1665        gw_secretkey_base = "fed.%s" % self.ssh_type
1666        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1667        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1668
1669        try:
1670            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1671        except ValueError:
1672            raise service_error(service_error.server_config, 
1673                    "Bad key type (%s)" % self.ssh_type)
1674
1675
1676        # Copy configuration files into the remote file store
1677        # The config urlpath
1678        configpath = "/%s/config" % expid
1679        # The config file system location
1680        configdir ="%s%s" % ( self.repodir, configpath)
1681        try:
1682            os.makedirs(configdir)
1683        except EnvironmentError, e:
1684            raise service_error(service_error.internal,
1685                    "Cannot create config directory: %s" % e)
1686        try:
1687            f = open("%s/hosts" % configdir, "w")
1688            print >> f, string.join(hosts, '\n')
1689            f.close()
1690        except EnvironmentError, e:
1691            raise service_error(service_error.internal, 
1692                    "Cannot write hosts file: %s" % e)
1693        try:
1694            copy_file("%s" % gw_pubkey, "%s/%s" % \
1695                    (configdir, gw_pubkey_base))
1696            copy_file("%s" % gw_secretkey, "%s/%s" % \
1697                    (configdir, gw_secretkey_base))
1698        except EnvironmentError, e:
1699            raise service_error(service_error.internal, 
1700                    "Cannot copy keyfiles: %s" % e)
1701
1702        # Allow the individual testbeds to access the configuration files.
1703        for tb in tbparams.keys():
1704            asignee = tbparams[tb]['allocID']['fedid']
1705            for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1706                self.auth.set_attribute(asignee, "%s/%s" % \
1707                        (configpath, f))
1708            self.auth.save()
1709
1710        attrs = [ 
1711                {
1712                    'attribute': 'ssh_pubkey', 
1713                    'value': '%s/%s/config/%s' % \
1714                            (self.repo_url, expid, gw_pubkey_base)
1715                },
1716                {
1717                    'attribute': 'ssh_secretkey', 
1718                    'value': '%s/%s/config/%s' % \
1719                            (self.repo_url, expid, gw_secretkey_base)
1720                },
1721                {
1722                    'attribute': 'hosts', 
1723                    'value': '%s/%s/config/hosts' % \
1724                            (self.repo_url, expid)
1725                },
1726            ]
1727        return attrs
1728
1729
1730    def get_vtopo(self, req, fid):
1731        """
1732        Return the stored virtual topology for this experiment
1733        """
1734        rv = None
1735        state = None
1736
1737        req = req.get('VtopoRequestBody', None)
1738        if not req:
1739            raise service_error(service_error.req,
1740                    "Bad request format (no VtopoRequestBody)")
1741        exp = req.get('experiment', None)
1742        if exp:
1743            if exp.has_key('fedid'):
1744                key = exp['fedid']
1745                keytype = "fedid"
1746            elif exp.has_key('localname'):
1747                key = exp['localname']
1748                keytype = "localname"
1749            else:
1750                raise service_error(service_error.req, "Unknown lookup type")
1751        else:
1752            raise service_error(service_error.req, "No request?")
1753
1754        self.check_experiment_access(fid, key)
1755
1756        self.state_lock.acquire()
1757        if self.state.has_key(key):
1758            if self.state[key].has_key('vtopo'):
1759                rv = { 'experiment' : {keytype: key },\
1760                        'vtopo': self.state[key]['vtopo'],\
1761                    }
1762            else:
1763                state = self.state[key]['experimentStatus']
1764        self.state_lock.release()
1765
1766        if rv: return rv
1767        else: 
1768            if state:
1769                raise service_error(service_error.partial, 
1770                        "Not ready: %s" % state)
1771            else:
1772                raise service_error(service_error.req, "No such experiment")
1773
1774    def get_vis(self, req, fid):
1775        """
1776        Return the stored visualization for this experiment
1777        """
1778        rv = None
1779        state = None
1780
1781        req = req.get('VisRequestBody', None)
1782        if not req:
1783            raise service_error(service_error.req,
1784                    "Bad request format (no VisRequestBody)")
1785        exp = req.get('experiment', None)
1786        if exp:
1787            if exp.has_key('fedid'):
1788                key = exp['fedid']
1789                keytype = "fedid"
1790            elif exp.has_key('localname'):
1791                key = exp['localname']
1792                keytype = "localname"
1793            else:
1794                raise service_error(service_error.req, "Unknown lookup type")
1795        else:
1796            raise service_error(service_error.req, "No request?")
1797
1798        self.check_experiment_access(fid, key)
1799
1800        self.state_lock.acquire()
1801        if self.state.has_key(key):
1802            if self.state[key].has_key('vis'):
1803                rv =  { 'experiment' : {keytype: key },\
1804                        'vis': self.state[key]['vis'],\
1805                        }
1806            else:
1807                state = self.state[key]['experimentStatus']
1808        self.state_lock.release()
1809
1810        if rv: return rv
1811        else:
1812            if state:
1813                raise service_error(service_error.partial, 
1814                        "Not ready: %s" % state)
1815            else:
1816                raise service_error(service_error.req, "No such experiment")
1817
1818   
1819    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1820            top):
1821        """
1822        Store the various data that have changed in the experiment state
1823        between when it was started and the beginning of resource allocation.
1824        This is basically the information about each local allocation.  This
1825        fills in the values of the placeholder allocation in the state.
1826        """
1827        # save federant information
1828        for k in allocated.keys():
1829            tbparams[k]['federant'] = {
1830                    'name': [ { 'localname' : eid} ],
1831                    'allocID' : tbparams[k]['allocID'],
1832                    'uri': tbparams[k]['uri'],
1833                }
1834
1835        self.state_lock.acquire()
1836        self.state[eid]['vtopo'] = vtopo
1837        self.state[eid]['vis'] = vis
1838        self.state[eid]['experimentdescription'] = \
1839                { 'topdldescription': top.to_dict() }
1840        self.state[eid]['federant'] = \
1841                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1842                    if tbparams[tb].has_key('federant') ]
1843        if self.state_filename: 
1844            self.write_state()
1845        self.state_lock.release()
1846
1847    def clear_placeholder(self, eid, expid, tmpdir):
1848        """
1849        Clear the placeholder and remove any allocated temporary dir.
1850        """
1851
1852        self.state_lock.acquire()
1853        del self.state[eid]
1854        del self.state[expid]
1855        if self.state_filename: self.write_state()
1856        self.state_lock.release()
1857        if tmpdir and self.cleanup:
1858            self.remove_dirs(tmpdir)
1859
1860    # end of create_experiment sub-functions
1861
1862    def create_experiment(self, req, fid):
1863        """
1864        The external interface to experiment creation called from the
1865        dispatcher.
1866
1867        Creates a working directory, splits the incoming description using the
1868        splitter script and parses out the various subsections using the
1869        classes above.  Once each sub-experiment is created, use pooled threads
1870        to instantiate them and start it all up.
1871        """
1872
1873        req = req.get('CreateRequestBody', None)
1874        if req:
1875            key = self.get_experiment_key(req)
1876        else:
1877            raise service_error(service_error.req,
1878                    "Bad request format (no CreateRequestBody)")
1879
1880        # Import information from the requester
1881        if self.auth.import_credentials(data_list=req.get('credential', [])):
1882            self.auth.save()
1883
1884        # Make sure that the caller can talk to us
1885        self.check_experiment_access(fid, key)
1886
1887        # Install the testbed map entries supplied with the request into a copy
1888        # of the testbed map.
1889        tbmap = dict(self.tbmap)
1890        for m in req.get('testbedmap', []):
1891            if 'testbed' in m and 'uri' in m:
1892                tbmap[m['testbed']] = m['uri']
1893
1894        # a place to work
1895        try:
1896            tmpdir = tempfile.mkdtemp(prefix="split-")
1897            os.mkdir(tmpdir+"/keys")
1898        except EnvironmentError:
1899            raise service_error(service_error.internal, "Cannot create tmp dir")
1900
1901        tbparams = { }
1902
1903        eid, expid, expcert_file = \
1904                self.get_experiment_ids_and_start(key, tmpdir)
1905
1906        # This catches exceptions to clear the placeholder if necessary
1907        try: 
1908            if not (eid and expid):
1909                raise service_error(service_error.internal, 
1910                        "Cannot find local experiment info!?")
1911
1912            top = self.get_topology(req, tmpdir)
1913            # Assign the IPs
1914            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1915            # Find the testbeds to look up
1916            tb_hosts = { }
1917            testbeds = [ ]
1918            for e in top.elements:
1919                if isinstance(e, topdl.Computer):
1920                    tb = e.get_attribute('testbed') or 'default'
1921                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1922                    else: 
1923                        tb_hosts[tb] = [ e.name ]
1924                        testbeds.append(tb)
1925
1926            masters, pmasters = self.get_testbed_services(req)
1927            allocated = { }         # Testbeds we can access
1928            topo ={ }               # Sub topologies
1929            connInfo = { }          # Connection information
1930
1931            self.get_access_to_testbeds(testbeds, fid, allocated, 
1932                    tbparams, masters, tbmap, expid, expcert_file)
1933
1934            self.split_topology(top, topo, testbeds)
1935
1936            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
1937
1938            part = experiment_partition(self.auth, self.store_url, tbmap,
1939                    self.muxmax, self.direct_transit)
1940            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1941                    connInfo, expid)
1942            # Now get access to the dynamic testbeds (those added above)
1943            for tb in [ t for t in topo if t not in allocated]:
1944                self.get_access(tb, tbparams, fid, masters, tbmap, 
1945                        expid, expcert_file)
1946                allocated[tb] = 1
1947                store_keys = topo[tb].get_attribute('store_keys')
1948                # Give the testbed access to keys it exports or imports
1949                if store_keys:
1950                    for sk in store_keys.split(" "):
1951                        self.auth.set_attribute(\
1952                                tbparams[tb]['allocID']['fedid'], sk)
1953            self.auth.save()
1954
1955            # transit and disconnected testbeds may not have a connInfo entry.
1956            # Fill in the blanks.
1957            for t in allocated.keys():
1958                if not connInfo.has_key(t):
1959                    connInfo[t] = { }
1960
1961            self.wrangle_software(expid, top, topo, tbparams)
1962
1963            vtopo = topdl.topology_to_vtopo(top)
1964            vis = self.genviz(vtopo)
1965            self.save_federant_information(allocated, tbparams, eid, vtopo, 
1966                    vis, top)
1967        except service_error, e:
1968            # If something goes wrong in the parse (usually an access error)
1969            # clear the placeholder state.  From here on out the code delays
1970            # exceptions.  Failing at this point returns a fault to the remote
1971            # caller.
1972            self.clear_placeholder(eid, expid, tmpdir)
1973            raise e
1974
1975        # Start the background swapper and return the starting state.  From
1976        # here on out, the state will stick around a while.
1977
1978        # Let users touch the state
1979        self.auth.set_attribute(fid, expid)
1980        self.auth.set_attribute(expid, expid)
1981        # Override fedids can manipulate state as well
1982        for o in self.overrides:
1983            self.auth.set_attribute(o, expid)
1984        self.auth.save()
1985
1986        # Create a logger that logs to the experiment's state object as well as
1987        # to the main log file.
1988        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1989        alloc_collector = self.list_log(self.state[eid]['log'])
1990        h = logging.StreamHandler(alloc_collector)
1991        # XXX: there should be a global one of these rather than repeating the
1992        # code.
1993        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1994                    '%d %b %y %H:%M:%S'))
1995        alloc_log.addHandler(h)
1996
1997        # Start a thread to do the resource allocation
1998        t  = Thread(target=self.allocate_resources,
1999                args=(allocated, masters, eid, expid, tbparams, 
2000                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2001                    connInfo, tbmap, expcert_file),
2002                name=eid)
2003        t.start()
2004
2005        rv = {
2006                'experimentID': [
2007                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2008                ],
2009                'experimentStatus': 'starting',
2010            }
2011
2012        return rv
2013   
2014    def get_experiment_fedid(self, key):
2015        """
2016        find the fedid associated with the localname key in the state database.
2017        """
2018
2019        rv = None
2020        self.state_lock.acquire()
2021        if self.state.has_key(key):
2022            if isinstance(self.state[key], dict):
2023                try:
2024                    kl = [ f['fedid'] for f in \
2025                            self.state[key]['experimentID']\
2026                                if f.has_key('fedid') ]
2027                except KeyError:
2028                    self.state_lock.release()
2029                    raise service_error(service_error.internal, 
2030                            "No fedid for experiment %s when getting "+\
2031                                    "fedid(!?)" % key)
2032                if len(kl) == 1:
2033                    rv = kl[0]
2034                else:
2035                    self.state_lock.release()
2036                    raise service_error(service_error.internal, 
2037                            "multiple fedids for experiment %s when " +\
2038                                    "getting fedid(!?)" % key)
2039            else:
2040                self.state_lock.release()
2041                raise service_error(service_error.internal, 
2042                        "Unexpected state for %s" % key)
2043        self.state_lock.release()
2044        return rv
2045
2046    def check_experiment_access(self, fid, key):
2047        """
2048        Confirm that the fid has access to the experiment.  Though a request
2049        may be made in terms of a local name, the access attribute is always
2050        the experiment's fedid.
2051        """
2052        if not isinstance(key, fedid):
2053            key = self.get_experiment_fedid(key)
2054
2055        if self.auth.check_attribute(fid, key):
2056            return True
2057        else:
2058            raise service_error(service_error.access, "Access Denied")
2059
2060
2061    def get_handler(self, path, fid):
2062        """
2063        Perhaps surprisingly named, this function handles HTTP GET requests to
2064        this server (SOAP requests are POSTs).
2065        """
2066        self.log.info("Get handler %s %s" % (path, fid))
2067        if self.auth.check_attribute(fid, path):
2068            return ("%s/%s" % (self.repodir, path), "application/binary")
2069        else:
2070            return (None, None)
2071
2072    def clean_info_response(self, rv):
2073        """
2074        Remove the information in the experiment's state object that is not in
2075        the info response.
2076        """
2077        # Remove the owner info (should always be there, but...)
2078        if rv.has_key('owner'): del rv['owner']
2079
2080        # Convert the log into the allocationLog parameter and remove the
2081        # log entry (with defensive programming)
2082        if rv.has_key('log'):
2083            rv['allocationLog'] = "".join(rv['log'])
2084            del rv['log']
2085        else:
2086            rv['allocationLog'] = ""
2087
2088        if rv['experimentStatus'] != 'active':
2089            if rv.has_key('federant'): del rv['federant']
2090        else:
2091            # remove the allocationID and uri info from each federant
2092            for f in rv.get('federant', []):
2093                if f.has_key('allocID'): del f['allocID']
2094                if f.has_key('uri'): del f['uri']
2095
2096        return rv
2097
2098    def get_info(self, req, fid):
2099        """
2100        Return all the stored info about this experiment
2101        """
2102        rv = None
2103
2104        req = req.get('InfoRequestBody', None)
2105        if not req:
2106            raise service_error(service_error.req,
2107                    "Bad request format (no InfoRequestBody)")
2108        exp = req.get('experiment', None)
2109        if exp:
2110            if exp.has_key('fedid'):
2111                key = exp['fedid']
2112                keytype = "fedid"
2113            elif exp.has_key('localname'):
2114                key = exp['localname']
2115                keytype = "localname"
2116            else:
2117                raise service_error(service_error.req, "Unknown lookup type")
2118        else:
2119            raise service_error(service_error.req, "No request?")
2120
2121        self.check_experiment_access(fid, key)
2122
2123        # The state may be massaged by the service function that called
2124        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2125        # state.
2126        self.state_lock.acquire()
2127        if self.state.has_key(key):
2128            rv = copy.deepcopy(self.state[key])
2129        self.state_lock.release()
2130
2131        if rv:
2132            return self.clean_info_response(rv)
2133        else:
2134            raise service_error(service_error.req, "No such experiment")
2135
2136    def get_multi_info(self, req, fid):
2137        """
2138        Return all the stored info that this fedid can access
2139        """
2140        rv = { 'info': [ ] }
2141
2142        self.state_lock.acquire()
2143        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2144            try:
2145                self.check_experiment_access(fid, key)
2146            except service_error, e:
2147                if e.code == service_error.access:
2148                    continue
2149                else:
2150                    self.state_lock.release()
2151                    raise e
2152
2153            if self.state.has_key(key):
2154                e = copy.deepcopy(self.state[key])
2155                e = self.clean_info_response(e)
2156                rv['info'].append(e)
2157        self.state_lock.release()
2158        return rv
2159
2160    def check_termination_status(self, fed_exp, force):
2161        """
2162        Confirm that the experiment is sin a valid state to stop (or force it)
2163        return the state - invalid states for deletion and force settings cause
2164        exceptions.
2165        """
2166        self.state_lock.acquire()
2167        status = fed_exp.get('experimentStatus', None)
2168
2169        if status:
2170            if status in ('starting', 'terminating'):
2171                if not force:
2172                    self.state_lock.release()
2173                    raise service_error(service_error.partial, 
2174                            'Experiment still being created or destroyed')
2175                else:
2176                    self.log.warning('Experiment in %s state ' % status + \
2177                            'being terminated by force.')
2178            self.state_lock.release()
2179            return status
2180        else:
2181            # No status??? trouble
2182            self.state_lock.release()
2183            raise service_error(service_error.internal,
2184                    "Experiment has no status!?")
2185
2186
2187    def get_termination_info(self, fed_exp):
2188        ids = []
2189        term_params = { }
2190        self.state_lock.acquire()
2191        #  experimentID is a list of dicts that are self-describing
2192        #  identifiers.  This finds all the fedids and localnames - the
2193        #  keys of self.state - and puts them into ids, which is used to delete
2194        #  the state after everything is swapped out.
2195        for id in fed_exp.get('experimentID', []):
2196            if 'fedid' in id: 
2197                ids.append(id['fedid'])
2198                repo = "%s" % id['fedid']
2199            if 'localname' in id: ids.append(id['localname'])
2200
2201        # Get the experimentAccess - the principal for this experiment.  It
2202        # is this principal to which credentials have been delegated, and
2203        # as which the experiment controller must act.
2204        if 'experimentAccess' in fed_exp and \
2205                'X509' in fed_exp['experimentAccess']:
2206            expcert = fed_exp['experimentAccess']['X509']
2207        else:
2208            expcert = None
2209
2210        # Collect the allocation/segment ids into a dict keyed by the fedid
2211        # of the allocation (or a monotonically increasing integer) that
2212        # contains a tuple of uri, aid (which is a dict...)
2213        for i, fed in enumerate(fed_exp.get('federant', [])):
2214            try:
2215                uri = fed['uri']
2216                aid = fed['allocID']
2217                k = fed['allocID'].get('fedid', i)
2218            except KeyError, e:
2219                continue
2220            term_params[k] = (uri, aid)
2221        # Change the experiment state
2222        fed_exp['experimentStatus'] = 'terminating'
2223        if self.state_filename: self.write_state()
2224        self.state_lock.release()
2225
2226        return ids, term_params, expcert, repo
2227
2228
2229    def deallocate_resources(self, term_params, expcert, status, force, 
2230            dealloc_log):
2231        tmpdir = None
2232        # This try block makes sure the tempdir is cleared
2233        try:
2234            # If no expcert, try the deallocation as the experiment
2235            # controller instance.
2236            if expcert and self.auth_type != 'legacy': 
2237                try:
2238                    tmpdir = tempfile.mkdtemp(prefix="term-")
2239                except EnvironmentError:
2240                    raise service_error(service_error.internal, 
2241                            "Cannot create tmp dir")
2242                cert_file = self.make_temp_certfile(expcert, tmpdir)
2243                pw = None
2244            else: 
2245                cert_file = self.cert_file
2246                pw = self.cert_pwd
2247
2248            # Stop everyone.  NB, wait_for_all waits until a thread starts
2249            # and then completes, so we can't wait if nothing starts.  So,
2250            # no tbparams, no start.
2251            if len(term_params) > 0:
2252                tp = thread_pool(self.nthreads)
2253                for k, (uri, aid) in term_params.items():
2254                    # Create and start a thread to stop the segment
2255                    tp.wait_for_slot()
2256                    t  = pooled_thread(\
2257                            target=self.terminate_segment(log=dealloc_log,
2258                                testbed=uri,
2259                                cert_file=cert_file, 
2260                                cert_pwd=pw,
2261                                trusted_certs=self.trusted_certs,
2262                                caller=self.call_TerminateSegment),
2263                            args=(uri, aid), name=k,
2264                            pdata=tp, trace_file=self.trace_file)
2265                    t.start()
2266                # Wait for completions
2267                tp.wait_for_all_done()
2268
2269            # release the allocations (failed experiments have done this
2270            # already, and starting experiments may be in odd states, so we
2271            # ignore errors releasing those allocations
2272            try: 
2273                for k, (uri, aid)  in term_params.items():
2274                    self.release_access(None, aid, uri=uri,
2275                            cert_file=cert_file, cert_pwd=pw)
2276            except service_error, e:
2277                if status != 'failed' and not force:
2278                    raise e
2279
2280        # Clean up the tmpdir no matter what
2281        finally:
2282            if tmpdir: self.remove_dirs(tmpdir)
2283
2284
2285    def terminate_experiment(self, req, fid):
2286        """
2287        Swap this experiment out on the federants and delete the shared
2288        information
2289        """
2290        tbparams = { }
2291        req = req.get('TerminateRequestBody', None)
2292        if not req:
2293            raise service_error(service_error.req,
2294                    "Bad request format (no TerminateRequestBody)")
2295
2296        key = self.get_experiment_key(req, 'experiment')
2297        self.check_experiment_access(fid, key)
2298        exp = req.get('experiment', False)
2299        force = req.get('force', False)
2300
2301        dealloc_list = [ ]
2302
2303
2304        # Create a logger that logs to the dealloc_list as well as to the main
2305        # log file.
2306        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2307        h = logging.StreamHandler(self.list_log(dealloc_list))
2308        # XXX: there should be a global one of these rather than repeating the
2309        # code.
2310        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2311                    '%d %b %y %H:%M:%S'))
2312        dealloc_log.addHandler(h)
2313
2314        self.state_lock.acquire()
2315        fed_exp = self.state.get(key, None)
2316        self.state_lock.release()
2317        repo = None
2318
2319        if fed_exp:
2320            status = self.check_termination_status(fed_exp, force)
2321            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2322            self.deallocate_resources(term_params, expcert, status, force, 
2323                    dealloc_log)
2324
2325            # Remove the terminated experiment
2326            self.state_lock.acquire()
2327            for id in ids:
2328                if id in self.state: del self.state[id]
2329
2330            if self.state_filename: self.write_state()
2331            self.state_lock.release()
2332
2333            # Delete any synch points associated with this experiment.  All
2334            # synch points begin with the fedid of the experiment.
2335            fedid_keys = set(["fedid:%s" % f for f in ids \
2336                    if isinstance(f, fedid)])
2337            for k in self.synch_store.all_keys():
2338                try:
2339                    if len(k) > 45 and k[0:46] in fedid_keys:
2340                        self.synch_store.del_value(k)
2341                except synch_store.BadDeletionError:
2342                    pass
2343            self.write_store()
2344
2345            # Remove software and other cached stuff from the filesystem.
2346            if repo:
2347                self.remove_dirs("%s/%s" % (self.repodir, repo))
2348               
2349            return { 
2350                    'experiment': exp , 
2351                    'deallocationLog': string.join(dealloc_list, ''),
2352                    }
2353        else:
2354            raise service_error(service_error.req, "No saved state")
2355
2356
2357    def GetValue(self, req, fid):
2358        """
2359        Get a value from the synchronized store
2360        """
2361        req = req.get('GetValueRequestBody', None)
2362        if not req:
2363            raise service_error(service_error.req,
2364                    "Bad request format (no GetValueRequestBody)")
2365       
2366        name = req.get('name', None)
2367        wait = req.get('wait', False)
2368        rv = { 'name': name }
2369
2370        if name and self.auth.check_attribute(fid, name):
2371            self.log.debug("[GetValue] asking for %s " % name)
2372            try:
2373                v = self.synch_store.get_value(name, wait)
2374            except synch_store.RevokedKeyError:
2375                # No more synch on this key
2376                raise service_error(service_error.federant, 
2377                        "Synch key %s revoked" % name)
2378            if v is not None:
2379                rv['value'] = v
2380            self.log.debug("[GetValue] got %s from %s" % (v, name))
2381            return rv
2382        else:
2383            raise service_error(service_error.access, "Access Denied")
2384       
2385
2386    def SetValue(self, req, fid):
2387        """
2388        Set a value in the synchronized store
2389        """
2390        req = req.get('SetValueRequestBody', None)
2391        if not req:
2392            raise service_error(service_error.req,
2393                    "Bad request format (no SetValueRequestBody)")
2394       
2395        name = req.get('name', None)
2396        v = req.get('value', '')
2397
2398        if name and self.auth.check_attribute(fid, name):
2399            try:
2400                self.synch_store.set_value(name, v)
2401                self.write_store()
2402                self.log.debug("[SetValue] set %s to %s" % (name, v))
2403            except synch_store.CollisionError:
2404                # Translate into a service_error
2405                raise service_error(service_error.req,
2406                        "Value already set: %s" %name)
2407            except synch_store.RevokedKeyError:
2408                # No more synch on this key
2409                raise service_error(service_error.federant, 
2410                        "Synch key %s revoked" % name)
2411            return { 'name': name, 'value': v }
2412        else:
2413            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.