source: fedd/federation/experiment_control.py @ cf0ff4f

axis_examplecompt_changesinfo-ops
Last change on this file since cf0ff4f was cf0ff4f, checked in by Ted Faber <faber@…>, 13 years ago

End of a detangling pass.

There are still some functions that are too long, but overall this is a
cleaner version of the experiment controller that's somewhat easier to
read and maintain.

Closes #10

  • Property mode set to 100644
File size: 80.8 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205            self.get_access = self.legacy_get_access
206        elif self.auth_type == 'abac':
207            self.auth = abac_authorizer(load=self.auth_dir)
208        else:
209            raise service_error(service_error.internal, 
210                    "Unknown auth_type: %s" % self.auth_type)
211
212
213        if self.ssh_pubkey_file:
214            try:
215                f = open(self.ssh_pubkey_file, 'r')
216                self.ssh_pubkey = f.read()
217                f.close()
218            except EnvironmentError:
219                raise service_error(service_error.internal,
220                        "Cannot read sshpubkey")
221        else:
222            raise service_error(service_error.internal, 
223                    "No SSH public key file?")
224
225        if not self.ssh_privkey_file:
226            raise service_error(service_error.internal, 
227                    "No SSH public key file?")
228
229
230        if mapdb_file:
231            self.read_mapdb(mapdb_file)
232        else:
233            self.log.warn("[experiment_control] No testbed map, using defaults")
234            self.tbmap = { 
235                    'deter':'https://users.isi.deterlab.net:23235',
236                    'emulab':'https://users.isi.deterlab.net:23236',
237                    'ucb':'https://users.isi.deterlab.net:23237',
238                    }
239
240        if accessdb_file:
241                self.read_accessdb(accessdb_file)
242        else:
243            raise service_error(service_error.internal,
244                    "No accessdb specified in config")
245
246        # Grab saved state.  OK to do this w/o locking because it's read only
247        # and only one thread should be in existence that can see self.state at
248        # this point.
249        if self.state_filename:
250            self.read_state()
251
252        if self.store_filename:
253            self.read_store()
254        else:
255            self.log.warning("No saved synch store")
256            self.synch_store = synch_store
257
258        # Dispatch tables
259        self.soap_services = {\
260                'New': soap_handler('New', self.new_experiment),
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
266                'Terminate': soap_handler('Terminate', 
267                    self.terminate_experiment),
268                'GetValue': soap_handler('GetValue', self.GetValue),
269                'SetValue': soap_handler('SetValue', self.SetValue),
270        }
271
272        self.xmlrpc_services = {\
273                'New': xmlrpc_handler('New', self.new_experiment),
274                'Create': xmlrpc_handler('Create', self.create_experiment),
275                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
276                'Vis': xmlrpc_handler('Vis', self.get_vis),
277                'Info': xmlrpc_handler('Info', self.get_info),
278                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
279                'Terminate': xmlrpc_handler('Terminate',
280                    self.terminate_experiment),
281                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
282                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
283        }
284
285    # Call while holding self.state_lock
286    def write_state(self):
287        """
288        Write a new copy of experiment state after copying the existing state
289        to a backup.
290
291        State format is a simple pickling of the state dictionary.
292        """
293        if os.access(self.state_filename, os.W_OK):
294            copy_file(self.state_filename, \
295                    "%s.bak" % self.state_filename)
296        try:
297            f = open(self.state_filename, 'w')
298            pickle.dump(self.state, f)
299        except EnvironmentError, e:
300            self.log.error("Can't write file %s: %s" % \
301                    (self.state_filename, e))
302        except pickle.PicklingError, e:
303            self.log.error("Pickling problem: %s" % e)
304        except TypeError, e:
305            self.log.error("Pickling problem (TypeError): %s" % e)
306
307    @staticmethod
308    def get_alloc_ids(state):
309        """
310        Pull the fedids of the identifiers of each allocation from the
311        state.  Again, a dict dive that's best isolated.
312
313        Used by read_store and read state
314        """
315
316        return [ f['allocID']['fedid'] 
317                for f in state.get('federant',[]) \
318                    if f.has_key('allocID') and \
319                        f['allocID'].has_key('fedid')]
320
321    # Call while holding self.state_lock
322    def read_state(self):
323        """
324        Read a new copy of experiment state.  Old state is overwritten.
325
326        State format is a simple pickling of the state dictionary.
327        """
328       
329        def get_experiment_id(state):
330            """
331            Pull the fedid experimentID out of the saved state.  This is kind
332            of a gross walk through the dict.
333            """
334
335            if state.has_key('experimentID'):
336                for e in state['experimentID']:
337                    if e.has_key('fedid'):
338                        return e['fedid']
339                else:
340                    return None
341            else:
342                return None
343
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except EnvironmentError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355       
356        for s in self.state.values():
357            try:
358
359                eid = get_experiment_id(s)
360                if eid : 
361                    if self.auth_type == 'legacy':
362                        # XXX: legacy
363                        # Give the owner rights to the experiment
364                        self.auth.set_attribute(s['owner'], eid)
365                        # And holders of the eid as well
366                        self.auth.set_attribute(eid, eid)
367                        # allow overrides to control experiments as well
368                        for o in self.overrides:
369                            self.auth.set_attribute(o, eid)
370                        # Set permissions to allow reading of the software
371                        # repo, if any, as well.
372                        for a in self.get_alloc_ids(s):
373                            self.auth.set_attribute(a, 'repo/%s' % eid)
374                else:
375                    raise KeyError("No experiment id")
376            except KeyError, e:
377                self.log.warning("[read_state]: State ownership or identity " +\
378                        "misformatted in %s: %s" % (self.state_filename, e))
379
380
381    def read_accessdb(self, accessdb_file):
382        """
383        Read the mapping from fedids that can create experiments to their name
384        in the 3-level access namespace.  All will be asserted from this
385        testbed and can include the local username and porject that will be
386        asserted on their behalf by this fedd.  Each fedid is also added to the
387        authorization system with the "create" attribute.
388        """
389        self.accessdb = {}
390        # These are the regexps for parsing the db
391        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
392        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
394        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
395                "\s*->\s*(" + name_expr + ")\s*$")
396        lineno = 0
397
398        # Parse the mappings and store in self.authdb, a dict of
399        # fedid -> (proj, user)
400        try:
401            f = open(accessdb_file, "r")
402            for line in f:
403                lineno += 1
404                line = line.strip()
405                if len(line) == 0 or line.startswith('#'):
406                    continue
407                m = project_line.match(line)
408                if m:
409                    fid = fedid(hexstr=m.group(1))
410                    project, user = m.group(2,3)
411                    if not self.accessdb.has_key(fid):
412                        self.accessdb[fid] = []
413                    self.accessdb[fid].append((project, user))
414                    continue
415
416                m = user_line.match(line)
417                if m:
418                    fid = fedid(hexstr=m.group(1))
419                    project = None
420                    user = m.group(2)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425                self.log.warn("[experiment_control] Error parsing access " +\
426                        "db %s at line %d" %  (accessdb_file, lineno))
427        except EnvironmentError:
428            raise service_error(service_error.internal,
429                    ("Error opening/reading %s as experiment " +\
430                            "control accessdb") %  accessdb_file)
431        f.close()
432
433        # Initialize the authorization attributes
434        # XXX: legacy
435        if self.auth_type == 'legacy':
436            for fid in self.accessdb.keys():
437                self.auth.set_attribute(fid, 'create')
438                self.auth.set_attribute(fid, 'new')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except EnvironmentError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def read_store(self):
467        try:
468            self.synch_store = synch_store()
469            self.synch_store.load(self.store_filename)
470            self.log.debug("[read_store]: Read store from %s" % \
471                    self.store_filename)
472        except EnvironmentError, e:
473            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
474                    % (self.state_filename, e))
475            self.synch_store = synch_store()
476
477        # Set the initial permissions on data in the store.  XXX: This ad hoc
478        # authorization attribute initialization is getting out of hand.
479        # XXX: legacy
480        if self.auth_type == 'legacy':
481            for k in self.synch_store.all_keys():
482                try:
483                    if k.startswith('fedid:'):
484                        fid = fedid(hexstr=k[6:46])
485                        if self.state.has_key(fid):
486                            for a in self.get_alloc_ids(self.state[fid]):
487                                self.auth.set_attribute(a, k)
488                except ValueError, e:
489                    self.log.warn("Cannot deduce permissions for %s" % k)
490
491
492    def write_store(self):
493        """
494        Write a new copy of synch_store after writing current state
495        to a backup.  We use the internal synch_store pickle method to avoid
496        incinsistent data.
497
498        State format is a simple pickling of the store.
499        """
500        if os.access(self.store_filename, os.W_OK):
501            copy_file(self.store_filename, \
502                    "%s.bak" % self.store_filename)
503        try:
504            self.synch_store.save(self.store_filename)
505        except EnvironmentError, e:
506            self.log.error("Can't write file %s: %s" % \
507                    (self.store_filename, e))
508        except TypeError, e:
509            self.log.error("Pickling problem (TypeError): %s" % e)
510
511
512    def remove_dirs(self, dir):
513        """
514        Remove the directory tree and all files rooted at dir.  Log any errors,
515        but continue.
516        """
517        self.log.debug("[removedirs]: removing %s" % dir)
518        try:
519            for path, dirs, files in os.walk(dir, topdown=False):
520                for f in files:
521                    os.remove(os.path.join(path, f))
522                for d in dirs:
523                    os.rmdir(os.path.join(path, d))
524            os.rmdir(dir)
525        except EnvironmentError, e:
526            self.log.error("Error deleting directory tree in %s" % e);
527
528    @staticmethod
529    def make_temp_certfile(expcert, tmpdir):
530        """
531        make a protected copy of the access certificate so the experiment
532        controller can act as the experiment principal.  mkstemp is the most
533        secure way to do that. The directory should be created by
534        mkdtemp.  Return the filename.
535        """
536        if expcert and tmpdir:
537            try:
538                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
539                f = os.fdopen(certf, 'w')
540                print >> f, expcert
541                f.close()
542            except EnvironmentError, e:
543                raise service_error(service_error.internal, 
544                        "Cannot create temp cert file?")
545            return certfn
546        else:
547            return None
548
549       
550    def generate_ssh_keys(self, dest, type="rsa" ):
551        """
552        Generate a set of keys for the gateways to use to talk.
553
554        Keys are of type type and are stored in the required dest file.
555        """
556        valid_types = ("rsa", "dsa")
557        t = type.lower();
558        if t not in valid_types: raise ValueError
559        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
560
561        try:
562            trace = open("/dev/null", "w")
563        except EnvironmentError:
564            raise service_error(service_error.internal,
565                    "Cannot open /dev/null??");
566
567        # May raise CalledProcessError
568        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
569        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
570        if rv != 0:
571            raise service_error(service_error.internal, 
572                    "Cannot generate nonce ssh keys.  %s return code %d" \
573                            % (self.ssh_keygen, rv))
574
575    def gentopo(self, str):
576        """
577        Generate the topology data structure from the splitter's XML
578        representation of it.
579
580        The topology XML looks like:
581            <experiment>
582                <nodes>
583                    <node><vname></vname><ips>ip1:ip2</ips></node>
584                </nodes>
585                <lans>
586                    <lan>
587                        <vname></vname><vnode></vnode><ip></ip>
588                        <bandwidth></bandwidth><member>node:port</member>
589                    </lan>
590                </lans>
591        """
592        class topo_parse:
593            """
594            Parse the topology XML and create the dats structure.
595            """
596            def __init__(self):
597                # Typing of the subelements for data conversion
598                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
599                self.int_subelements = ( 'bandwidth',)
600                self.float_subelements = ( 'delay',)
601                # The final data structure
602                self.nodes = [ ]
603                self.lans =  [ ]
604                self.topo = { \
605                        'node': self.nodes,\
606                        'lan' : self.lans,\
607                    }
608                self.element = { }  # Current element being created
609                self.chars = ""     # Last text seen
610
611            def end_element(self, name):
612                # After each sub element the contents is added to the current
613                # element or to the appropriate list.
614                if name == 'node':
615                    self.nodes.append(self.element)
616                    self.element = { }
617                elif name == 'lan':
618                    self.lans.append(self.element)
619                    self.element = { }
620                elif name in self.str_subelements:
621                    self.element[name] = self.chars
622                    self.chars = ""
623                elif name in self.int_subelements:
624                    self.element[name] = int(self.chars)
625                    self.chars = ""
626                elif name in self.float_subelements:
627                    self.element[name] = float(self.chars)
628                    self.chars = ""
629
630            def found_chars(self, data):
631                self.chars += data.rstrip()
632
633
634        tp = topo_parse();
635        parser = xml.parsers.expat.ParserCreate()
636        parser.EndElementHandler = tp.end_element
637        parser.CharacterDataHandler = tp.found_chars
638
639        parser.Parse(str)
640
641        return tp.topo
642       
643
644    def genviz(self, topo):
645        """
646        Generate the visualization the virtual topology
647        """
648
649        neato = "/usr/local/bin/neato"
650        # These are used to parse neato output and to create the visualization
651        # file.
652        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
653        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
654                "%s</type></node>"
655
656        try:
657            # Node names
658            nodes = [ n['vname'] for n in topo['node'] ]
659            topo_lans = topo['lan']
660        except KeyError, e:
661            raise service_error(service_error.internal, "Bad topology: %s" %e)
662
663        lans = { }
664        links = { }
665
666        # Walk through the virtual topology, organizing the connections into
667        # 2-node connections (links) and more-than-2-node connections (lans).
668        # When a lan is created, it's added to the list of nodes (there's a
669        # node in the visualization for the lan).
670        for l in topo_lans:
671            if links.has_key(l['vname']):
672                if len(links[l['vname']]) < 2:
673                    links[l['vname']].append(l['vnode'])
674                else:
675                    nodes.append(l['vname'])
676                    lans[l['vname']] = links[l['vname']]
677                    del links[l['vname']]
678                    lans[l['vname']].append(l['vnode'])
679            elif lans.has_key(l['vname']):
680                lans[l['vname']].append(l['vnode'])
681            else:
682                links[l['vname']] = [ l['vnode'] ]
683
684
685        # Open up a temporary file for dot to turn into a visualization
686        try:
687            df, dotname = tempfile.mkstemp()
688            dotfile = os.fdopen(df, 'w')
689        except EnvironmentError:
690            raise service_error(service_error.internal,
691                    "Failed to open file in genviz")
692
693        try:
694            dnull = open('/dev/null', 'w')
695        except EnvironmentError:
696            service_error(service_error.internal,
697                    "Failed to open /dev/null in genviz")
698
699        # Generate a dot/neato input file from the links, nodes and lans
700        try:
701            print >>dotfile, "graph G {"
702            for n in nodes:
703                print >>dotfile, '\t"%s"' % n
704            for l in links.keys():
705                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
706            for l in lans.keys():
707                for n in lans[l]:
708                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
709            print >>dotfile, "}"
710            dotfile.close()
711        except TypeError:
712            raise service_error(service_error.internal,
713                    "Single endpoint link in vtopo")
714        except EnvironmentError:
715            raise service_error(service_error.internal, "Cannot write dot file")
716
717        # Use dot to create a visualization
718        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
719                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
720                close_fds=True)
721        dnull.close()
722
723        # Translate dot to vis format
724        vis_nodes = [ ]
725        vis = { 'node': vis_nodes }
726        for line in dot.stdout:
727            m = vis_re.match(line)
728            if m:
729                vn = m.group(1)
730                vis_node = {'name': vn, \
731                        'x': float(m.group(2)),\
732                        'y' : float(m.group(3)),\
733                    }
734                if vn in links.keys() or vn in lans.keys():
735                    vis_node['type'] = 'lan'
736                else:
737                    vis_node['type'] = 'node'
738                vis_nodes.append(vis_node)
739        rv = dot.wait()
740
741        os.remove(dotname)
742        if rv == 0 : return vis
743        else: return None
744
745
746    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
747            cert_pwd=None):
748        """
749        Release access to testbed through fedd
750        """
751
752        if not uri and tbmap:
753            uri = tbmap.get(tb, None)
754        if not uri:
755            raise service_error(service_error.server_config, 
756                    "Unknown testbed: %s" % tb)
757
758        if self.local_access.has_key(uri):
759            resp = self.local_access[uri].ReleaseAccess(\
760                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
761                    fedid(file=cert_file))
762            resp = { 'ReleaseAccessResponseBody': resp } 
763        else:
764            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
765                    cert_file, cert_pwd, self.trusted_certs)
766
767        # better error coding
768
769    def remote_ns2topdl(self, uri, desc):
770
771        req = {
772                'description' : { 'ns2description': desc },
773            }
774
775        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
776                self.trusted_certs)
777
778        if r.has_key('Ns2TopdlResponseBody'):
779            r = r['Ns2TopdlResponseBody']
780            ed = r.get('experimentdescription', None)
781            if ed.has_key('topdldescription'):
782                return topdl.Topology(**ed['topdldescription'])
783            else:
784                raise service_error(service_error.protocol, 
785                        "Bad splitter response (no output)")
786        else:
787            raise service_error(service_error.protocol, "Bad splitter response")
788
789    class start_segment:
790        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
791                cert_pwd=None, trusted_certs=None, caller=None,
792                log_collector=None):
793            self.log = log
794            self.debug = debug
795            self.cert_file = cert_file
796            self.cert_pwd = cert_pwd
797            self.trusted_certs = None
798            self.caller = caller
799            self.testbed = testbed
800            self.log_collector = log_collector
801            self.response = None
802            self.node = { }
803
804        def make_map(self, resp):
805            for e in resp.get('embedding', []):
806                if 'toponame' in e and 'physname' in e:
807                    self.node[e['toponame']] = e['physname'][0]
808
809        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
810            req = {
811                    'allocID': { 'fedid' : aid }, 
812                    'segmentdescription': { 
813                        'topdldescription': topo.to_dict(),
814                    },
815                }
816
817            if connInfo:
818                req['connection'] = connInfo
819
820            import_svcs = [ s for m in masters.values() \
821                    for s in m if self.testbed in s.importers]
822
823            if import_svcs or self.testbed in masters:
824                req['service'] = []
825
826            for s in import_svcs:
827                for r in s.reqs:
828                    sr = copy.deepcopy(r)
829                    sr['visibility'] = 'import';
830                    req['service'].append(sr)
831
832            for s in masters.get(self.testbed, []):
833                for r in s.reqs:
834                    sr = copy.deepcopy(r)
835                    sr['visibility'] = 'export';
836                    req['service'].append(sr)
837
838            if attrs:
839                req['fedAttr'] = attrs
840
841            try:
842                self.log.debug("Calling StartSegment at %s " % uri)
843                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
844                        self.trusted_certs)
845                if r.has_key('StartSegmentResponseBody'):
846                    lval = r['StartSegmentResponseBody'].get('allocationLog',
847                            None)
848                    if lval and self.log_collector:
849                        for line in  lval.splitlines(True):
850                            self.log_collector.write(line)
851                    self.make_map(r['StartSegmentResponseBody'])
852                    self.response = r
853                else:
854                    raise service_error(service_error.internal, 
855                            "Bad response!?: %s" %r)
856                return True
857            except service_error, e:
858                self.log.error("Start segment failed on %s: %s" % \
859                        (self.testbed, e))
860                return False
861
862
863
864    class terminate_segment:
865        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
866                cert_pwd=None, trusted_certs=None, caller=None):
867            self.log = log
868            self.debug = debug
869            self.cert_file = cert_file
870            self.cert_pwd = cert_pwd
871            self.trusted_certs = None
872            self.caller = caller
873            self.testbed = testbed
874
875        def __call__(self, uri, aid ):
876            req = {
877                    'allocID': aid , 
878                }
879            try:
880                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
881                        self.trusted_certs)
882                return True
883            except service_error, e:
884                self.log.error("Terminate segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888    def allocate_resources(self, allocated, masters, eid, expid, 
889            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
890            attrs=None, connInfo={}, tbmap=None, expcert=None):
891
892        started = { }           # Testbeds where a sub-experiment started
893                                # successfully
894
895        # XXX
896        fail_soft = False
897
898        if tbmap is None: tbmap = { }
899
900        log = alloc_log or self.log
901
902        tp = thread_pool(self.nthreads)
903        threads = [ ]
904        starters = [ ]
905
906        if expcert:
907            cert = expcert
908            pw = None
909        else:
910            cert = self.cert_file
911            pw = self.cert_pwd
912
913        for tb in allocated.keys():
914            # Create and start a thread to start the segment, and save it
915            # to get the return value later
916            tb_attrs = copy.copy(attrs)
917            tp.wait_for_slot()
918            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
919            base, suffix = split_testbed(tb)
920            if suffix:
921                tb_attrs.append({'attribute': 'experiment_name', 
922                    'value': "%s-%s" % (eid, suffix)})
923            else:
924                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
925            if not uri:
926                raise service_error(service_error.internal, 
927                        "Unknown testbed %s !?" % tb)
928
929            if tbparams[tb].has_key('allocID') and \
930                    tbparams[tb]['allocID'].has_key('fedid'):
931                aid = tbparams[tb]['allocID']['fedid']
932            else:
933                raise service_error(service_error.internal, 
934                        "No alloc id for testbed %s !?" % tb)
935
936            s = self.start_segment(log=log, debug=self.debug,
937                    testbed=tb, cert_file=cert,
938                    cert_pwd=pw, trusted_certs=self.trusted_certs,
939                    caller=self.call_StartSegment,
940                    log_collector=log_collector)
941            starters.append(s)
942            t  = pooled_thread(\
943                    target=s, name=tb,
944                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
945                    pdata=tp, trace_file=self.trace_file)
946            threads.append(t)
947            t.start()
948
949        # Wait until all finish (keep pinging the log, though)
950        mins = 0
951        revoked = False
952        while not tp.wait_for_all_done(60.0):
953            mins += 1
954            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
955                    % mins)
956            if not revoked and \
957                    len([ t.getName() for t in threads if t.rv == False]) > 0:
958                # a testbed has failed.  Revoke this experiment's
959                # synchronizarion values so that sub experiments will not
960                # deadlock waiting for synchronization that will never happen
961                self.log.info("A subexperiment has failed to swap in, " + \
962                        "revoking synch keys")
963                var_key = "fedid:%s" % expid
964                for k in self.synch_store.all_keys():
965                    if len(k) > 45 and k[0:46] == var_key:
966                        self.synch_store.revoke_key(k)
967                revoked = True
968
969        failed = [ t.getName() for t in threads if not t.rv ]
970        succeeded = [tb for tb in allocated.keys() if tb not in failed]
971
972        # If one failed clean up, unless fail_soft is set
973        if failed:
974            if not fail_soft:
975                tp.clear()
976                for tb in succeeded:
977                    # Create and start a thread to stop the segment
978                    tp.wait_for_slot()
979                    uri = tbparams[tb]['uri']
980                    t  = pooled_thread(\
981                            target=self.terminate_segment(log=log,
982                                testbed=tb,
983                                cert_file=cert, 
984                                cert_pwd=pw,
985                                trusted_certs=self.trusted_certs,
986                                caller=self.call_TerminateSegment),
987                            args=(uri, tbparams[tb]['federant']['allocID']),
988                            name=tb,
989                            pdata=tp, trace_file=self.trace_file)
990                    t.start()
991                # Wait until all finish (if any are being stopped)
992                if succeeded:
993                    tp.wait_for_all_done()
994
995                # release the allocations
996                for tb in tbparams.keys():
997                    try:
998                        self.release_access(tb, tbparams[tb]['allocID'], 
999                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1000                                cert_file=cert, cert_pwd=pw)
1001                    except service_error, e:
1002                        self.log.warn("Error releasing access: %s" % e.desc)
1003                # Remove the placeholder
1004                self.state_lock.acquire()
1005                self.state[eid]['experimentStatus'] = 'failed'
1006                if self.state_filename: self.write_state()
1007                self.state_lock.release()
1008                # Remove the repo dir
1009                self.remove_dirs("%s/%s" %(self.repodir, expid))
1010                # Walk up tmpdir, deleting as we go
1011                if self.cleanup:
1012                    self.remove_dirs(tmpdir)
1013                else:
1014                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1015
1016
1017                log.error("Swap in failed on %s" % ",".join(failed))
1018                return
1019        else:
1020            # Walk through the successes and gather the virtual to physical
1021            # mapping.
1022            embedding = [ ]
1023            for s in starters:
1024                for k, v in s.node.items():
1025                    embedding.append({
1026                        'toponame': k, 
1027                        'physname': [ v],
1028                        'testbed': s.testbed
1029                        })
1030            log.info("[start_segment]: Experiment %s active" % eid)
1031
1032
1033        # Walk up tmpdir, deleting as we go
1034        if self.cleanup:
1035            self.remove_dirs(tmpdir)
1036        else:
1037            log.debug("[start_experiment]: not removing %s" % tmpdir)
1038
1039        # Insert the experiment into our state and update the disk copy.
1040        self.state_lock.acquire()
1041        self.state[expid]['experimentStatus'] = 'active'
1042        self.state[eid] = self.state[expid]
1043        self.state[eid]['experimentdescription']['topdldescription'] = \
1044                top.to_dict()
1045        self.state[eid]['embedding'] = embedding
1046        if self.state_filename: self.write_state()
1047        self.state_lock.release()
1048        return
1049
1050
1051    def add_kit(self, e, kit):
1052        """
1053        Add a Software object created from the list of (install, location)
1054        tuples passed as kit  to the software attribute of an object e.  We
1055        do this enough to break out the code, but it's kind of a hack to
1056        avoid changing the old tuple rep.
1057        """
1058
1059        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1060
1061        if isinstance(e.software, list): e.software.extend(s)
1062        else: e.software = s
1063
1064
1065    def create_experiment_state(self, fid, req, expid, expcert,
1066            state='starting'):
1067        """
1068        Create the initial entry in the experiment's state.  The expid and
1069        expcert are the experiment's fedid and certifacte that represents that
1070        ID, which are installed in the experiment state.  If the request
1071        includes a suggested local name that is used if possible.  If the local
1072        name is already taken by an experiment owned by this user that has
1073        failed, it is overwritten.  Otherwise new letters are added until a
1074        valid localname is found.  The generated local name is returned.
1075        """
1076
1077        if req.has_key('experimentID') and \
1078                req['experimentID'].has_key('localname'):
1079            overwrite = False
1080            eid = req['experimentID']['localname']
1081            # If there's an old failed experiment here with the same local name
1082            # and accessible by this user, we'll overwrite it, otherwise we'll
1083            # fall through and do the collision avoidance.
1084            old_expid = self.get_experiment_fedid(eid)
1085            if old_expid and self.check_experiment_access(fid, old_expid):
1086                self.state_lock.acquire()
1087                status = self.state[eid].get('experimentStatus', None)
1088                if status and status == 'failed':
1089                    # remove the old access attribute
1090                    self.auth.unset_attribute(fid, old_expid)
1091                    self.auth.save()
1092                    overwrite = True
1093                    del self.state[eid]
1094                    del self.state[old_expid]
1095                self.state_lock.release()
1096            self.state_lock.acquire()
1097            while (self.state.has_key(eid) and not overwrite):
1098                eid += random.choice(string.ascii_letters)
1099            # Initial state
1100            self.state[eid] = {
1101                    'experimentID' : \
1102                            [ { 'localname' : eid }, {'fedid': expid } ],
1103                    'experimentStatus': state,
1104                    'experimentAccess': { 'X509' : expcert },
1105                    'owner': fid,
1106                    'log' : [],
1107                }
1108            self.state[expid] = self.state[eid]
1109            if self.state_filename: self.write_state()
1110            self.state_lock.release()
1111        else:
1112            eid = self.exp_stem
1113            for i in range(0,5):
1114                eid += random.choice(string.ascii_letters)
1115            self.state_lock.acquire()
1116            while (self.state.has_key(eid)):
1117                eid = self.exp_stem
1118                for i in range(0,5):
1119                    eid += random.choice(string.ascii_letters)
1120            # Initial state
1121            self.state[eid] = {
1122                    'experimentID' : \
1123                            [ { 'localname' : eid }, {'fedid': expid } ],
1124                    'experimentStatus': state,
1125                    'experimentAccess': { 'X509' : expcert },
1126                    'owner': fid,
1127                    'log' : [],
1128                }
1129            self.state[expid] = self.state[eid]
1130            if self.state_filename: self.write_state()
1131            self.state_lock.release()
1132
1133        return eid
1134
1135
1136    def allocate_ips_to_topo(self, top):
1137        """
1138        Add an ip4_address attribute to all the hosts in the topology, based on
1139        the shared substrates on which they sit.  An /etc/hosts file is also
1140        created and returned as a list of hostfiles entries.  We also return
1141        the allocator, because we may need to allocate IPs to portals
1142        (specifically DRAGON portals).
1143        """
1144        subs = sorted(top.substrates, 
1145                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1146                reverse=True)
1147        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1148        ifs = { }
1149        hosts = [ ]
1150
1151        for idx, s in enumerate(subs):
1152            net_size = len(s.interfaces)+2
1153
1154            a = ips.allocate(net_size)
1155            if a :
1156                base, num = a
1157                if num < net_size: 
1158                    raise service_error(service_error.internal,
1159                            "Allocator returned wrong number of IPs??")
1160            else:
1161                raise service_error(service_error.req, 
1162                        "Cannot allocate IP addresses")
1163            mask = ips.min_alloc
1164            while mask < net_size:
1165                mask *= 2
1166
1167            netmask = ((2**32-1) ^ (mask-1))
1168
1169            base += 1
1170            for i in s.interfaces:
1171                i.attribute.append(
1172                        topdl.Attribute('ip4_address', 
1173                            "%s" % ip_addr(base)))
1174                i.attribute.append(
1175                        topdl.Attribute('ip4_netmask', 
1176                            "%s" % ip_addr(int(netmask))))
1177
1178                hname = i.element.name
1179                if ifs.has_key(hname):
1180                    hosts.append("%s\t%s-%s %s-%d" % \
1181                            (ip_addr(base), hname, s.name, hname,
1182                                ifs[hname]))
1183                else:
1184                    ifs[hname] = 0
1185                    hosts.append("%s\t%s-%s %s-%d %s" % \
1186                            (ip_addr(base), hname, s.name, hname,
1187                                ifs[hname], hname))
1188
1189                ifs[hname] += 1
1190                base += 1
1191        return hosts, ips
1192
1193    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1194            tbparam, masters, tbmap, expid=None, expcert=None):
1195        for tb in testbeds:
1196            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1197                    expcert)
1198            allocated[tb] = 1
1199
1200    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1201            expcert=None):
1202        """
1203        Get access to testbed through fedd and set the parameters for that tb
1204        """
1205        def get_export_project(svcs):
1206            """
1207            Look through for the list of federated_service for this testbed
1208            objects for a project_export service, and extract the project
1209            parameter.
1210            """
1211
1212            pe = [s for s in svcs if s.name=='project_export']
1213            if len(pe) == 1:
1214                return pe[0].params.get('project', None)
1215            elif len(pe) == 0:
1216                return None
1217            else:
1218                raise service_error(service_error.req,
1219                        "More than one project export is not supported")
1220
1221        def add_services(svcs, type, slist):
1222            """
1223            Add the given services to slist.  type is import or export.
1224            """
1225            for i, s in enumerate(svcs):
1226                idx = '%s%d' % (type, i)
1227                sr = {'id': idx, 'name': s.name, 'visibility': type }
1228                if s.params:
1229                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1230                            for k, v in s.params.items()]
1231                slist.append(sr)
1232
1233        uri = tbmap.get(testbed_base(tb), None)
1234        if not uri:
1235            raise service_error(service_error.server_config, 
1236                    "Unknown testbed: %s" % tb)
1237
1238        export_svcs = masters.get(tb,[])
1239        import_svcs = [ s for m in masters.values() \
1240                for s in m \
1241                    if tb in s.importers ]
1242
1243        export_project = get_export_project(export_svcs)
1244        # Compose the credential list so that IDs come before attributes
1245        creds = set()
1246        keys = set()
1247        certs = self.auth.get_creds_for_principal(fid)
1248        if expid:
1249            certs.update(self.auth.get_creds_for_principal(expid))
1250        for c in certs:
1251            keys.add(c.issuer_cert())
1252            creds.add(c.attribute_cert())
1253        creds = list(keys) + list(creds)
1254
1255        if expcert: cert, pw = expcert, None
1256        else: cert, pw = self.cert_file, self.cert_pw
1257
1258        # Request credentials
1259        req = {
1260                'abac_credential': creds,
1261            }
1262        # Make the service request from the services we're importing and
1263        # exporting.  Keep track of the export request ids so we can
1264        # collect the resulting info from the access response.
1265        e_keys = { }
1266        if import_svcs or export_svcs:
1267            slist = []
1268            add_services(import_svcs, 'import', slist)
1269            add_services(export_svcs, 'export', slist)
1270            req['service'] = slist
1271
1272        if self.local_access.has_key(uri):
1273            # Local access call
1274            req = { 'RequestAccessRequestBody' : req }
1275            r = self.local_access[uri].RequestAccess(req, 
1276                    fedid(file=self.cert_file))
1277            r = { 'RequestAccessResponseBody' : r }
1278        else:
1279            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1280
1281        if r.has_key('RequestAccessResponseBody'):
1282            # Through to here we have a valid response, not a fault.
1283            # Access denied is a fault, so something better or worse than
1284            # access denied has happened.
1285            r = r['RequestAccessResponseBody']
1286            self.log.debug("[get_access] Access granted")
1287        else:
1288            raise service_error(service_error.protocol,
1289                        "Bad proxy response")
1290       
1291        tbparam[tb] = { 
1292                "allocID" : r['allocID'],
1293                "uri": uri,
1294                }
1295
1296        # Collect the responses corresponding to the services this testbed
1297        # exports.  These will be the service requests that we will include in
1298        # the start segment requests (with appropriate visibility values) to
1299        # import and export the segments.
1300        for s in r.get('service', []):
1301            id = s.get('id', None)
1302            if id and id in e_keys:
1303                e_keys[id].reqs.append(s)
1304
1305        # Add attributes to parameter space.  We don't allow attributes to
1306        # overlay any parameters already installed.
1307        for a in r.get('fedAttr', []):
1308            try:
1309                if a['attribute'] and \
1310                        isinstance(a['attribute'], basestring)\
1311                        and not tbparam[tb].has_key(a['attribute'].lower()):
1312                    tbparam[tb][a['attribute'].lower()] = a['value']
1313            except KeyError:
1314                self.log.error("Bad attribute in response: %s" % a)
1315
1316
1317    def split_topology(self, top, topo, testbeds):
1318        """
1319        Create the sub-topologies that are needed for experiment instantiation.
1320        """
1321        for tb in testbeds:
1322            topo[tb] = top.clone()
1323            # copy in for loop allows deletions from the original
1324            for e in [ e for e in topo[tb].elements]:
1325                etb = e.get_attribute('testbed')
1326                # NB: elements without a testbed attribute won't appear in any
1327                # sub topologies. 
1328                if not etb or etb != tb:
1329                    for i in e.interface:
1330                        for s in i.subs:
1331                            try:
1332                                s.interfaces.remove(i)
1333                            except ValueError:
1334                                raise service_error(service_error.internal,
1335                                        "Can't remove interface??")
1336                    topo[tb].elements.remove(e)
1337            topo[tb].make_indices()
1338
1339    def wrangle_software(self, expid, top, topo, tbparams):
1340        """
1341        Copy software out to the repository directory, allocate permissions and
1342        rewrite the segment topologies to look for the software in local
1343        places.
1344        """
1345
1346        # Copy the rpms and tarfiles to a distribution directory from
1347        # which the federants can retrieve them
1348        linkpath = "%s/software" %  expid
1349        softdir ="%s/%s" % ( self.repodir, linkpath)
1350        softmap = { }
1351        # These are in a list of tuples format (each kit).  This comprehension
1352        # unwraps them into a single list of tuples that initilaizes the set of
1353        # tuples.
1354        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1355                for p, t in l ])
1356        pkgs.update([x.location for e in top.elements \
1357                for x in e.software])
1358        try:
1359            os.makedirs(softdir)
1360        except EnvironmentError, e:
1361            raise service_error(
1362                    "Cannot create software directory: %s" % e)
1363        # The actual copying.  Everything's converted into a url for copying.
1364        for pkg in pkgs:
1365            loc = pkg
1366
1367            scheme, host, path = urlparse(loc)[0:3]
1368            dest = os.path.basename(path)
1369            if not scheme:
1370                if not loc.startswith('/'):
1371                    loc = "/%s" % loc
1372                loc = "file://%s" %loc
1373            try:
1374                u = urlopen(loc)
1375            except Exception, e:
1376                raise service_error(service_error.req, 
1377                        "Cannot open %s: %s" % (loc, e))
1378            try:
1379                f = open("%s/%s" % (softdir, dest) , "w")
1380                self.log.debug("Writing %s/%s" % (softdir,dest) )
1381                data = u.read(4096)
1382                while data:
1383                    f.write(data)
1384                    data = u.read(4096)
1385                f.close()
1386                u.close()
1387            except Exception, e:
1388                raise service_error(service_error.internal,
1389                        "Could not copy %s: %s" % (loc, e))
1390            path = re.sub("/tmp", "", linkpath)
1391            # XXX
1392            softmap[pkg] = \
1393                    "%s/%s/%s" %\
1394                    ( self.repo_url, path, dest)
1395
1396            # Allow the individual segments to access the software.
1397            for tb in tbparams.keys():
1398                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1399                        "/%s/%s" % ( path, dest))
1400            self.auth.save()
1401
1402        # Convert the software locations in the segments into the local
1403        # copies on this host
1404        for soft in [ s for tb in topo.values() \
1405                for e in tb.elements \
1406                    if getattr(e, 'software', False) \
1407                        for s in e.software ]:
1408            if softmap.has_key(soft.location):
1409                soft.location = softmap[soft.location]
1410
1411
1412    def new_experiment(self, req, fid):
1413        """
1414        The external interface to empty initial experiment creation called from
1415        the dispatcher.
1416
1417        Creates a working directory, splits the incoming description using the
1418        splitter script and parses out the avrious subsections using the
1419        lcasses above.  Once each sub-experiment is created, use pooled threads
1420        to instantiate them and start it all up.
1421        """
1422        req = req.get('NewRequestBody', None)
1423        if not req:
1424            raise service_error(service_error.req,
1425                    "Bad request format (no NewRequestBody)")
1426
1427        if self.auth.import_credentials(data_list=req.get('credential', [])):
1428            self.auth.save()
1429
1430        if not self.auth.check_attribute(fid, 'new'):
1431            raise service_error(service_error.access, "New access denied")
1432
1433        try:
1434            tmpdir = tempfile.mkdtemp(prefix="split-")
1435        except EnvironmentError:
1436            raise service_error(service_error.internal, "Cannot create tmp dir")
1437
1438        try:
1439            access_user = self.accessdb[fid]
1440        except KeyError:
1441            raise service_error(service_error.internal,
1442                    "Access map and authorizer out of sync in " + \
1443                            "new_experiment for fedid %s"  % fid)
1444
1445        # Generate an ID for the experiment (slice) and a certificate that the
1446        # allocator can use to prove they own it.  We'll ship it back through
1447        # the encrypted connection.  If the requester supplied one, use it.
1448        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1449            expcert = req['experimentAccess']['X509']
1450            expid = fedid(certstr=expcert)
1451            self.state_lock.acquire()
1452            if expid in self.state:
1453                self.state_lock.release()
1454                raise service_error(service_error.req, 
1455                        'fedid %s identifies an existing experiment' % expid)
1456            self.state_lock.release()
1457        else:
1458            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1459
1460        #now we're done with the tmpdir, and it should be empty
1461        if self.cleanup:
1462            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1463            os.rmdir(tmpdir)
1464        else:
1465            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1466
1467        eid = self.create_experiment_state(fid, req, expid, expcert, 
1468                state='empty')
1469
1470        # Let users touch the state
1471        self.auth.set_attribute(fid, expid)
1472        self.auth.set_attribute(expid, expid)
1473        # Override fedids can manipulate state as well
1474        for o in self.overrides:
1475            self.auth.set_attribute(o, expid)
1476        self.auth.save()
1477
1478        rv = {
1479                'experimentID': [
1480                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1481                ],
1482                'experimentStatus': 'empty',
1483                'experimentAccess': { 'X509' : expcert }
1484            }
1485
1486        return rv
1487
1488    # create_experiment sub-functions
1489
1490    @staticmethod
1491    def get_experiment_key(req, field='experimentID'):
1492        """
1493        Parse the experiment identifiers out of the request (the request body
1494        tag has been removed).  Specifically this pulls either the fedid or the
1495        localname out of the experimentID field.  A fedid is preferred.  If
1496        neither is present or the request does not contain the fields,
1497        service_errors are raised.
1498        """
1499        # Get the experiment access
1500        exp = req.get(field, None)
1501        if exp:
1502            if exp.has_key('fedid'):
1503                key = exp['fedid']
1504            elif exp.has_key('localname'):
1505                key = exp['localname']
1506            else:
1507                raise service_error(service_error.req, "Unknown lookup type")
1508        else:
1509            raise service_error(service_error.req, "No request?")
1510
1511        return key
1512
1513    def get_experiment_ids_and_start(self, key, tmpdir):
1514        """
1515        Get the experiment name, id and access certificate from the state, and
1516        set the experiment state to 'starting'.  returns a triple (fedid,
1517        localname, access_cert_file). The access_cert_file is a copy of the
1518        contents of the access certificate, created in the tempdir with
1519        restricted permissions.  If things are confused, raise an exception.
1520        """
1521
1522        expid = eid = None
1523        self.state_lock.acquire()
1524        if self.state.has_key(key):
1525            self.state[key]['experimentStatus'] = "starting"
1526            for e in self.state[key].get('experimentID',[]):
1527                if not expid and e.has_key('fedid'):
1528                    expid = e['fedid']
1529                elif not eid and e.has_key('localname'):
1530                    eid = e['localname']
1531            if 'experimentAccess' in self.state[key] and \
1532                    'X509' in self.state[key]['experimentAccess']:
1533                expcert = self.state[key]['experimentAccess']['X509']
1534            else:
1535                expcert = None
1536        self.state_lock.release()
1537
1538        # make a protected copy of the access certificate so the experiment
1539        # controller can act as the experiment principal.
1540        if expcert:
1541            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1542            if not expcert_file:
1543                raise service_error(service_error.internal, 
1544                        "Cannot create temp cert file?")
1545        else:
1546            expcert_file = None
1547
1548        return (eid, expid, expcert_file)
1549
1550    def get_topology(self, req, tmpdir):
1551        """
1552        Get the ns2 content and put it into a file for parsing.  Call the local
1553        or remote parser and return the topdl.Topology.  Errors result in
1554        exceptions.  req is the request and tmpdir is a work directory.
1555        """
1556
1557        # The tcl parser needs to read a file so put the content into that file
1558        descr=req.get('experimentdescription', None)
1559        if descr:
1560            file_content=descr.get('ns2description', None)
1561        else:
1562            raise service_error(service_error.req, "No experiment description")
1563
1564
1565        if self.splitter_url:
1566            self.log.debug("Calling remote topdl translator at %s" % \
1567                    self.splitter_url)
1568            top = self.remote_ns2topdl(self.splitter_url, file_content)
1569        else:
1570            tclfile = os.path.join(tmpdir, "experiment.tcl")
1571            if file_content:
1572                try:
1573                    f = open(tclfile, 'w')
1574                    f.write(file_content)
1575                    f.close()
1576                except EnvironmentError:
1577                    raise service_error(service_error.internal,
1578                            "Cannot write temp experiment description")
1579            else:
1580                raise service_error(service_error.req, 
1581                        "Only ns2descriptions supported")
1582            pid = "dummy"
1583            gid = "dummy"
1584            eid = "dummy"
1585
1586            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1587                str(self.muxmax), '-m', 'dummy']
1588
1589            tclcmd.extend([pid, gid, eid, tclfile])
1590
1591            self.log.debug("running local splitter %s", " ".join(tclcmd))
1592            # This is just fantastic.  As a side effect the parser copies
1593            # tb_compat.tcl into the current directory, so that directory
1594            # must be writable by the fedd user.  Doing this in the
1595            # temporary subdir ensures this is the case.
1596            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1597                    cwd=tmpdir)
1598            split_data = tclparser.stdout
1599
1600            top = topdl.topology_from_xml(file=split_data, top="experiment")
1601            os.remove(tclfile)
1602
1603        return top
1604
1605    def get_testbed_services(self, req):
1606        """
1607        Parse the services section of the request into into two dicts mapping
1608        testbed to lists of federated_service objects.  The first lists all
1609        exporters of services, and the second all exporters of services that
1610        need control portals int the experiment.
1611        """
1612        masters = { }
1613        pmasters = { }
1614        for s in req.get('service', []):
1615            # If this is a service request with the importall field
1616            # set, fill it out.
1617
1618            if s.get('importall', False):
1619                s['import'] = [ tb for tb in testbeds \
1620                        if tb not in s.get('export',[])]
1621                del s['importall']
1622
1623            # Add the service to masters
1624            for tb in s.get('export', []):
1625                if s.get('name', None):
1626
1627                    params = { }
1628                    for a in s.get('fedAttr', []):
1629                        params[a.get('attribute', '')] = a.get('value','')
1630
1631                    fser = federated_service(name=s['name'],
1632                            exporter=tb, importers=s.get('import',[]),
1633                            params=params)
1634                    if fser.name == 'hide_hosts' \
1635                            and 'hosts' not in fser.params:
1636                        fser.params['hosts'] = \
1637                                ",".join(tb_hosts.get(fser.exporter, []))
1638                    if tb in masters: masters[tb].append(fser)
1639                    else: masters[tb] = [fser]
1640
1641                    if fser.portal:
1642                        if tb not in pmasters: pmasters[tb] = [ fser ]
1643                        else: pmasters[tb].append(fser)
1644                else:
1645                    self.log.error('Testbed service does not have name " + \
1646                            "and importers')
1647        return masters, pmasters
1648
1649    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1650        """
1651        Create the ssh keys necessary for interconnecting the potral nodes and
1652        the global hosts file for letting each segment know about the IP
1653        addresses in play.  Save these into the repo.  Add attributes to the
1654        autorizer allowing access controllers to download them and return a set
1655        of attributes that inform the segments where to find this stuff.  Mau
1656        raise service_errors in if there are problems.
1657        """
1658        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1659        gw_secretkey_base = "fed.%s" % self.ssh_type
1660        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1661        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1662
1663        try:
1664            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1665        except ValueError:
1666            raise service_error(service_error.server_config, 
1667                    "Bad key type (%s)" % self.ssh_type)
1668
1669
1670        # Copy configuration files into the remote file store
1671        # The config urlpath
1672        configpath = "/%s/config" % expid
1673        # The config file system location
1674        configdir ="%s%s" % ( self.repodir, configpath)
1675        try:
1676            os.makedirs(configdir)
1677        except EnvironmentError, e:
1678            raise service_error(service_error.internal,
1679                    "Cannot create config directory: %s" % e)
1680        try:
1681            f = open("%s/hosts" % configdir, "w")
1682            print >> f, string.join(hosts, '\n')
1683            f.close()
1684        except EnvironmentError, e:
1685            raise service_error(service_error.internal, 
1686                    "Cannot write hosts file: %s" % e)
1687        try:
1688            copy_file("%s" % gw_pubkey, "%s/%s" % \
1689                    (configdir, gw_pubkey_base))
1690            copy_file("%s" % gw_secretkey, "%s/%s" % \
1691                    (configdir, gw_secretkey_base))
1692        except EnvironmentError, e:
1693            raise service_error(service_error.internal, 
1694                    "Cannot copy keyfiles: %s" % e)
1695
1696        # Allow the individual testbeds to access the configuration files.
1697        for tb in tbparams.keys():
1698            asignee = tbparams[tb]['allocID']['fedid']
1699            for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1700                self.auth.set_attribute(asignee, "%s/%s" % \
1701                        (configpath, f))
1702            self.auth.save()
1703
1704        attrs = [ 
1705                {
1706                    'attribute': 'ssh_pubkey', 
1707                    'value': '%s/%s/config/%s' % \
1708                            (self.repo_url, expid, gw_pubkey_base)
1709                },
1710                {
1711                    'attribute': 'ssh_secretkey', 
1712                    'value': '%s/%s/config/%s' % \
1713                            (self.repo_url, expid, gw_secretkey_base)
1714                },
1715                {
1716                    'attribute': 'hosts', 
1717                    'value': '%s/%s/config/hosts' % \
1718                            (self.repo_url, expid)
1719                },
1720            ]
1721        return attrs
1722
1723
1724    def get_vtopo(self, req, fid):
1725        """
1726        Return the stored virtual topology for this experiment
1727        """
1728        rv = None
1729        state = None
1730
1731        req = req.get('VtopoRequestBody', None)
1732        if not req:
1733            raise service_error(service_error.req,
1734                    "Bad request format (no VtopoRequestBody)")
1735        exp = req.get('experiment', None)
1736        if exp:
1737            if exp.has_key('fedid'):
1738                key = exp['fedid']
1739                keytype = "fedid"
1740            elif exp.has_key('localname'):
1741                key = exp['localname']
1742                keytype = "localname"
1743            else:
1744                raise service_error(service_error.req, "Unknown lookup type")
1745        else:
1746            raise service_error(service_error.req, "No request?")
1747
1748        self.check_experiment_access(fid, key)
1749
1750        self.state_lock.acquire()
1751        if self.state.has_key(key):
1752            if self.state[key].has_key('vtopo'):
1753                rv = { 'experiment' : {keytype: key },\
1754                        'vtopo': self.state[key]['vtopo'],\
1755                    }
1756            else:
1757                state = self.state[key]['experimentStatus']
1758        self.state_lock.release()
1759
1760        if rv: return rv
1761        else: 
1762            if state:
1763                raise service_error(service_error.partial, 
1764                        "Not ready: %s" % state)
1765            else:
1766                raise service_error(service_error.req, "No such experiment")
1767
1768    def get_vis(self, req, fid):
1769        """
1770        Return the stored visualization for this experiment
1771        """
1772        rv = None
1773        state = None
1774
1775        req = req.get('VisRequestBody', None)
1776        if not req:
1777            raise service_error(service_error.req,
1778                    "Bad request format (no VisRequestBody)")
1779        exp = req.get('experiment', None)
1780        if exp:
1781            if exp.has_key('fedid'):
1782                key = exp['fedid']
1783                keytype = "fedid"
1784            elif exp.has_key('localname'):
1785                key = exp['localname']
1786                keytype = "localname"
1787            else:
1788                raise service_error(service_error.req, "Unknown lookup type")
1789        else:
1790            raise service_error(service_error.req, "No request?")
1791
1792        self.check_experiment_access(fid, key)
1793
1794        self.state_lock.acquire()
1795        if self.state.has_key(key):
1796            if self.state[key].has_key('vis'):
1797                rv =  { 'experiment' : {keytype: key },\
1798                        'vis': self.state[key]['vis'],\
1799                        }
1800            else:
1801                state = self.state[key]['experimentStatus']
1802        self.state_lock.release()
1803
1804        if rv: return rv
1805        else:
1806            if state:
1807                raise service_error(service_error.partial, 
1808                        "Not ready: %s" % state)
1809            else:
1810                raise service_error(service_error.req, "No such experiment")
1811
1812   
1813    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1814            top):
1815        """
1816        Store the various data that have changed in the experiment state
1817        between when it was started and the beginning of resource allocation.
1818        This is basically the information about each local allocation.  This
1819        fills in the values of the placeholder allocation in the state.
1820        """
1821        # save federant information
1822        for k in allocated.keys():
1823            tbparams[k]['federant'] = {
1824                    'name': [ { 'localname' : eid} ],
1825                    'allocID' : tbparams[k]['allocID'],
1826                    'uri': tbparams[k]['uri'],
1827                }
1828
1829        self.state_lock.acquire()
1830        self.state[eid]['vtopo'] = vtopo
1831        self.state[eid]['vis'] = vis
1832        self.state[eid]['experimentdescription'] = \
1833                { 'topdldescription': top.to_dict() }
1834        self.state[eid]['federant'] = \
1835                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1836                    if tbparams[tb].has_key('federant') ]
1837        if self.state_filename: 
1838            self.write_state()
1839        self.state_lock.release()
1840
1841    def clear_placeholder(self, eid, expid, tmpdir):
1842        """
1843        Clear the placeholder and remove any allocated temporary dir.
1844        """
1845
1846        self.state_lock.acquire()
1847        del self.state[eid]
1848        del self.state[expid]
1849        if self.state_filename: self.write_state()
1850        self.state_lock.release()
1851        if tmpdir and self.cleanup:
1852            self.remove_dirs(tmpdir)
1853
1854    # end of create_experiment sub-functions
1855
1856    def create_experiment(self, req, fid):
1857        """
1858        The external interface to experiment creation called from the
1859        dispatcher.
1860
1861        Creates a working directory, splits the incoming description using the
1862        splitter script and parses out the various subsections using the
1863        classes above.  Once each sub-experiment is created, use pooled threads
1864        to instantiate them and start it all up.
1865        """
1866
1867        req = req.get('CreateRequestBody', None)
1868        if req:
1869            key = self.get_experiment_key(req)
1870        else:
1871            raise service_error(service_error.req,
1872                    "Bad request format (no CreateRequestBody)")
1873
1874        # Import information from the requester
1875        if self.auth.import_credentials(data_list=req.get('credential', [])):
1876            self.auth.save()
1877
1878        # Make sure that the caller can talk to us
1879        self.check_experiment_access(fid, key)
1880
1881        # Install the testbed map entries supplied with the request into a copy
1882        # of the testbed map.
1883        tbmap = dict(self.tbmap)
1884        for m in req.get('testbedmap', []):
1885            if 'testbed' in m and 'uri' in m:
1886                tbmap[m['testbed']] = m['uri']
1887
1888        # a place to work
1889        try:
1890            tmpdir = tempfile.mkdtemp(prefix="split-")
1891            os.mkdir(tmpdir+"/keys")
1892        except EnvironmentError:
1893            raise service_error(service_error.internal, "Cannot create tmp dir")
1894
1895        tbparams = { }
1896
1897        eid, expid, expcert_file = \
1898                self.get_experiment_ids_and_start(key, tmpdir)
1899
1900        # This catches exceptions to clear the placeholder if necessary
1901        try: 
1902            if not (eid and expid):
1903                raise service_error(service_error.internal, 
1904                        "Cannot find local experiment info!?")
1905
1906            top = self.get_topology(req, tmpdir)
1907            # Assign the IPs
1908            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1909            # Find the testbeds to look up
1910            tb_hosts = { }
1911            testbeds = [ ]
1912            for e in top.elements:
1913                if isinstance(e, topdl.Computer):
1914                    tb = e.get_attribute('testbed') or 'default'
1915                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1916                    else: 
1917                        tb_hosts[tb] = [ e.name ]
1918                        testbeds.append(tb)
1919
1920            masters, pmasters = self.get_testbed_services(req)
1921            allocated = { }         # Testbeds we can access
1922            topo ={ }               # Sub topologies
1923            connInfo = { }          # Connection information
1924
1925            self.get_access_to_testbeds(testbeds, fid, allocated, 
1926                    tbparams, masters, tbmap, expid, expcert_file)
1927
1928            self.split_topology(top, topo, testbeds)
1929
1930            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
1931
1932            part = experiment_partition(self.auth, self.store_url, tbmap,
1933                    self.muxmax, self.direct_transit)
1934            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1935                    connInfo, expid)
1936            # Now get access to the dynamic testbeds (those added above)
1937            for tb in [ t for t in topo if t not in allocated]:
1938                self.get_access(tb, tbparams, fid, masters, tbmap, 
1939                        expid, expcert_file)
1940                allocated[tb] = 1
1941                store_keys = topo[tb].get_attribute('store_keys')
1942                # Give the testbed access to keys it exports or imports
1943                if store_keys:
1944                    for sk in store_keys.split(" "):
1945                        self.auth.set_attribute(\
1946                                tbparams[tb]['allocID']['fedid'], sk)
1947            self.auth.save()
1948
1949            # transit and disconnected testbeds may not have a connInfo entry.
1950            # Fill in the blanks.
1951            for t in allocated.keys():
1952                if not connInfo.has_key(t):
1953                    connInfo[t] = { }
1954
1955            self.wrangle_software(expid, top, topo, tbparams)
1956
1957            vtopo = topdl.topology_to_vtopo(top)
1958            vis = self.genviz(vtopo)
1959            self.save_federant_information(allocated, tbparams, eid, vtopo, 
1960                    vis, top)
1961        except service_error, e:
1962            # If something goes wrong in the parse (usually an access error)
1963            # clear the placeholder state.  From here on out the code delays
1964            # exceptions.  Failing at this point returns a fault to the remote
1965            # caller.
1966            self.clear_placeholder(eid, expid, tmpdir)
1967            raise e
1968
1969        # Start the background swapper and return the starting state.  From
1970        # here on out, the state will stick around a while.
1971
1972        # Let users touch the state
1973        self.auth.set_attribute(fid, expid)
1974        self.auth.set_attribute(expid, expid)
1975        # Override fedids can manipulate state as well
1976        for o in self.overrides:
1977            self.auth.set_attribute(o, expid)
1978        self.auth.save()
1979
1980        # Create a logger that logs to the experiment's state object as well as
1981        # to the main log file.
1982        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1983        alloc_collector = self.list_log(self.state[eid]['log'])
1984        h = logging.StreamHandler(alloc_collector)
1985        # XXX: there should be a global one of these rather than repeating the
1986        # code.
1987        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1988                    '%d %b %y %H:%M:%S'))
1989        alloc_log.addHandler(h)
1990
1991        # Start a thread to do the resource allocation
1992        t  = Thread(target=self.allocate_resources,
1993                args=(allocated, masters, eid, expid, tbparams, 
1994                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1995                    connInfo, tbmap, expcert_file),
1996                name=eid)
1997        t.start()
1998
1999        rv = {
2000                'experimentID': [
2001                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2002                ],
2003                'experimentStatus': 'starting',
2004            }
2005
2006        return rv
2007   
2008    def get_experiment_fedid(self, key):
2009        """
2010        find the fedid associated with the localname key in the state database.
2011        """
2012
2013        rv = None
2014        self.state_lock.acquire()
2015        if self.state.has_key(key):
2016            if isinstance(self.state[key], dict):
2017                try:
2018                    kl = [ f['fedid'] for f in \
2019                            self.state[key]['experimentID']\
2020                                if f.has_key('fedid') ]
2021                except KeyError:
2022                    self.state_lock.release()
2023                    raise service_error(service_error.internal, 
2024                            "No fedid for experiment %s when getting "+\
2025                                    "fedid(!?)" % key)
2026                if len(kl) == 1:
2027                    rv = kl[0]
2028                else:
2029                    self.state_lock.release()
2030                    raise service_error(service_error.internal, 
2031                            "multiple fedids for experiment %s when " +\
2032                                    "getting fedid(!?)" % key)
2033            else:
2034                self.state_lock.release()
2035                raise service_error(service_error.internal, 
2036                        "Unexpected state for %s" % key)
2037        self.state_lock.release()
2038        return rv
2039
2040    def check_experiment_access(self, fid, key):
2041        """
2042        Confirm that the fid has access to the experiment.  Though a request
2043        may be made in terms of a local name, the access attribute is always
2044        the experiment's fedid.
2045        """
2046        if not isinstance(key, fedid):
2047            key = self.get_experiment_fedid(key)
2048
2049        if self.auth.check_attribute(fid, key):
2050            return True
2051        else:
2052            raise service_error(service_error.access, "Access Denied")
2053
2054
2055    def get_handler(self, path, fid):
2056        """
2057        Perhaps surprisingly named, this function handles HTTP GET requests to
2058        this server (SOAP requests are POSTs).
2059        """
2060        self.log.info("Get handler %s %s" % (path, fid))
2061        if self.auth.check_attribute(fid, path):
2062            return ("%s/%s" % (self.repodir, path), "application/binary")
2063        else:
2064            return (None, None)
2065
2066    def clean_info_response(self, rv):
2067        """
2068        Remove the information in the experiment's state object that is not in
2069        the info response.
2070        """
2071        # Remove the owner info (should always be there, but...)
2072        if rv.has_key('owner'): del rv['owner']
2073
2074        # Convert the log into the allocationLog parameter and remove the
2075        # log entry (with defensive programming)
2076        if rv.has_key('log'):
2077            rv['allocationLog'] = "".join(rv['log'])
2078            del rv['log']
2079        else:
2080            rv['allocationLog'] = ""
2081
2082        if rv['experimentStatus'] != 'active':
2083            if rv.has_key('federant'): del rv['federant']
2084        else:
2085            # remove the allocationID and uri info from each federant
2086            for f in rv.get('federant', []):
2087                if f.has_key('allocID'): del f['allocID']
2088                if f.has_key('uri'): del f['uri']
2089
2090        return rv
2091
2092    def get_info(self, req, fid):
2093        """
2094        Return all the stored info about this experiment
2095        """
2096        rv = None
2097
2098        req = req.get('InfoRequestBody', None)
2099        if not req:
2100            raise service_error(service_error.req,
2101                    "Bad request format (no InfoRequestBody)")
2102        exp = req.get('experiment', None)
2103        if exp:
2104            if exp.has_key('fedid'):
2105                key = exp['fedid']
2106                keytype = "fedid"
2107            elif exp.has_key('localname'):
2108                key = exp['localname']
2109                keytype = "localname"
2110            else:
2111                raise service_error(service_error.req, "Unknown lookup type")
2112        else:
2113            raise service_error(service_error.req, "No request?")
2114
2115        self.check_experiment_access(fid, key)
2116
2117        # The state may be massaged by the service function that called
2118        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2119        # state.
2120        self.state_lock.acquire()
2121        if self.state.has_key(key):
2122            rv = copy.deepcopy(self.state[key])
2123        self.state_lock.release()
2124
2125        if rv:
2126            return self.clean_info_response(rv)
2127        else:
2128            raise service_error(service_error.req, "No such experiment")
2129
2130    def get_multi_info(self, req, fid):
2131        """
2132        Return all the stored info that this fedid can access
2133        """
2134        rv = { 'info': [ ] }
2135
2136        self.state_lock.acquire()
2137        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2138            try:
2139                self.check_experiment_access(fid, key)
2140            except service_error, e:
2141                if e.code == service_error.access:
2142                    continue
2143                else:
2144                    self.state_lock.release()
2145                    raise e
2146
2147            if self.state.has_key(key):
2148                e = copy.deepcopy(self.state[key])
2149                e = self.clean_info_response(e)
2150                rv['info'].append(e)
2151        self.state_lock.release()
2152        return rv
2153
2154    def check_termination_status(self, fed_exp, force):
2155        """
2156        Confirm that the experiment is sin a valid state to stop (or force it)
2157        return the state - invalid states for deletion and force settings cause
2158        exceptions.
2159        """
2160        self.state_lock.acquire()
2161        status = fed_exp.get('experimentStatus', None)
2162
2163        if status:
2164            if status in ('starting', 'terminating'):
2165                if not force:
2166                    self.state_lock.release()
2167                    raise service_error(service_error.partial, 
2168                            'Experiment still being created or destroyed')
2169                else:
2170                    self.log.warning('Experiment in %s state ' % status + \
2171                            'being terminated by force.')
2172            self.state_lock.release()
2173            return status
2174        else:
2175            # No status??? trouble
2176            self.state_lock.release()
2177            raise service_error(service_error.internal,
2178                    "Experiment has no status!?")
2179
2180
2181    def get_termination_info(self, fed_exp):
2182        ids = []
2183        term_params = { }
2184        self.state_lock.acquire()
2185        #  experimentID is a list of dicts that are self-describing
2186        #  identifiers.  This finds all the fedids and localnames - the
2187        #  keys of self.state - and puts them into ids, which is used to delete
2188        #  the state after everything is swapped out.
2189        for id in fed_exp.get('experimentID', []):
2190            if 'fedid' in id: 
2191                ids.append(id['fedid'])
2192                repo = "%s" % id['fedid']
2193            if 'localname' in id: ids.append(id['localname'])
2194
2195        # Get the experimentAccess - the principal for this experiment.  It
2196        # is this principal to which credentials have been delegated, and
2197        # as which the experiment controller must act.
2198        if 'experimentAccess' in fed_exp and \
2199                'X509' in fed_exp['experimentAccess']:
2200            expcert = fed_exp['experimentAccess']['X509']
2201        else:
2202            expcert = None
2203
2204        # Collect the allocation/segment ids into a dict keyed by the fedid
2205        # of the allocation (or a monotonically increasing integer) that
2206        # contains a tuple of uri, aid (which is a dict...)
2207        for i, fed in enumerate(fed_exp.get('federant', [])):
2208            try:
2209                uri = fed['uri']
2210                aid = fed['allocID']
2211                k = fed['allocID'].get('fedid', i)
2212            except KeyError, e:
2213                continue
2214            term_params[k] = (uri, aid)
2215        # Change the experiment state
2216        fed_exp['experimentStatus'] = 'terminating'
2217        if self.state_filename: self.write_state()
2218        self.state_lock.release()
2219
2220        return ids, term_params, expcert, repo
2221
2222
2223    def deallocate_resources(self, term_params, expcert, status, force, 
2224            dealloc_log):
2225        tmpdir = None
2226        # This try block makes sure the tempdir is cleared
2227        try:
2228            # If no expcert, try the deallocation as the experiment
2229            # controller instance.
2230            if expcert and self.auth_type != 'legacy': 
2231                try:
2232                    tmpdir = tempfile.mkdtemp(prefix="term-")
2233                except EnvironmentError:
2234                    raise service_error(service_error.internal, 
2235                            "Cannot create tmp dir")
2236                cert_file = self.make_temp_certfile(expcert, tmpdir)
2237                pw = None
2238            else: 
2239                cert_file = self.cert_file
2240                pw = self.cert_pwd
2241
2242            # Stop everyone.  NB, wait_for_all waits until a thread starts
2243            # and then completes, so we can't wait if nothing starts.  So,
2244            # no tbparams, no start.
2245            if len(term_params) > 0:
2246                tp = thread_pool(self.nthreads)
2247                for k, (uri, aid) in term_params.items():
2248                    # Create and start a thread to stop the segment
2249                    tp.wait_for_slot()
2250                    t  = pooled_thread(\
2251                            target=self.terminate_segment(log=dealloc_log,
2252                                testbed=uri,
2253                                cert_file=cert_file, 
2254                                cert_pwd=pw,
2255                                trusted_certs=self.trusted_certs,
2256                                caller=self.call_TerminateSegment),
2257                            args=(uri, aid), name=k,
2258                            pdata=tp, trace_file=self.trace_file)
2259                    t.start()
2260                # Wait for completions
2261                tp.wait_for_all_done()
2262
2263            # release the allocations (failed experiments have done this
2264            # already, and starting experiments may be in odd states, so we
2265            # ignore errors releasing those allocations
2266            try: 
2267                for k, (uri, aid)  in term_params.items():
2268                    self.release_access(None, aid, uri=uri,
2269                            cert_file=cert_file, cert_pwd=pw)
2270            except service_error, e:
2271                if status != 'failed' and not force:
2272                    raise e
2273
2274        # Clean up the tmpdir no matter what
2275        finally:
2276            if tmpdir: self.remove_dirs(tmpdir)
2277
2278
2279    def terminate_experiment(self, req, fid):
2280        """
2281        Swap this experiment out on the federants and delete the shared
2282        information
2283        """
2284        tbparams = { }
2285        req = req.get('TerminateRequestBody', None)
2286        if not req:
2287            raise service_error(service_error.req,
2288                    "Bad request format (no TerminateRequestBody)")
2289
2290        key = self.get_experiment_key(req, 'experiment')
2291        self.check_experiment_access(fid, key)
2292        exp = req.get('experiment', False)
2293        force = req.get('force', False)
2294
2295        dealloc_list = [ ]
2296
2297
2298        # Create a logger that logs to the dealloc_list as well as to the main
2299        # log file.
2300        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2301        h = logging.StreamHandler(self.list_log(dealloc_list))
2302        # XXX: there should be a global one of these rather than repeating the
2303        # code.
2304        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2305                    '%d %b %y %H:%M:%S'))
2306        dealloc_log.addHandler(h)
2307
2308        self.state_lock.acquire()
2309        fed_exp = self.state.get(key, None)
2310        self.state_lock.release()
2311        repo = None
2312
2313        if fed_exp:
2314            status = self.check_termination_status(fed_exp, force)
2315            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2316            self.deallocate_resources(term_params, expcert, status, force, 
2317                    dealloc_log)
2318
2319            # Remove the terminated experiment
2320            self.state_lock.acquire()
2321            for id in ids:
2322                if id in self.state: del self.state[id]
2323
2324            if self.state_filename: self.write_state()
2325            self.state_lock.release()
2326
2327            # Delete any synch points associated with this experiment.  All
2328            # synch points begin with the fedid of the experiment.
2329            fedid_keys = set(["fedid:%s" % f for f in ids \
2330                    if isinstance(f, fedid)])
2331            for k in self.synch_store.all_keys():
2332                try:
2333                    if len(k) > 45 and k[0:46] in fedid_keys:
2334                        self.synch_store.del_value(k)
2335                except synch_store.BadDeletionError:
2336                    pass
2337            self.write_store()
2338
2339            # Remove software and other cached stuff from the filesystem.
2340            if repo:
2341                self.remove_dirs("%s/%s" % (self.repodir, repo))
2342               
2343            return { 
2344                    'experiment': exp , 
2345                    'deallocationLog': string.join(dealloc_list, ''),
2346                    }
2347        else:
2348            raise service_error(service_error.req, "No saved state")
2349
2350
2351    def GetValue(self, req, fid):
2352        """
2353        Get a value from the synchronized store
2354        """
2355        req = req.get('GetValueRequestBody', None)
2356        if not req:
2357            raise service_error(service_error.req,
2358                    "Bad request format (no GetValueRequestBody)")
2359       
2360        name = req.get('name', None)
2361        wait = req.get('wait', False)
2362        rv = { 'name': name }
2363
2364        if name and self.auth.check_attribute(fid, name):
2365            self.log.debug("[GetValue] asking for %s " % name)
2366            try:
2367                v = self.synch_store.get_value(name, wait)
2368            except synch_store.RevokedKeyError:
2369                # No more synch on this key
2370                raise service_error(service_error.federant, 
2371                        "Synch key %s revoked" % name)
2372            if v is not None:
2373                rv['value'] = v
2374            self.log.debug("[GetValue] got %s from %s" % (v, name))
2375            return rv
2376        else:
2377            raise service_error(service_error.access, "Access Denied")
2378       
2379
2380    def SetValue(self, req, fid):
2381        """
2382        Set a value in the synchronized store
2383        """
2384        req = req.get('SetValueRequestBody', None)
2385        if not req:
2386            raise service_error(service_error.req,
2387                    "Bad request format (no SetValueRequestBody)")
2388       
2389        name = req.get('name', None)
2390        v = req.get('value', '')
2391
2392        if name and self.auth.check_attribute(fid, name):
2393            try:
2394                self.synch_store.set_value(name, v)
2395                self.write_store()
2396                self.log.debug("[SetValue] set %s to %s" % (name, v))
2397            except synch_store.CollisionError:
2398                # Translate into a service_error
2399                raise service_error(service_error.req,
2400                        "Value already set: %s" %name)
2401            except synch_store.RevokedKeyError:
2402                # No more synch on this key
2403                raise service_error(service_error.federant, 
2404                        "Synch key %s revoked" % name)
2405            return { 'name': name, 'value': v }
2406        else:
2407            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.