source: fedd/federation/experiment_control.py @ b67fd22

axis_examplecompt_changesinfo-ops
Last change on this file since b67fd22 was b67fd22, checked in by Ted Faber <faber@…>, 14 years ago

Remove abac credentials when the experiment is removed.

  • Property mode set to 100644
File size: 82.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205            self.get_access = self.legacy_get_access
206        elif self.auth_type == 'abac':
207            self.auth = abac_authorizer(load=self.auth_dir)
208        else:
209            raise service_error(service_error.internal, 
210                    "Unknown auth_type: %s" % self.auth_type)
211
212
213        if self.ssh_pubkey_file:
214            try:
215                f = open(self.ssh_pubkey_file, 'r')
216                self.ssh_pubkey = f.read()
217                f.close()
218            except EnvironmentError:
219                raise service_error(service_error.internal,
220                        "Cannot read sshpubkey")
221        else:
222            raise service_error(service_error.internal, 
223                    "No SSH public key file?")
224
225        if not self.ssh_privkey_file:
226            raise service_error(service_error.internal, 
227                    "No SSH public key file?")
228
229
230        if mapdb_file:
231            self.read_mapdb(mapdb_file)
232        else:
233            self.log.warn("[experiment_control] No testbed map, using defaults")
234            self.tbmap = { 
235                    'deter':'https://users.isi.deterlab.net:23235',
236                    'emulab':'https://users.isi.deterlab.net:23236',
237                    'ucb':'https://users.isi.deterlab.net:23237',
238                    }
239
240        if accessdb_file:
241                self.read_accessdb(accessdb_file)
242        else:
243            raise service_error(service_error.internal,
244                    "No accessdb specified in config")
245
246        # Grab saved state.  OK to do this w/o locking because it's read only
247        # and only one thread should be in existence that can see self.state at
248        # this point.
249        if self.state_filename:
250            self.read_state()
251
252        if self.store_filename:
253            self.read_store()
254        else:
255            self.log.warning("No saved synch store")
256            self.synch_store = synch_store
257
258        # Dispatch tables
259        self.soap_services = {\
260                'New': soap_handler('New', self.new_experiment),
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
266                'Terminate': soap_handler('Terminate', 
267                    self.terminate_experiment),
268                'GetValue': soap_handler('GetValue', self.GetValue),
269                'SetValue': soap_handler('SetValue', self.SetValue),
270        }
271
272        self.xmlrpc_services = {\
273                'New': xmlrpc_handler('New', self.new_experiment),
274                'Create': xmlrpc_handler('Create', self.create_experiment),
275                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
276                'Vis': xmlrpc_handler('Vis', self.get_vis),
277                'Info': xmlrpc_handler('Info', self.get_info),
278                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
279                'Terminate': xmlrpc_handler('Terminate',
280                    self.terminate_experiment),
281                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
282                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
283        }
284
285    # Call while holding self.state_lock
286    def write_state(self):
287        """
288        Write a new copy of experiment state after copying the existing state
289        to a backup.
290
291        State format is a simple pickling of the state dictionary.
292        """
293        if os.access(self.state_filename, os.W_OK):
294            copy_file(self.state_filename, \
295                    "%s.bak" % self.state_filename)
296        try:
297            f = open(self.state_filename, 'w')
298            pickle.dump(self.state, f)
299        except EnvironmentError, e:
300            self.log.error("Can't write file %s: %s" % \
301                    (self.state_filename, e))
302        except pickle.PicklingError, e:
303            self.log.error("Pickling problem: %s" % e)
304        except TypeError, e:
305            self.log.error("Pickling problem (TypeError): %s" % e)
306
307    @staticmethod
308    def get_alloc_ids(state):
309        """
310        Pull the fedids of the identifiers of each allocation from the
311        state.  Again, a dict dive that's best isolated.
312
313        Used by read_store and read state
314        """
315
316        return [ f['allocID']['fedid'] 
317                for f in state.get('federant',[]) \
318                    if f.has_key('allocID') and \
319                        f['allocID'].has_key('fedid')]
320
321    # Call while holding self.state_lock
322    def read_state(self):
323        """
324        Read a new copy of experiment state.  Old state is overwritten.
325
326        State format is a simple pickling of the state dictionary.
327        """
328       
329        def get_experiment_id(state):
330            """
331            Pull the fedid experimentID out of the saved state.  This is kind
332            of a gross walk through the dict.
333            """
334
335            if state.has_key('experimentID'):
336                for e in state['experimentID']:
337                    if e.has_key('fedid'):
338                        return e['fedid']
339                else:
340                    return None
341            else:
342                return None
343
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except EnvironmentError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355       
356        for s in self.state.values():
357            try:
358
359                eid = get_experiment_id(s)
360                if eid : 
361                    if self.auth_type == 'legacy':
362                        # XXX: legacy
363                        # Give the owner rights to the experiment
364                        self.auth.set_attribute(s['owner'], eid)
365                        # And holders of the eid as well
366                        self.auth.set_attribute(eid, eid)
367                        # allow overrides to control experiments as well
368                        for o in self.overrides:
369                            self.auth.set_attribute(o, eid)
370                        # Set permissions to allow reading of the software
371                        # repo, if any, as well.
372                        for a in self.get_alloc_ids(s):
373                            self.auth.set_attribute(a, 'repo/%s' % eid)
374                else:
375                    raise KeyError("No experiment id")
376            except KeyError, e:
377                self.log.warning("[read_state]: State ownership or identity " +\
378                        "misformatted in %s: %s" % (self.state_filename, e))
379
380
381    def read_accessdb(self, accessdb_file):
382        """
383        Read the mapping from fedids that can create experiments to their name
384        in the 3-level access namespace.  All will be asserted from this
385        testbed and can include the local username and porject that will be
386        asserted on their behalf by this fedd.  Each fedid is also added to the
387        authorization system with the "create" attribute.
388        """
389        self.accessdb = {}
390        # These are the regexps for parsing the db
391        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
392        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
394        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
395                "\s*->\s*(" + name_expr + ")\s*$")
396        lineno = 0
397
398        # Parse the mappings and store in self.authdb, a dict of
399        # fedid -> (proj, user)
400        try:
401            f = open(accessdb_file, "r")
402            for line in f:
403                lineno += 1
404                line = line.strip()
405                if len(line) == 0 or line.startswith('#'):
406                    continue
407                m = project_line.match(line)
408                if m:
409                    fid = fedid(hexstr=m.group(1))
410                    project, user = m.group(2,3)
411                    if not self.accessdb.has_key(fid):
412                        self.accessdb[fid] = []
413                    self.accessdb[fid].append((project, user))
414                    continue
415
416                m = user_line.match(line)
417                if m:
418                    fid = fedid(hexstr=m.group(1))
419                    project = None
420                    user = m.group(2)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425                self.log.warn("[experiment_control] Error parsing access " +\
426                        "db %s at line %d" %  (accessdb_file, lineno))
427        except EnvironmentError:
428            raise service_error(service_error.internal,
429                    ("Error opening/reading %s as experiment " +\
430                            "control accessdb") %  accessdb_file)
431        f.close()
432
433        # Initialize the authorization attributes
434        # XXX: legacy
435        if self.auth_type == 'legacy':
436            for fid in self.accessdb.keys():
437                self.auth.set_attribute(fid, 'create')
438                self.auth.set_attribute(fid, 'new')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except EnvironmentError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def read_store(self):
467        try:
468            self.synch_store = synch_store()
469            self.synch_store.load(self.store_filename)
470            self.log.debug("[read_store]: Read store from %s" % \
471                    self.store_filename)
472        except EnvironmentError, e:
473            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
474                    % (self.state_filename, e))
475            self.synch_store = synch_store()
476
477        # Set the initial permissions on data in the store.  XXX: This ad hoc
478        # authorization attribute initialization is getting out of hand.
479        # XXX: legacy
480        if self.auth_type == 'legacy':
481            for k in self.synch_store.all_keys():
482                try:
483                    if k.startswith('fedid:'):
484                        fid = fedid(hexstr=k[6:46])
485                        if self.state.has_key(fid):
486                            for a in self.get_alloc_ids(self.state[fid]):
487                                self.auth.set_attribute(a, k)
488                except ValueError, e:
489                    self.log.warn("Cannot deduce permissions for %s" % k)
490
491
492    def write_store(self):
493        """
494        Write a new copy of synch_store after writing current state
495        to a backup.  We use the internal synch_store pickle method to avoid
496        incinsistent data.
497
498        State format is a simple pickling of the store.
499        """
500        if os.access(self.store_filename, os.W_OK):
501            copy_file(self.store_filename, \
502                    "%s.bak" % self.store_filename)
503        try:
504            self.synch_store.save(self.store_filename)
505        except EnvironmentError, e:
506            self.log.error("Can't write file %s: %s" % \
507                    (self.store_filename, e))
508        except TypeError, e:
509            self.log.error("Pickling problem (TypeError): %s" % e)
510
511
512    def remove_dirs(self, dir):
513        """
514        Remove the directory tree and all files rooted at dir.  Log any errors,
515        but continue.
516        """
517        self.log.debug("[removedirs]: removing %s" % dir)
518        try:
519            for path, dirs, files in os.walk(dir, topdown=False):
520                for f in files:
521                    os.remove(os.path.join(path, f))
522                for d in dirs:
523                    os.rmdir(os.path.join(path, d))
524            os.rmdir(dir)
525        except EnvironmentError, e:
526            self.log.error("Error deleting directory tree in %s" % e);
527
528    @staticmethod
529    def make_temp_certfile(expcert, tmpdir):
530        """
531        make a protected copy of the access certificate so the experiment
532        controller can act as the experiment principal.  mkstemp is the most
533        secure way to do that. The directory should be created by
534        mkdtemp.  Return the filename.
535        """
536        if expcert and tmpdir:
537            try:
538                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
539                f = os.fdopen(certf, 'w')
540                print >> f, expcert
541                f.close()
542            except EnvironmentError, e:
543                raise service_error(service_error.internal, 
544                        "Cannot create temp cert file?")
545            return certfn
546        else:
547            return None
548
549       
550    def generate_ssh_keys(self, dest, type="rsa" ):
551        """
552        Generate a set of keys for the gateways to use to talk.
553
554        Keys are of type type and are stored in the required dest file.
555        """
556        valid_types = ("rsa", "dsa")
557        t = type.lower();
558        if t not in valid_types: raise ValueError
559        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
560
561        try:
562            trace = open("/dev/null", "w")
563        except EnvironmentError:
564            raise service_error(service_error.internal,
565                    "Cannot open /dev/null??");
566
567        # May raise CalledProcessError
568        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
569        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
570        if rv != 0:
571            raise service_error(service_error.internal, 
572                    "Cannot generate nonce ssh keys.  %s return code %d" \
573                            % (self.ssh_keygen, rv))
574
575    def gentopo(self, str):
576        """
577        Generate the topology data structure from the splitter's XML
578        representation of it.
579
580        The topology XML looks like:
581            <experiment>
582                <nodes>
583                    <node><vname></vname><ips>ip1:ip2</ips></node>
584                </nodes>
585                <lans>
586                    <lan>
587                        <vname></vname><vnode></vnode><ip></ip>
588                        <bandwidth></bandwidth><member>node:port</member>
589                    </lan>
590                </lans>
591        """
592        class topo_parse:
593            """
594            Parse the topology XML and create the dats structure.
595            """
596            def __init__(self):
597                # Typing of the subelements for data conversion
598                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
599                self.int_subelements = ( 'bandwidth',)
600                self.float_subelements = ( 'delay',)
601                # The final data structure
602                self.nodes = [ ]
603                self.lans =  [ ]
604                self.topo = { \
605                        'node': self.nodes,\
606                        'lan' : self.lans,\
607                    }
608                self.element = { }  # Current element being created
609                self.chars = ""     # Last text seen
610
611            def end_element(self, name):
612                # After each sub element the contents is added to the current
613                # element or to the appropriate list.
614                if name == 'node':
615                    self.nodes.append(self.element)
616                    self.element = { }
617                elif name == 'lan':
618                    self.lans.append(self.element)
619                    self.element = { }
620                elif name in self.str_subelements:
621                    self.element[name] = self.chars
622                    self.chars = ""
623                elif name in self.int_subelements:
624                    self.element[name] = int(self.chars)
625                    self.chars = ""
626                elif name in self.float_subelements:
627                    self.element[name] = float(self.chars)
628                    self.chars = ""
629
630            def found_chars(self, data):
631                self.chars += data.rstrip()
632
633
634        tp = topo_parse();
635        parser = xml.parsers.expat.ParserCreate()
636        parser.EndElementHandler = tp.end_element
637        parser.CharacterDataHandler = tp.found_chars
638
639        parser.Parse(str)
640
641        return tp.topo
642       
643
644    def genviz(self, topo):
645        """
646        Generate the visualization the virtual topology
647        """
648
649        neato = "/usr/local/bin/neato"
650        # These are used to parse neato output and to create the visualization
651        # file.
652        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
653        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
654                "%s</type></node>"
655
656        try:
657            # Node names
658            nodes = [ n['vname'] for n in topo['node'] ]
659            topo_lans = topo['lan']
660        except KeyError, e:
661            raise service_error(service_error.internal, "Bad topology: %s" %e)
662
663        lans = { }
664        links = { }
665
666        # Walk through the virtual topology, organizing the connections into
667        # 2-node connections (links) and more-than-2-node connections (lans).
668        # When a lan is created, it's added to the list of nodes (there's a
669        # node in the visualization for the lan).
670        for l in topo_lans:
671            if links.has_key(l['vname']):
672                if len(links[l['vname']]) < 2:
673                    links[l['vname']].append(l['vnode'])
674                else:
675                    nodes.append(l['vname'])
676                    lans[l['vname']] = links[l['vname']]
677                    del links[l['vname']]
678                    lans[l['vname']].append(l['vnode'])
679            elif lans.has_key(l['vname']):
680                lans[l['vname']].append(l['vnode'])
681            else:
682                links[l['vname']] = [ l['vnode'] ]
683
684
685        # Open up a temporary file for dot to turn into a visualization
686        try:
687            df, dotname = tempfile.mkstemp()
688            dotfile = os.fdopen(df, 'w')
689        except EnvironmentError:
690            raise service_error(service_error.internal,
691                    "Failed to open file in genviz")
692
693        try:
694            dnull = open('/dev/null', 'w')
695        except EnvironmentError:
696            service_error(service_error.internal,
697                    "Failed to open /dev/null in genviz")
698
699        # Generate a dot/neato input file from the links, nodes and lans
700        try:
701            print >>dotfile, "graph G {"
702            for n in nodes:
703                print >>dotfile, '\t"%s"' % n
704            for l in links.keys():
705                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
706            for l in lans.keys():
707                for n in lans[l]:
708                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
709            print >>dotfile, "}"
710            dotfile.close()
711        except TypeError:
712            raise service_error(service_error.internal,
713                    "Single endpoint link in vtopo")
714        except EnvironmentError:
715            raise service_error(service_error.internal, "Cannot write dot file")
716
717        # Use dot to create a visualization
718        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
719                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
720                close_fds=True)
721        dnull.close()
722
723        # Translate dot to vis format
724        vis_nodes = [ ]
725        vis = { 'node': vis_nodes }
726        for line in dot.stdout:
727            m = vis_re.match(line)
728            if m:
729                vn = m.group(1)
730                vis_node = {'name': vn, \
731                        'x': float(m.group(2)),\
732                        'y' : float(m.group(3)),\
733                    }
734                if vn in links.keys() or vn in lans.keys():
735                    vis_node['type'] = 'lan'
736                else:
737                    vis_node['type'] = 'node'
738                vis_nodes.append(vis_node)
739        rv = dot.wait()
740
741        os.remove(dotname)
742        if rv == 0 : return vis
743        else: return None
744
745
746    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
747            cert_pwd=None):
748        """
749        Release access to testbed through fedd
750        """
751
752        if not uri and tbmap:
753            uri = tbmap.get(tb, None)
754        if not uri:
755            raise service_error(service_error.server_config, 
756                    "Unknown testbed: %s" % tb)
757
758        if self.local_access.has_key(uri):
759            resp = self.local_access[uri].ReleaseAccess(\
760                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
761                    fedid(file=cert_file))
762            resp = { 'ReleaseAccessResponseBody': resp } 
763        else:
764            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
765                    cert_file, cert_pwd, self.trusted_certs)
766
767        # better error coding
768
769    def remote_ns2topdl(self, uri, desc):
770
771        req = {
772                'description' : { 'ns2description': desc },
773            }
774
775        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
776                self.trusted_certs)
777
778        if r.has_key('Ns2TopdlResponseBody'):
779            r = r['Ns2TopdlResponseBody']
780            ed = r.get('experimentdescription', None)
781            if ed.has_key('topdldescription'):
782                return topdl.Topology(**ed['topdldescription'])
783            else:
784                raise service_error(service_error.protocol, 
785                        "Bad splitter response (no output)")
786        else:
787            raise service_error(service_error.protocol, "Bad splitter response")
788
789    class start_segment:
790        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
791                cert_pwd=None, trusted_certs=None, caller=None,
792                log_collector=None):
793            self.log = log
794            self.debug = debug
795            self.cert_file = cert_file
796            self.cert_pwd = cert_pwd
797            self.trusted_certs = None
798            self.caller = caller
799            self.testbed = testbed
800            self.log_collector = log_collector
801            self.response = None
802            self.node = { }
803
804        def make_map(self, resp):
805            for e in resp.get('embedding', []):
806                if 'toponame' in e and 'physname' in e:
807                    self.node[e['toponame']] = e['physname'][0]
808
809        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
810            req = {
811                    'allocID': { 'fedid' : aid }, 
812                    'segmentdescription': { 
813                        'topdldescription': topo.to_dict(),
814                    },
815                }
816
817            if connInfo:
818                req['connection'] = connInfo
819
820            import_svcs = [ s for m in masters.values() \
821                    for s in m if self.testbed in s.importers]
822
823            if import_svcs or self.testbed in masters:
824                req['service'] = []
825
826            for s in import_svcs:
827                for r in s.reqs:
828                    sr = copy.deepcopy(r)
829                    sr['visibility'] = 'import';
830                    req['service'].append(sr)
831
832            for s in masters.get(self.testbed, []):
833                for r in s.reqs:
834                    sr = copy.deepcopy(r)
835                    sr['visibility'] = 'export';
836                    req['service'].append(sr)
837
838            if attrs:
839                req['fedAttr'] = attrs
840
841            try:
842                self.log.debug("Calling StartSegment at %s " % uri)
843                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
844                        self.trusted_certs)
845                if r.has_key('StartSegmentResponseBody'):
846                    lval = r['StartSegmentResponseBody'].get('allocationLog',
847                            None)
848                    if lval and self.log_collector:
849                        for line in  lval.splitlines(True):
850                            self.log_collector.write(line)
851                    self.make_map(r['StartSegmentResponseBody'])
852                    self.response = r
853                else:
854                    raise service_error(service_error.internal, 
855                            "Bad response!?: %s" %r)
856                return True
857            except service_error, e:
858                self.log.error("Start segment failed on %s: %s" % \
859                        (self.testbed, e))
860                return False
861
862
863
864    class terminate_segment:
865        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
866                cert_pwd=None, trusted_certs=None, caller=None):
867            self.log = log
868            self.debug = debug
869            self.cert_file = cert_file
870            self.cert_pwd = cert_pwd
871            self.trusted_certs = None
872            self.caller = caller
873            self.testbed = testbed
874
875        def __call__(self, uri, aid ):
876            req = {
877                    'allocID': aid , 
878                }
879            try:
880                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
881                        self.trusted_certs)
882                return True
883            except service_error, e:
884                self.log.error("Terminate segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888    def allocate_resources(self, allocated, masters, eid, expid, 
889            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
890            attrs=None, connInfo={}, tbmap=None, expcert=None):
891
892        started = { }           # Testbeds where a sub-experiment started
893                                # successfully
894
895        # XXX
896        fail_soft = False
897
898        if tbmap is None: tbmap = { }
899
900        log = alloc_log or self.log
901
902        tp = thread_pool(self.nthreads)
903        threads = [ ]
904        starters = [ ]
905
906        if expcert:
907            cert = expcert
908            pw = None
909        else:
910            cert = self.cert_file
911            pw = self.cert_pwd
912
913        for tb in allocated.keys():
914            # Create and start a thread to start the segment, and save it
915            # to get the return value later
916            tb_attrs = copy.copy(attrs)
917            tp.wait_for_slot()
918            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
919            base, suffix = split_testbed(tb)
920            if suffix:
921                tb_attrs.append({'attribute': 'experiment_name', 
922                    'value': "%s-%s" % (eid, suffix)})
923            else:
924                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
925            if not uri:
926                raise service_error(service_error.internal, 
927                        "Unknown testbed %s !?" % tb)
928
929            if tbparams[tb].has_key('allocID') and \
930                    tbparams[tb]['allocID'].has_key('fedid'):
931                aid = tbparams[tb]['allocID']['fedid']
932            else:
933                raise service_error(service_error.internal, 
934                        "No alloc id for testbed %s !?" % tb)
935
936            s = self.start_segment(log=log, debug=self.debug,
937                    testbed=tb, cert_file=cert,
938                    cert_pwd=pw, trusted_certs=self.trusted_certs,
939                    caller=self.call_StartSegment,
940                    log_collector=log_collector)
941            starters.append(s)
942            t  = pooled_thread(\
943                    target=s, name=tb,
944                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
945                    pdata=tp, trace_file=self.trace_file)
946            threads.append(t)
947            t.start()
948
949        # Wait until all finish (keep pinging the log, though)
950        mins = 0
951        revoked = False
952        while not tp.wait_for_all_done(60.0):
953            mins += 1
954            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
955                    % mins)
956            if not revoked and \
957                    len([ t.getName() for t in threads if t.rv == False]) > 0:
958                # a testbed has failed.  Revoke this experiment's
959                # synchronizarion values so that sub experiments will not
960                # deadlock waiting for synchronization that will never happen
961                self.log.info("A subexperiment has failed to swap in, " + \
962                        "revoking synch keys")
963                var_key = "fedid:%s" % expid
964                for k in self.synch_store.all_keys():
965                    if len(k) > 45 and k[0:46] == var_key:
966                        self.synch_store.revoke_key(k)
967                revoked = True
968
969        failed = [ t.getName() for t in threads if not t.rv ]
970        succeeded = [tb for tb in allocated.keys() if tb not in failed]
971
972        # If one failed clean up, unless fail_soft is set
973        if failed:
974            if not fail_soft:
975                tp.clear()
976                for tb in succeeded:
977                    # Create and start a thread to stop the segment
978                    tp.wait_for_slot()
979                    uri = tbparams[tb]['uri']
980                    t  = pooled_thread(\
981                            target=self.terminate_segment(log=log,
982                                testbed=tb,
983                                cert_file=cert, 
984                                cert_pwd=pw,
985                                trusted_certs=self.trusted_certs,
986                                caller=self.call_TerminateSegment),
987                            args=(uri, tbparams[tb]['federant']['allocID']),
988                            name=tb,
989                            pdata=tp, trace_file=self.trace_file)
990                    t.start()
991                # Wait until all finish (if any are being stopped)
992                if succeeded:
993                    tp.wait_for_all_done()
994
995                # release the allocations
996                for tb in tbparams.keys():
997                    try:
998                        self.release_access(tb, tbparams[tb]['allocID'], 
999                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1000                                cert_file=cert, cert_pwd=pw)
1001                    except service_error, e:
1002                        self.log.warn("Error releasing access: %s" % e.desc)
1003                # Remove the placeholder
1004                self.state_lock.acquire()
1005                self.state[eid]['experimentStatus'] = 'failed'
1006                if self.state_filename: self.write_state()
1007                self.state_lock.release()
1008                # Remove the repo dir
1009                self.remove_dirs("%s/%s" %(self.repodir, expid))
1010                # Walk up tmpdir, deleting as we go
1011                if self.cleanup:
1012                    self.remove_dirs(tmpdir)
1013                else:
1014                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1015
1016
1017                log.error("Swap in failed on %s" % ",".join(failed))
1018                return
1019        else:
1020            # Walk through the successes and gather the virtual to physical
1021            # mapping.
1022            embedding = [ ]
1023            for s in starters:
1024                for k, v in s.node.items():
1025                    embedding.append({
1026                        'toponame': k, 
1027                        'physname': [ v],
1028                        'testbed': s.testbed
1029                        })
1030            log.info("[start_segment]: Experiment %s active" % eid)
1031
1032
1033        # Walk up tmpdir, deleting as we go
1034        if self.cleanup:
1035            self.remove_dirs(tmpdir)
1036        else:
1037            log.debug("[start_experiment]: not removing %s" % tmpdir)
1038
1039        # Insert the experiment into our state and update the disk copy.
1040        self.state_lock.acquire()
1041        self.state[expid]['experimentStatus'] = 'active'
1042        self.state[eid] = self.state[expid]
1043        self.state[eid]['experimentdescription']['topdldescription'] = \
1044                top.to_dict()
1045        self.state[eid]['embedding'] = embedding
1046        if self.state_filename: self.write_state()
1047        self.state_lock.release()
1048        return
1049
1050
1051    def add_kit(self, e, kit):
1052        """
1053        Add a Software object created from the list of (install, location)
1054        tuples passed as kit  to the software attribute of an object e.  We
1055        do this enough to break out the code, but it's kind of a hack to
1056        avoid changing the old tuple rep.
1057        """
1058
1059        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1060
1061        if isinstance(e.software, list): e.software.extend(s)
1062        else: e.software = s
1063
1064    def append_experiment_authorization(self, expid, attrs, 
1065            need_state_lock=True):
1066        """
1067        Append the authorization information to system state
1068        """
1069
1070        for p, a in attrs:
1071            self.auth.set_attribute(p, a)
1072        self.auth.save()
1073
1074        if need_state_lock: self.state_lock.acquire()
1075        self.state[expid]['auth'].update(attrs)
1076        if self.state_filename: self.write_state()
1077        if need_state_lock: self.state_lock.release()
1078
1079    def clear_experiment_authorizaton(self, expid, need_state_lock=True):
1080        """
1081        Attrs is a set of attribute principal pairs that need to be removed
1082        from the authenticator.  Remove them and save the authenticator.
1083        """
1084
1085        if need_state_lock: self.state_lock.acquire()
1086        if expid in self.state and 'auth' in self.state[expid]:
1087            for p, a in self.state[expid]['auth']:
1088                self.auth.unset_attribute(p, a)
1089            self.state[expid]['auth'] = set()
1090        if self.state_filename: self.write_state()
1091        if need_state_lock: self.state_lock.release()
1092        self.auth.save()
1093
1094
1095    def create_experiment_state(self, fid, req, expid, expcert,
1096            state='starting'):
1097        """
1098        Create the initial entry in the experiment's state.  The expid and
1099        expcert are the experiment's fedid and certifacte that represents that
1100        ID, which are installed in the experiment state.  If the request
1101        includes a suggested local name that is used if possible.  If the local
1102        name is already taken by an experiment owned by this user that has
1103        failed, it is overwritten.  Otherwise new letters are added until a
1104        valid localname is found.  The generated local name is returned.
1105        """
1106
1107        if req.has_key('experimentID') and \
1108                req['experimentID'].has_key('localname'):
1109            overwrite = False
1110            eid = req['experimentID']['localname']
1111            # If there's an old failed experiment here with the same local name
1112            # and accessible by this user, we'll overwrite it, otherwise we'll
1113            # fall through and do the collision avoidance.
1114            old_expid = self.get_experiment_fedid(eid)
1115            if old_expid and self.check_experiment_access(fid, old_expid):
1116                self.state_lock.acquire()
1117                status = self.state[eid].get('experimentStatus', None)
1118                if status and status == 'failed':
1119                    # remove the old access attributes
1120                    self.clear_experiment_authorization(eid,
1121                            need_state_lock=False)
1122                    overwrite = True
1123                    del self.state[eid]
1124                    del self.state[old_expid]
1125                self.state_lock.release()
1126            self.state_lock.acquire()
1127            while (self.state.has_key(eid) and not overwrite):
1128                eid += random.choice(string.ascii_letters)
1129            # Initial state
1130            self.state[eid] = {
1131                    'experimentID' : \
1132                            [ { 'localname' : eid }, {'fedid': expid } ],
1133                    'experimentStatus': state,
1134                    'experimentAccess': { 'X509' : expcert },
1135                    'owner': fid,
1136                    'log' : [],
1137                    'auth': set(),
1138                }
1139            self.state[expid] = self.state[eid]
1140            if self.state_filename: self.write_state()
1141            self.state_lock.release()
1142        else:
1143            eid = self.exp_stem
1144            for i in range(0,5):
1145                eid += random.choice(string.ascii_letters)
1146            self.state_lock.acquire()
1147            while (self.state.has_key(eid)):
1148                eid = self.exp_stem
1149                for i in range(0,5):
1150                    eid += random.choice(string.ascii_letters)
1151            # Initial state
1152            self.state[eid] = {
1153                    'experimentID' : \
1154                            [ { 'localname' : eid }, {'fedid': expid } ],
1155                    'experimentStatus': state,
1156                    'experimentAccess': { 'X509' : expcert },
1157                    'owner': fid,
1158                    'log' : [],
1159                    'auth': set(),
1160                }
1161            self.state[expid] = self.state[eid]
1162            if self.state_filename: self.write_state()
1163            self.state_lock.release()
1164
1165        # Let users touch the state.  Authorize this fid and the expid itself
1166        # to touch the experiment, as well as allowing th eoverrides.
1167        self.append_experiment_authorization(eid, 
1168                set([(fid, expid), (expid,expid)] + \
1169                        [ (o, expid) for o in self.overrides]))
1170
1171        return eid
1172
1173
1174    def allocate_ips_to_topo(self, top):
1175        """
1176        Add an ip4_address attribute to all the hosts in the topology, based on
1177        the shared substrates on which they sit.  An /etc/hosts file is also
1178        created and returned as a list of hostfiles entries.  We also return
1179        the allocator, because we may need to allocate IPs to portals
1180        (specifically DRAGON portals).
1181        """
1182        subs = sorted(top.substrates, 
1183                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1184                reverse=True)
1185        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1186        ifs = { }
1187        hosts = [ ]
1188
1189        for idx, s in enumerate(subs):
1190            net_size = len(s.interfaces)+2
1191
1192            a = ips.allocate(net_size)
1193            if a :
1194                base, num = a
1195                if num < net_size: 
1196                    raise service_error(service_error.internal,
1197                            "Allocator returned wrong number of IPs??")
1198            else:
1199                raise service_error(service_error.req, 
1200                        "Cannot allocate IP addresses")
1201            mask = ips.min_alloc
1202            while mask < net_size:
1203                mask *= 2
1204
1205            netmask = ((2**32-1) ^ (mask-1))
1206
1207            base += 1
1208            for i in s.interfaces:
1209                i.attribute.append(
1210                        topdl.Attribute('ip4_address', 
1211                            "%s" % ip_addr(base)))
1212                i.attribute.append(
1213                        topdl.Attribute('ip4_netmask', 
1214                            "%s" % ip_addr(int(netmask))))
1215
1216                hname = i.element.name
1217                if ifs.has_key(hname):
1218                    hosts.append("%s\t%s-%s %s-%d" % \
1219                            (ip_addr(base), hname, s.name, hname,
1220                                ifs[hname]))
1221                else:
1222                    ifs[hname] = 0
1223                    hosts.append("%s\t%s-%s %s-%d %s" % \
1224                            (ip_addr(base), hname, s.name, hname,
1225                                ifs[hname], hname))
1226
1227                ifs[hname] += 1
1228                base += 1
1229        return hosts, ips
1230
1231    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1232            tbparam, masters, tbmap, expid=None, expcert=None):
1233        for tb in testbeds:
1234            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1235                    expcert)
1236            allocated[tb] = 1
1237
1238    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1239            expcert=None):
1240        """
1241        Get access to testbed through fedd and set the parameters for that tb
1242        """
1243        def get_export_project(svcs):
1244            """
1245            Look through for the list of federated_service for this testbed
1246            objects for a project_export service, and extract the project
1247            parameter.
1248            """
1249
1250            pe = [s for s in svcs if s.name=='project_export']
1251            if len(pe) == 1:
1252                return pe[0].params.get('project', None)
1253            elif len(pe) == 0:
1254                return None
1255            else:
1256                raise service_error(service_error.req,
1257                        "More than one project export is not supported")
1258
1259        def add_services(svcs, type, slist):
1260            """
1261            Add the given services to slist.  type is import or export.
1262            """
1263            for i, s in enumerate(svcs):
1264                idx = '%s%d' % (type, i)
1265                sr = {'id': idx, 'name': s.name, 'visibility': type }
1266                if s.params:
1267                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1268                            for k, v in s.params.items()]
1269                slist.append(sr)
1270
1271        uri = tbmap.get(testbed_base(tb), None)
1272        if not uri:
1273            raise service_error(service_error.server_config, 
1274                    "Unknown testbed: %s" % tb)
1275
1276        export_svcs = masters.get(tb,[])
1277        import_svcs = [ s for m in masters.values() \
1278                for s in m \
1279                    if tb in s.importers ]
1280
1281        export_project = get_export_project(export_svcs)
1282        # Compose the credential list so that IDs come before attributes
1283        creds = set()
1284        keys = set()
1285        certs = self.auth.get_creds_for_principal(fid)
1286        if expid:
1287            certs.update(self.auth.get_creds_for_principal(expid))
1288        for c in certs:
1289            keys.add(c.issuer_cert())
1290            creds.add(c.attribute_cert())
1291        creds = list(keys) + list(creds)
1292
1293        if expcert: cert, pw = expcert, None
1294        else: cert, pw = self.cert_file, self.cert_pw
1295
1296        # Request credentials
1297        req = {
1298                'abac_credential': creds,
1299            }
1300        # Make the service request from the services we're importing and
1301        # exporting.  Keep track of the export request ids so we can
1302        # collect the resulting info from the access response.
1303        e_keys = { }
1304        if import_svcs or export_svcs:
1305            slist = []
1306            add_services(import_svcs, 'import', slist)
1307            add_services(export_svcs, 'export', slist)
1308            req['service'] = slist
1309
1310        if self.local_access.has_key(uri):
1311            # Local access call
1312            req = { 'RequestAccessRequestBody' : req }
1313            r = self.local_access[uri].RequestAccess(req, 
1314                    fedid(file=self.cert_file))
1315            r = { 'RequestAccessResponseBody' : r }
1316        else:
1317            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1318
1319        if r.has_key('RequestAccessResponseBody'):
1320            # Through to here we have a valid response, not a fault.
1321            # Access denied is a fault, so something better or worse than
1322            # access denied has happened.
1323            r = r['RequestAccessResponseBody']
1324            self.log.debug("[get_access] Access granted")
1325        else:
1326            raise service_error(service_error.protocol,
1327                        "Bad proxy response")
1328       
1329        tbparam[tb] = { 
1330                "allocID" : r['allocID'],
1331                "uri": uri,
1332                }
1333
1334        # Collect the responses corresponding to the services this testbed
1335        # exports.  These will be the service requests that we will include in
1336        # the start segment requests (with appropriate visibility values) to
1337        # import and export the segments.
1338        for s in r.get('service', []):
1339            id = s.get('id', None)
1340            if id and id in e_keys:
1341                e_keys[id].reqs.append(s)
1342
1343        # Add attributes to parameter space.  We don't allow attributes to
1344        # overlay any parameters already installed.
1345        for a in r.get('fedAttr', []):
1346            try:
1347                if a['attribute'] and \
1348                        isinstance(a['attribute'], basestring)\
1349                        and not tbparam[tb].has_key(a['attribute'].lower()):
1350                    tbparam[tb][a['attribute'].lower()] = a['value']
1351            except KeyError:
1352                self.log.error("Bad attribute in response: %s" % a)
1353
1354
1355    def split_topology(self, top, topo, testbeds):
1356        """
1357        Create the sub-topologies that are needed for experiment instantiation.
1358        """
1359        for tb in testbeds:
1360            topo[tb] = top.clone()
1361            # copy in for loop allows deletions from the original
1362            for e in [ e for e in topo[tb].elements]:
1363                etb = e.get_attribute('testbed')
1364                # NB: elements without a testbed attribute won't appear in any
1365                # sub topologies. 
1366                if not etb or etb != tb:
1367                    for i in e.interface:
1368                        for s in i.subs:
1369                            try:
1370                                s.interfaces.remove(i)
1371                            except ValueError:
1372                                raise service_error(service_error.internal,
1373                                        "Can't remove interface??")
1374                    topo[tb].elements.remove(e)
1375            topo[tb].make_indices()
1376
1377    def wrangle_software(self, expid, top, topo, tbparams):
1378        """
1379        Copy software out to the repository directory, allocate permissions and
1380        rewrite the segment topologies to look for the software in local
1381        places.
1382        """
1383
1384        # Copy the rpms and tarfiles to a distribution directory from
1385        # which the federants can retrieve them
1386        linkpath = "%s/software" %  expid
1387        softdir ="%s/%s" % ( self.repodir, linkpath)
1388        softmap = { }
1389        # These are in a list of tuples format (each kit).  This comprehension
1390        # unwraps them into a single list of tuples that initilaizes the set of
1391        # tuples.
1392        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1393                for p, t in l ])
1394        pkgs.update([x.location for e in top.elements \
1395                for x in e.software])
1396        try:
1397            os.makedirs(softdir)
1398        except EnvironmentError, e:
1399            raise service_error(
1400                    "Cannot create software directory: %s" % e)
1401        # The actual copying.  Everything's converted into a url for copying.
1402        auth_attrs = set()
1403        for pkg in pkgs:
1404            loc = pkg
1405
1406            scheme, host, path = urlparse(loc)[0:3]
1407            dest = os.path.basename(path)
1408            if not scheme:
1409                if not loc.startswith('/'):
1410                    loc = "/%s" % loc
1411                loc = "file://%s" %loc
1412            try:
1413                u = urlopen(loc)
1414            except Exception, e:
1415                raise service_error(service_error.req, 
1416                        "Cannot open %s: %s" % (loc, e))
1417            try:
1418                f = open("%s/%s" % (softdir, dest) , "w")
1419                self.log.debug("Writing %s/%s" % (softdir,dest) )
1420                data = u.read(4096)
1421                while data:
1422                    f.write(data)
1423                    data = u.read(4096)
1424                f.close()
1425                u.close()
1426            except Exception, e:
1427                raise service_error(service_error.internal,
1428                        "Could not copy %s: %s" % (loc, e))
1429            path = re.sub("/tmp", "", linkpath)
1430            # XXX
1431            softmap[pkg] = \
1432                    "%s/%s/%s" %\
1433                    ( self.repo_url, path, dest)
1434
1435            # Allow the individual segments to access the software by assigning
1436            # an attribute to each testbed allocation that encodes the data to
1437            # be released.  This expression collects the data for each run of
1438            # the loop.
1439            auth_attrs.update([
1440                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1441                        for tb in tbparams.keys()])
1442
1443        self.append_experiment_authorization(expid, auth_attrs)
1444
1445        # Convert the software locations in the segments into the local
1446        # copies on this host
1447        for soft in [ s for tb in topo.values() \
1448                for e in tb.elements \
1449                    if getattr(e, 'software', False) \
1450                        for s in e.software ]:
1451            if softmap.has_key(soft.location):
1452                soft.location = softmap[soft.location]
1453
1454
1455    def new_experiment(self, req, fid):
1456        """
1457        The external interface to empty initial experiment creation called from
1458        the dispatcher.
1459
1460        Creates a working directory, splits the incoming description using the
1461        splitter script and parses out the avrious subsections using the
1462        lcasses above.  Once each sub-experiment is created, use pooled threads
1463        to instantiate them and start it all up.
1464        """
1465        req = req.get('NewRequestBody', None)
1466        if not req:
1467            raise service_error(service_error.req,
1468                    "Bad request format (no NewRequestBody)")
1469
1470        if self.auth.import_credentials(data_list=req.get('credential', [])):
1471            self.auth.save()
1472
1473        if not self.auth.check_attribute(fid, 'new'):
1474            raise service_error(service_error.access, "New access denied")
1475
1476        try:
1477            tmpdir = tempfile.mkdtemp(prefix="split-")
1478        except EnvironmentError:
1479            raise service_error(service_error.internal, "Cannot create tmp dir")
1480
1481        try:
1482            access_user = self.accessdb[fid]
1483        except KeyError:
1484            raise service_error(service_error.internal,
1485                    "Access map and authorizer out of sync in " + \
1486                            "new_experiment for fedid %s"  % fid)
1487
1488        # Generate an ID for the experiment (slice) and a certificate that the
1489        # allocator can use to prove they own it.  We'll ship it back through
1490        # the encrypted connection.  If the requester supplied one, use it.
1491        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1492            expcert = req['experimentAccess']['X509']
1493            expid = fedid(certstr=expcert)
1494            self.state_lock.acquire()
1495            if expid in self.state:
1496                self.state_lock.release()
1497                raise service_error(service_error.req, 
1498                        'fedid %s identifies an existing experiment' % expid)
1499            self.state_lock.release()
1500        else:
1501            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1502
1503        #now we're done with the tmpdir, and it should be empty
1504        if self.cleanup:
1505            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1506            os.rmdir(tmpdir)
1507        else:
1508            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1509
1510        eid = self.create_experiment_state(fid, req, expid, expcert, 
1511                state='empty')
1512
1513        rv = {
1514                'experimentID': [
1515                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1516                ],
1517                'experimentStatus': 'empty',
1518                'experimentAccess': { 'X509' : expcert }
1519            }
1520
1521        return rv
1522
1523    # create_experiment sub-functions
1524
1525    @staticmethod
1526    def get_experiment_key(req, field='experimentID'):
1527        """
1528        Parse the experiment identifiers out of the request (the request body
1529        tag has been removed).  Specifically this pulls either the fedid or the
1530        localname out of the experimentID field.  A fedid is preferred.  If
1531        neither is present or the request does not contain the fields,
1532        service_errors are raised.
1533        """
1534        # Get the experiment access
1535        exp = req.get(field, None)
1536        if exp:
1537            if exp.has_key('fedid'):
1538                key = exp['fedid']
1539            elif exp.has_key('localname'):
1540                key = exp['localname']
1541            else:
1542                raise service_error(service_error.req, "Unknown lookup type")
1543        else:
1544            raise service_error(service_error.req, "No request?")
1545
1546        return key
1547
1548    def get_experiment_ids_and_start(self, key, tmpdir):
1549        """
1550        Get the experiment name, id and access certificate from the state, and
1551        set the experiment state to 'starting'.  returns a triple (fedid,
1552        localname, access_cert_file). The access_cert_file is a copy of the
1553        contents of the access certificate, created in the tempdir with
1554        restricted permissions.  If things are confused, raise an exception.
1555        """
1556
1557        expid = eid = None
1558        self.state_lock.acquire()
1559        if self.state.has_key(key):
1560            self.state[key]['experimentStatus'] = "starting"
1561            for e in self.state[key].get('experimentID',[]):
1562                if not expid and e.has_key('fedid'):
1563                    expid = e['fedid']
1564                elif not eid and e.has_key('localname'):
1565                    eid = e['localname']
1566            if 'experimentAccess' in self.state[key] and \
1567                    'X509' in self.state[key]['experimentAccess']:
1568                expcert = self.state[key]['experimentAccess']['X509']
1569            else:
1570                expcert = None
1571        self.state_lock.release()
1572
1573        # make a protected copy of the access certificate so the experiment
1574        # controller can act as the experiment principal.
1575        if expcert:
1576            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1577            if not expcert_file:
1578                raise service_error(service_error.internal, 
1579                        "Cannot create temp cert file?")
1580        else:
1581            expcert_file = None
1582
1583        return (eid, expid, expcert_file)
1584
1585    def get_topology(self, req, tmpdir):
1586        """
1587        Get the ns2 content and put it into a file for parsing.  Call the local
1588        or remote parser and return the topdl.Topology.  Errors result in
1589        exceptions.  req is the request and tmpdir is a work directory.
1590        """
1591
1592        # The tcl parser needs to read a file so put the content into that file
1593        descr=req.get('experimentdescription', None)
1594        if descr:
1595            if 'ns2description' in descr:
1596                file_content=descr['ns2description']
1597            elif 'topdldescription' in descr:
1598                return topdl.Topology(**descr['topdldescription'])
1599            else:
1600                raise service_error(service_error.req, 
1601                        'Unknown experiment description type')
1602        else:
1603            raise service_error(service_error.req, "No experiment description")
1604
1605
1606        if self.splitter_url:
1607            self.log.debug("Calling remote topdl translator at %s" % \
1608                    self.splitter_url)
1609            top = self.remote_ns2topdl(self.splitter_url, file_content)
1610        else:
1611            tclfile = os.path.join(tmpdir, "experiment.tcl")
1612            if file_content:
1613                try:
1614                    f = open(tclfile, 'w')
1615                    f.write(file_content)
1616                    f.close()
1617                except EnvironmentError:
1618                    raise service_error(service_error.internal,
1619                            "Cannot write temp experiment description")
1620            else:
1621                raise service_error(service_error.req, 
1622                        "Only ns2descriptions supported")
1623            pid = "dummy"
1624            gid = "dummy"
1625            eid = "dummy"
1626
1627            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1628                str(self.muxmax), '-m', 'dummy']
1629
1630            tclcmd.extend([pid, gid, eid, tclfile])
1631
1632            self.log.debug("running local splitter %s", " ".join(tclcmd))
1633            # This is just fantastic.  As a side effect the parser copies
1634            # tb_compat.tcl into the current directory, so that directory
1635            # must be writable by the fedd user.  Doing this in the
1636            # temporary subdir ensures this is the case.
1637            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1638                    cwd=tmpdir)
1639            split_data = tclparser.stdout
1640
1641            top = topdl.topology_from_xml(file=split_data, top="experiment")
1642            os.remove(tclfile)
1643
1644        return top
1645
1646    def get_testbed_services(self, req, testbeds):
1647        """
1648        Parse the services section of the request into into two dicts mapping
1649        testbed to lists of federated_service objects.  The first lists all
1650        exporters of services, and the second all exporters of services that
1651        need control portals int the experiment.
1652        """
1653        masters = { }
1654        pmasters = { }
1655        for s in req.get('service', []):
1656            # If this is a service request with the importall field
1657            # set, fill it out.
1658
1659            if s.get('importall', False):
1660                s['import'] = [ tb for tb in testbeds \
1661                        if tb not in s.get('export',[])]
1662                del s['importall']
1663
1664            # Add the service to masters
1665            for tb in s.get('export', []):
1666                if s.get('name', None):
1667
1668                    params = { }
1669                    for a in s.get('fedAttr', []):
1670                        params[a.get('attribute', '')] = a.get('value','')
1671
1672                    fser = federated_service(name=s['name'],
1673                            exporter=tb, importers=s.get('import',[]),
1674                            params=params)
1675                    if fser.name == 'hide_hosts' \
1676                            and 'hosts' not in fser.params:
1677                        fser.params['hosts'] = \
1678                                ",".join(tb_hosts.get(fser.exporter, []))
1679                    if tb in masters: masters[tb].append(fser)
1680                    else: masters[tb] = [fser]
1681
1682                    if fser.portal:
1683                        if tb not in pmasters: pmasters[tb] = [ fser ]
1684                        else: pmasters[tb].append(fser)
1685                else:
1686                    self.log.error('Testbed service does not have name " + \
1687                            "and importers')
1688        return masters, pmasters
1689
1690    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1691        """
1692        Create the ssh keys necessary for interconnecting the potral nodes and
1693        the global hosts file for letting each segment know about the IP
1694        addresses in play.  Save these into the repo.  Add attributes to the
1695        autorizer allowing access controllers to download them and return a set
1696        of attributes that inform the segments where to find this stuff.  Mau
1697        raise service_errors in if there are problems.
1698        """
1699        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1700        gw_secretkey_base = "fed.%s" % self.ssh_type
1701        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1702        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1703
1704        try:
1705            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1706        except ValueError:
1707            raise service_error(service_error.server_config, 
1708                    "Bad key type (%s)" % self.ssh_type)
1709
1710
1711        # Copy configuration files into the remote file store
1712        # The config urlpath
1713        configpath = "/%s/config" % expid
1714        # The config file system location
1715        configdir ="%s%s" % ( self.repodir, configpath)
1716        try:
1717            os.makedirs(configdir)
1718        except EnvironmentError, e:
1719            raise service_error(service_error.internal,
1720                    "Cannot create config directory: %s" % e)
1721        try:
1722            f = open("%s/hosts" % configdir, "w")
1723            print >> f, string.join(hosts, '\n')
1724            f.close()
1725        except EnvironmentError, e:
1726            raise service_error(service_error.internal, 
1727                    "Cannot write hosts file: %s" % e)
1728        try:
1729            copy_file("%s" % gw_pubkey, "%s/%s" % \
1730                    (configdir, gw_pubkey_base))
1731            copy_file("%s" % gw_secretkey, "%s/%s" % \
1732                    (configdir, gw_secretkey_base))
1733        except EnvironmentError, e:
1734            raise service_error(service_error.internal, 
1735                    "Cannot copy keyfiles: %s" % e)
1736
1737        # Allow the individual testbeds to access the configuration files,
1738        # again by setting an attribute for the relevant pathnames on each
1739        # allocation principal.  Yeah, that's a long list comprehension.
1740        self.append_experiment_authorization(expid, set([
1741            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1742                    for tb in tbparams.keys() \
1743                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1744
1745        attrs = [ 
1746                {
1747                    'attribute': 'ssh_pubkey', 
1748                    'value': '%s/%s/config/%s' % \
1749                            (self.repo_url, expid, gw_pubkey_base)
1750                },
1751                {
1752                    'attribute': 'ssh_secretkey', 
1753                    'value': '%s/%s/config/%s' % \
1754                            (self.repo_url, expid, gw_secretkey_base)
1755                },
1756                {
1757                    'attribute': 'hosts', 
1758                    'value': '%s/%s/config/hosts' % \
1759                            (self.repo_url, expid)
1760                },
1761            ]
1762        return attrs
1763
1764
1765    def get_vtopo(self, req, fid):
1766        """
1767        Return the stored virtual topology for this experiment
1768        """
1769        rv = None
1770        state = None
1771
1772        req = req.get('VtopoRequestBody', None)
1773        if not req:
1774            raise service_error(service_error.req,
1775                    "Bad request format (no VtopoRequestBody)")
1776        exp = req.get('experiment', None)
1777        if exp:
1778            if exp.has_key('fedid'):
1779                key = exp['fedid']
1780                keytype = "fedid"
1781            elif exp.has_key('localname'):
1782                key = exp['localname']
1783                keytype = "localname"
1784            else:
1785                raise service_error(service_error.req, "Unknown lookup type")
1786        else:
1787            raise service_error(service_error.req, "No request?")
1788
1789        self.check_experiment_access(fid, key)
1790
1791        self.state_lock.acquire()
1792        if self.state.has_key(key):
1793            if self.state[key].has_key('vtopo'):
1794                rv = { 'experiment' : {keytype: key },\
1795                        'vtopo': self.state[key]['vtopo'],\
1796                    }
1797            else:
1798                state = self.state[key]['experimentStatus']
1799        self.state_lock.release()
1800
1801        if rv: return rv
1802        else: 
1803            if state:
1804                raise service_error(service_error.partial, 
1805                        "Not ready: %s" % state)
1806            else:
1807                raise service_error(service_error.req, "No such experiment")
1808
1809    def get_vis(self, req, fid):
1810        """
1811        Return the stored visualization for this experiment
1812        """
1813        rv = None
1814        state = None
1815
1816        req = req.get('VisRequestBody', None)
1817        if not req:
1818            raise service_error(service_error.req,
1819                    "Bad request format (no VisRequestBody)")
1820        exp = req.get('experiment', None)
1821        if exp:
1822            if exp.has_key('fedid'):
1823                key = exp['fedid']
1824                keytype = "fedid"
1825            elif exp.has_key('localname'):
1826                key = exp['localname']
1827                keytype = "localname"
1828            else:
1829                raise service_error(service_error.req, "Unknown lookup type")
1830        else:
1831            raise service_error(service_error.req, "No request?")
1832
1833        self.check_experiment_access(fid, key)
1834
1835        self.state_lock.acquire()
1836        if self.state.has_key(key):
1837            if self.state[key].has_key('vis'):
1838                rv =  { 'experiment' : {keytype: key },\
1839                        'vis': self.state[key]['vis'],\
1840                        }
1841            else:
1842                state = self.state[key]['experimentStatus']
1843        self.state_lock.release()
1844
1845        if rv: return rv
1846        else:
1847            if state:
1848                raise service_error(service_error.partial, 
1849                        "Not ready: %s" % state)
1850            else:
1851                raise service_error(service_error.req, "No such experiment")
1852
1853   
1854    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1855            top):
1856        """
1857        Store the various data that have changed in the experiment state
1858        between when it was started and the beginning of resource allocation.
1859        This is basically the information about each local allocation.  This
1860        fills in the values of the placeholder allocation in the state.
1861        """
1862        # save federant information
1863        for k in allocated.keys():
1864            tbparams[k]['federant'] = {
1865                    'name': [ { 'localname' : eid} ],
1866                    'allocID' : tbparams[k]['allocID'],
1867                    'uri': tbparams[k]['uri'],
1868                }
1869
1870        self.state_lock.acquire()
1871        self.state[eid]['vtopo'] = vtopo
1872        self.state[eid]['vis'] = vis
1873        self.state[eid]['experimentdescription'] = \
1874                { 'topdldescription': top.to_dict() }
1875        self.state[eid]['federant'] = \
1876                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1877                    if tbparams[tb].has_key('federant') ]
1878        if self.state_filename: 
1879            self.write_state()
1880        self.state_lock.release()
1881
1882    def clear_placeholder(self, eid, expid, tmpdir):
1883        """
1884        Clear the placeholder and remove any allocated temporary dir.
1885        """
1886
1887        self.state_lock.acquire()
1888        del self.state[eid]
1889        del self.state[expid]
1890        if self.state_filename: self.write_state()
1891        self.state_lock.release()
1892        if tmpdir and self.cleanup:
1893            self.remove_dirs(tmpdir)
1894
1895    # end of create_experiment sub-functions
1896
1897    def create_experiment(self, req, fid):
1898        """
1899        The external interface to experiment creation called from the
1900        dispatcher.
1901
1902        Creates a working directory, splits the incoming description using the
1903        splitter script and parses out the various subsections using the
1904        classes above.  Once each sub-experiment is created, use pooled threads
1905        to instantiate them and start it all up.
1906        """
1907
1908        req = req.get('CreateRequestBody', None)
1909        if req:
1910            key = self.get_experiment_key(req)
1911        else:
1912            raise service_error(service_error.req,
1913                    "Bad request format (no CreateRequestBody)")
1914
1915        # Import information from the requester
1916        if self.auth.import_credentials(data_list=req.get('credential', [])):
1917            self.auth.save()
1918
1919        # Make sure that the caller can talk to us
1920        self.check_experiment_access(fid, key)
1921
1922        # Install the testbed map entries supplied with the request into a copy
1923        # of the testbed map.
1924        tbmap = dict(self.tbmap)
1925        for m in req.get('testbedmap', []):
1926            if 'testbed' in m and 'uri' in m:
1927                tbmap[m['testbed']] = m['uri']
1928
1929        # a place to work
1930        try:
1931            tmpdir = tempfile.mkdtemp(prefix="split-")
1932            os.mkdir(tmpdir+"/keys")
1933        except EnvironmentError:
1934            raise service_error(service_error.internal, "Cannot create tmp dir")
1935
1936        tbparams = { }
1937
1938        eid, expid, expcert_file = \
1939                self.get_experiment_ids_and_start(key, tmpdir)
1940
1941        # This catches exceptions to clear the placeholder if necessary
1942        try: 
1943            if not (eid and expid):
1944                raise service_error(service_error.internal, 
1945                        "Cannot find local experiment info!?")
1946
1947            top = self.get_topology(req, tmpdir)
1948            # Assign the IPs
1949            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1950            # Find the testbeds to look up
1951            tb_hosts = { }
1952            testbeds = [ ]
1953            for e in top.elements:
1954                if isinstance(e, topdl.Computer):
1955                    tb = e.get_attribute('testbed') or 'default'
1956                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1957                    else: 
1958                        tb_hosts[tb] = [ e.name ]
1959                        testbeds.append(tb)
1960
1961            masters, pmasters = self.get_testbed_services(req, testbeds)
1962            allocated = { }         # Testbeds we can access
1963            topo ={ }               # Sub topologies
1964            connInfo = { }          # Connection information
1965
1966            self.get_access_to_testbeds(testbeds, fid, allocated, 
1967                    tbparams, masters, tbmap, expid, expcert_file)
1968
1969            self.split_topology(top, topo, testbeds)
1970
1971            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
1972
1973            part = experiment_partition(self.auth, self.store_url, tbmap,
1974                    self.muxmax, self.direct_transit)
1975            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1976                    connInfo, expid)
1977
1978            auth_attrs = set()
1979            # Now get access to the dynamic testbeds (those added above)
1980            for tb in [ t for t in topo if t not in allocated]:
1981                self.get_access(tb, tbparams, fid, masters, tbmap, 
1982                        expid, expcert_file)
1983                allocated[tb] = 1
1984                store_keys = topo[tb].get_attribute('store_keys')
1985                # Give the testbed access to keys it exports or imports
1986                if store_keys:
1987                    auth_keys.update(set([
1988                        (tbparams[tb]['allocID']['fedid'], sk) \
1989                                for sk in store_keys.split(" ")]))
1990
1991            if auth_attrs:
1992                self.append_experiment_authorization(expid, auth_attrs)
1993
1994            # transit and disconnected testbeds may not have a connInfo entry.
1995            # Fill in the blanks.
1996            for t in allocated.keys():
1997                if not connInfo.has_key(t):
1998                    connInfo[t] = { }
1999
2000            self.wrangle_software(expid, top, topo, tbparams)
2001
2002            vtopo = topdl.topology_to_vtopo(top)
2003            vis = self.genviz(vtopo)
2004            self.save_federant_information(allocated, tbparams, eid, vtopo, 
2005                    vis, top)
2006        except service_error, e:
2007            # If something goes wrong in the parse (usually an access error)
2008            # clear the placeholder state.  From here on out the code delays
2009            # exceptions.  Failing at this point returns a fault to the remote
2010            # caller.
2011            self.clear_placeholder(eid, expid, tmpdir)
2012            raise e
2013
2014        # Start the background swapper and return the starting state.  From
2015        # here on out, the state will stick around a while.
2016
2017        # XXX: I think this is redundant
2018        # Let users touch the state
2019        # self.auth.set_attribute(fid, expid)
2020        # self.auth.set_attribute(expid, expid)
2021        # Override fedids can manipulate state as well
2022        # for o in self.overrides:
2023            # self.auth.set_attribute(o, expid)
2024        # self.auth.save()
2025
2026        # Create a logger that logs to the experiment's state object as well as
2027        # to the main log file.
2028        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2029        alloc_collector = self.list_log(self.state[eid]['log'])
2030        h = logging.StreamHandler(alloc_collector)
2031        # XXX: there should be a global one of these rather than repeating the
2032        # code.
2033        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2034                    '%d %b %y %H:%M:%S'))
2035        alloc_log.addHandler(h)
2036
2037        # Start a thread to do the resource allocation
2038        t  = Thread(target=self.allocate_resources,
2039                args=(allocated, masters, eid, expid, tbparams, 
2040                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2041                    connInfo, tbmap, expcert_file),
2042                name=eid)
2043        t.start()
2044
2045        rv = {
2046                'experimentID': [
2047                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2048                ],
2049                'experimentStatus': 'starting',
2050            }
2051
2052        return rv
2053   
2054    def get_experiment_fedid(self, key):
2055        """
2056        find the fedid associated with the localname key in the state database.
2057        """
2058
2059        rv = None
2060        self.state_lock.acquire()
2061        if self.state.has_key(key):
2062            if isinstance(self.state[key], dict):
2063                try:
2064                    kl = [ f['fedid'] for f in \
2065                            self.state[key]['experimentID']\
2066                                if f.has_key('fedid') ]
2067                except KeyError:
2068                    self.state_lock.release()
2069                    raise service_error(service_error.internal, 
2070                            "No fedid for experiment %s when getting "+\
2071                                    "fedid(!?)" % key)
2072                if len(kl) == 1:
2073                    rv = kl[0]
2074                else:
2075                    self.state_lock.release()
2076                    raise service_error(service_error.internal, 
2077                            "multiple fedids for experiment %s when " +\
2078                                    "getting fedid(!?)" % key)
2079            else:
2080                self.state_lock.release()
2081                raise service_error(service_error.internal, 
2082                        "Unexpected state for %s" % key)
2083        self.state_lock.release()
2084        return rv
2085
2086    def check_experiment_access(self, fid, key):
2087        """
2088        Confirm that the fid has access to the experiment.  Though a request
2089        may be made in terms of a local name, the access attribute is always
2090        the experiment's fedid.
2091        """
2092        if not isinstance(key, fedid):
2093            key = self.get_experiment_fedid(key)
2094
2095        if self.auth.check_attribute(fid, key):
2096            return True
2097        else:
2098            raise service_error(service_error.access, "Access Denied")
2099
2100
2101    def get_handler(self, path, fid):
2102        """
2103        Perhaps surprisingly named, this function handles HTTP GET requests to
2104        this server (SOAP requests are POSTs).
2105        """
2106        self.log.info("Get handler %s %s" % (path, fid))
2107        if self.auth.check_attribute(fid, path):
2108            return ("%s/%s" % (self.repodir, path), "application/binary")
2109        else:
2110            return (None, None)
2111
2112    def clean_info_response(self, rv):
2113        """
2114        Remove the information in the experiment's state object that is not in
2115        the info response.
2116        """
2117        # Remove the owner info (should always be there, but...)
2118        if rv.has_key('owner'): del rv['owner']
2119        if 'auth' in rv: del rv['auth']
2120
2121        # Convert the log into the allocationLog parameter and remove the
2122        # log entry (with defensive programming)
2123        if rv.has_key('log'):
2124            rv['allocationLog'] = "".join(rv['log'])
2125            del rv['log']
2126        else:
2127            rv['allocationLog'] = ""
2128
2129        if rv['experimentStatus'] != 'active':
2130            if rv.has_key('federant'): del rv['federant']
2131        else:
2132            # remove the allocationID and uri info from each federant
2133            for f in rv.get('federant', []):
2134                if f.has_key('allocID'): del f['allocID']
2135                if f.has_key('uri'): del f['uri']
2136
2137        return rv
2138
2139    def get_info(self, req, fid):
2140        """
2141        Return all the stored info about this experiment
2142        """
2143        rv = None
2144
2145        req = req.get('InfoRequestBody', None)
2146        if not req:
2147            raise service_error(service_error.req,
2148                    "Bad request format (no InfoRequestBody)")
2149        exp = req.get('experiment', None)
2150        if exp:
2151            if exp.has_key('fedid'):
2152                key = exp['fedid']
2153                keytype = "fedid"
2154            elif exp.has_key('localname'):
2155                key = exp['localname']
2156                keytype = "localname"
2157            else:
2158                raise service_error(service_error.req, "Unknown lookup type")
2159        else:
2160            raise service_error(service_error.req, "No request?")
2161
2162        self.check_experiment_access(fid, key)
2163
2164        # The state may be massaged by the service function that called
2165        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2166        # state.
2167        self.state_lock.acquire()
2168        if self.state.has_key(key):
2169            rv = copy.deepcopy(self.state[key])
2170        self.state_lock.release()
2171
2172        if rv:
2173            return self.clean_info_response(rv)
2174        else:
2175            raise service_error(service_error.req, "No such experiment")
2176
2177    def get_multi_info(self, req, fid):
2178        """
2179        Return all the stored info that this fedid can access
2180        """
2181        rv = { 'info': [ ] }
2182
2183        self.state_lock.acquire()
2184        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2185            try:
2186                self.check_experiment_access(fid, key)
2187            except service_error, e:
2188                if e.code == service_error.access:
2189                    continue
2190                else:
2191                    self.state_lock.release()
2192                    raise e
2193
2194            if self.state.has_key(key):
2195                e = copy.deepcopy(self.state[key])
2196                e = self.clean_info_response(e)
2197                rv['info'].append(e)
2198        self.state_lock.release()
2199        return rv
2200
2201    def check_termination_status(self, fed_exp, force):
2202        """
2203        Confirm that the experiment is sin a valid state to stop (or force it)
2204        return the state - invalid states for deletion and force settings cause
2205        exceptions.
2206        """
2207        self.state_lock.acquire()
2208        status = fed_exp.get('experimentStatus', None)
2209
2210        if status:
2211            if status in ('starting', 'terminating'):
2212                if not force:
2213                    self.state_lock.release()
2214                    raise service_error(service_error.partial, 
2215                            'Experiment still being created or destroyed')
2216                else:
2217                    self.log.warning('Experiment in %s state ' % status + \
2218                            'being terminated by force.')
2219            self.state_lock.release()
2220            return status
2221        else:
2222            # No status??? trouble
2223            self.state_lock.release()
2224            raise service_error(service_error.internal,
2225                    "Experiment has no status!?")
2226
2227
2228    def get_termination_info(self, fed_exp):
2229        ids = []
2230        term_params = { }
2231        self.state_lock.acquire()
2232        #  experimentID is a list of dicts that are self-describing
2233        #  identifiers.  This finds all the fedids and localnames - the
2234        #  keys of self.state - and puts them into ids, which is used to delete
2235        #  the state after everything is swapped out.
2236        for id in fed_exp.get('experimentID', []):
2237            if 'fedid' in id: 
2238                ids.append(id['fedid'])
2239                repo = "%s" % id['fedid']
2240            if 'localname' in id: ids.append(id['localname'])
2241
2242        # Get the experimentAccess - the principal for this experiment.  It
2243        # is this principal to which credentials have been delegated, and
2244        # as which the experiment controller must act.
2245        if 'experimentAccess' in fed_exp and \
2246                'X509' in fed_exp['experimentAccess']:
2247            expcert = fed_exp['experimentAccess']['X509']
2248        else:
2249            expcert = None
2250
2251        # Collect the allocation/segment ids into a dict keyed by the fedid
2252        # of the allocation (or a monotonically increasing integer) that
2253        # contains a tuple of uri, aid (which is a dict...)
2254        for i, fed in enumerate(fed_exp.get('federant', [])):
2255            try:
2256                uri = fed['uri']
2257                aid = fed['allocID']
2258                k = fed['allocID'].get('fedid', i)
2259            except KeyError, e:
2260                continue
2261            term_params[k] = (uri, aid)
2262        # Change the experiment state
2263        fed_exp['experimentStatus'] = 'terminating'
2264        if self.state_filename: self.write_state()
2265        self.state_lock.release()
2266
2267        return ids, term_params, expcert, repo
2268
2269
2270    def deallocate_resources(self, term_params, expcert, status, force, 
2271            dealloc_log):
2272        tmpdir = None
2273        # This try block makes sure the tempdir is cleared
2274        try:
2275            # If no expcert, try the deallocation as the experiment
2276            # controller instance.
2277            if expcert and self.auth_type != 'legacy': 
2278                try:
2279                    tmpdir = tempfile.mkdtemp(prefix="term-")
2280                except EnvironmentError:
2281                    raise service_error(service_error.internal, 
2282                            "Cannot create tmp dir")
2283                cert_file = self.make_temp_certfile(expcert, tmpdir)
2284                pw = None
2285            else: 
2286                cert_file = self.cert_file
2287                pw = self.cert_pwd
2288
2289            # Stop everyone.  NB, wait_for_all waits until a thread starts
2290            # and then completes, so we can't wait if nothing starts.  So,
2291            # no tbparams, no start.
2292            if len(term_params) > 0:
2293                tp = thread_pool(self.nthreads)
2294                for k, (uri, aid) in term_params.items():
2295                    # Create and start a thread to stop the segment
2296                    tp.wait_for_slot()
2297                    t  = pooled_thread(\
2298                            target=self.terminate_segment(log=dealloc_log,
2299                                testbed=uri,
2300                                cert_file=cert_file, 
2301                                cert_pwd=pw,
2302                                trusted_certs=self.trusted_certs,
2303                                caller=self.call_TerminateSegment),
2304                            args=(uri, aid), name=k,
2305                            pdata=tp, trace_file=self.trace_file)
2306                    t.start()
2307                # Wait for completions
2308                tp.wait_for_all_done()
2309
2310            # release the allocations (failed experiments have done this
2311            # already, and starting experiments may be in odd states, so we
2312            # ignore errors releasing those allocations
2313            try: 
2314                for k, (uri, aid)  in term_params.items():
2315                    self.release_access(None, aid, uri=uri,
2316                            cert_file=cert_file, cert_pwd=pw)
2317            except service_error, e:
2318                if status != 'failed' and not force:
2319                    raise e
2320
2321        # Clean up the tmpdir no matter what
2322        finally:
2323            if tmpdir: self.remove_dirs(tmpdir)
2324
2325
2326    def terminate_experiment(self, req, fid):
2327        """
2328        Swap this experiment out on the federants and delete the shared
2329        information
2330        """
2331        tbparams = { }
2332        req = req.get('TerminateRequestBody', None)
2333        if not req:
2334            raise service_error(service_error.req,
2335                    "Bad request format (no TerminateRequestBody)")
2336
2337        key = self.get_experiment_key(req, 'experiment')
2338        self.check_experiment_access(fid, key)
2339        exp = req.get('experiment', False)
2340        force = req.get('force', False)
2341
2342        dealloc_list = [ ]
2343
2344
2345        # Create a logger that logs to the dealloc_list as well as to the main
2346        # log file.
2347        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2348        h = logging.StreamHandler(self.list_log(dealloc_list))
2349        # XXX: there should be a global one of these rather than repeating the
2350        # code.
2351        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2352                    '%d %b %y %H:%M:%S'))
2353        dealloc_log.addHandler(h)
2354
2355        self.state_lock.acquire()
2356        fed_exp = self.state.get(key, None)
2357        self.state_lock.release()
2358        repo = None
2359
2360        if fed_exp:
2361            status = self.check_termination_status(fed_exp, force)
2362            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2363            self.deallocate_resources(term_params, expcert, status, force, 
2364                    dealloc_log)
2365
2366            # Remove the terminated experiment
2367            self.state_lock.acquire()
2368            for id in ids:
2369                self.clear_experiment_authorizaton(id, need_state_lock=False)
2370                if id in self.state: del self.state[id]
2371
2372            if self.state_filename: self.write_state()
2373            self.state_lock.release()
2374
2375            # Delete any synch points associated with this experiment.  All
2376            # synch points begin with the fedid of the experiment.
2377            fedid_keys = set(["fedid:%s" % f for f in ids \
2378                    if isinstance(f, fedid)])
2379            for k in self.synch_store.all_keys():
2380                try:
2381                    if len(k) > 45 and k[0:46] in fedid_keys:
2382                        self.synch_store.del_value(k)
2383                except synch_store.BadDeletionError:
2384                    pass
2385            self.write_store()
2386
2387            # Remove software and other cached stuff from the filesystem.
2388            if repo:
2389                self.remove_dirs("%s/%s" % (self.repodir, repo))
2390               
2391            return { 
2392                    'experiment': exp , 
2393                    'deallocationLog': string.join(dealloc_list, ''),
2394                    }
2395        else:
2396            raise service_error(service_error.req, "No saved state")
2397
2398
2399    def GetValue(self, req, fid):
2400        """
2401        Get a value from the synchronized store
2402        """
2403        req = req.get('GetValueRequestBody', None)
2404        if not req:
2405            raise service_error(service_error.req,
2406                    "Bad request format (no GetValueRequestBody)")
2407       
2408        name = req.get('name', None)
2409        wait = req.get('wait', False)
2410        rv = { 'name': name }
2411
2412        if name and self.auth.check_attribute(fid, name):
2413            self.log.debug("[GetValue] asking for %s " % name)
2414            try:
2415                v = self.synch_store.get_value(name, wait)
2416            except synch_store.RevokedKeyError:
2417                # No more synch on this key
2418                raise service_error(service_error.federant, 
2419                        "Synch key %s revoked" % name)
2420            if v is not None:
2421                rv['value'] = v
2422            self.log.debug("[GetValue] got %s from %s" % (v, name))
2423            return rv
2424        else:
2425            raise service_error(service_error.access, "Access Denied")
2426       
2427
2428    def SetValue(self, req, fid):
2429        """
2430        Set a value in the synchronized store
2431        """
2432        req = req.get('SetValueRequestBody', None)
2433        if not req:
2434            raise service_error(service_error.req,
2435                    "Bad request format (no SetValueRequestBody)")
2436       
2437        name = req.get('name', None)
2438        v = req.get('value', '')
2439
2440        if name and self.auth.check_attribute(fid, name):
2441            try:
2442                self.synch_store.set_value(name, v)
2443                self.write_store()
2444                self.log.debug("[SetValue] set %s to %s" % (name, v))
2445            except synch_store.CollisionError:
2446                # Translate into a service_error
2447                raise service_error(service_error.req,
2448                        "Value already set: %s" %name)
2449            except synch_store.RevokedKeyError:
2450                # No more synch on this key
2451                raise service_error(service_error.federant, 
2452                        "Synch key %s revoked" % name)
2453            return { 'name': name, 'value': v }
2454        else:
2455            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.