source: fedd/federation/experiment_control.py @ 5ecb9a3

axis_examplecompt_changesinfo-ops
Last change on this file since 5ecb9a3 was 5ecb9a3, checked in by Ted Faber <faber@…>, 13 years ago

Checkpoint along the path to #10

Several create_experiment subtasks have been brokern out and the
get_access and get_legacy access signatures have been unified.

Part of #10

  • Property mode set to 100644
File size: 80.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205            self.get_access = self.legacy_get_access
206        elif self.auth_type == 'abac':
207            self.auth = abac_authorizer(load=self.auth_dir)
208        else:
209            raise service_error(service_error.internal, 
210                    "Unknown auth_type: %s" % self.auth_type)
211
212
213        if self.ssh_pubkey_file:
214            try:
215                f = open(self.ssh_pubkey_file, 'r')
216                self.ssh_pubkey = f.read()
217                f.close()
218            except EnvironmentError:
219                raise service_error(service_error.internal,
220                        "Cannot read sshpubkey")
221        else:
222            raise service_error(service_error.internal, 
223                    "No SSH public key file?")
224
225        if not self.ssh_privkey_file:
226            raise service_error(service_error.internal, 
227                    "No SSH public key file?")
228
229
230        if mapdb_file:
231            self.read_mapdb(mapdb_file)
232        else:
233            self.log.warn("[experiment_control] No testbed map, using defaults")
234            self.tbmap = { 
235                    'deter':'https://users.isi.deterlab.net:23235',
236                    'emulab':'https://users.isi.deterlab.net:23236',
237                    'ucb':'https://users.isi.deterlab.net:23237',
238                    }
239
240        if accessdb_file:
241                self.read_accessdb(accessdb_file)
242        else:
243            raise service_error(service_error.internal,
244                    "No accessdb specified in config")
245
246        # Grab saved state.  OK to do this w/o locking because it's read only
247        # and only one thread should be in existence that can see self.state at
248        # this point.
249        if self.state_filename:
250            self.read_state()
251
252        if self.store_filename:
253            self.read_store()
254        else:
255            self.log.warning("No saved synch store")
256            self.synch_store = synch_store
257
258        # Dispatch tables
259        self.soap_services = {\
260                'New': soap_handler('New', self.new_experiment),
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
266                'Terminate': soap_handler('Terminate', 
267                    self.terminate_experiment),
268                'GetValue': soap_handler('GetValue', self.GetValue),
269                'SetValue': soap_handler('SetValue', self.SetValue),
270        }
271
272        self.xmlrpc_services = {\
273                'New': xmlrpc_handler('New', self.new_experiment),
274                'Create': xmlrpc_handler('Create', self.create_experiment),
275                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
276                'Vis': xmlrpc_handler('Vis', self.get_vis),
277                'Info': xmlrpc_handler('Info', self.get_info),
278                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
279                'Terminate': xmlrpc_handler('Terminate',
280                    self.terminate_experiment),
281                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
282                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
283        }
284
285    # Call while holding self.state_lock
286    def write_state(self):
287        """
288        Write a new copy of experiment state after copying the existing state
289        to a backup.
290
291        State format is a simple pickling of the state dictionary.
292        """
293        if os.access(self.state_filename, os.W_OK):
294            copy_file(self.state_filename, \
295                    "%s.bak" % self.state_filename)
296        try:
297            f = open(self.state_filename, 'w')
298            pickle.dump(self.state, f)
299        except EnvironmentError, e:
300            self.log.error("Can't write file %s: %s" % \
301                    (self.state_filename, e))
302        except pickle.PicklingError, e:
303            self.log.error("Pickling problem: %s" % e)
304        except TypeError, e:
305            self.log.error("Pickling problem (TypeError): %s" % e)
306
307    @staticmethod
308    def get_alloc_ids(state):
309        """
310        Pull the fedids of the identifiers of each allocation from the
311        state.  Again, a dict dive that's best isolated.
312
313        Used by read_store and read state
314        """
315
316        return [ f['allocID']['fedid'] 
317                for f in state.get('federant',[]) \
318                    if f.has_key('allocID') and \
319                        f['allocID'].has_key('fedid')]
320
321    # Call while holding self.state_lock
322    def read_state(self):
323        """
324        Read a new copy of experiment state.  Old state is overwritten.
325
326        State format is a simple pickling of the state dictionary.
327        """
328       
329        def get_experiment_id(state):
330            """
331            Pull the fedid experimentID out of the saved state.  This is kind
332            of a gross walk through the dict.
333            """
334
335            if state.has_key('experimentID'):
336                for e in state['experimentID']:
337                    if e.has_key('fedid'):
338                        return e['fedid']
339                else:
340                    return None
341            else:
342                return None
343
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except EnvironmentError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355       
356        for s in self.state.values():
357            try:
358
359                eid = get_experiment_id(s)
360                if eid : 
361                    if self.auth_type == 'legacy':
362                        # XXX: legacy
363                        # Give the owner rights to the experiment
364                        self.auth.set_attribute(s['owner'], eid)
365                        # And holders of the eid as well
366                        self.auth.set_attribute(eid, eid)
367                        # allow overrides to control experiments as well
368                        for o in self.overrides:
369                            self.auth.set_attribute(o, eid)
370                        # Set permissions to allow reading of the software
371                        # repo, if any, as well.
372                        for a in self.get_alloc_ids(s):
373                            self.auth.set_attribute(a, 'repo/%s' % eid)
374                else:
375                    raise KeyError("No experiment id")
376            except KeyError, e:
377                self.log.warning("[read_state]: State ownership or identity " +\
378                        "misformatted in %s: %s" % (self.state_filename, e))
379
380
381    def read_accessdb(self, accessdb_file):
382        """
383        Read the mapping from fedids that can create experiments to their name
384        in the 3-level access namespace.  All will be asserted from this
385        testbed and can include the local username and porject that will be
386        asserted on their behalf by this fedd.  Each fedid is also added to the
387        authorization system with the "create" attribute.
388        """
389        self.accessdb = {}
390        # These are the regexps for parsing the db
391        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
392        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
394        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
395                "\s*->\s*(" + name_expr + ")\s*$")
396        lineno = 0
397
398        # Parse the mappings and store in self.authdb, a dict of
399        # fedid -> (proj, user)
400        try:
401            f = open(accessdb_file, "r")
402            for line in f:
403                lineno += 1
404                line = line.strip()
405                if len(line) == 0 or line.startswith('#'):
406                    continue
407                m = project_line.match(line)
408                if m:
409                    fid = fedid(hexstr=m.group(1))
410                    project, user = m.group(2,3)
411                    if not self.accessdb.has_key(fid):
412                        self.accessdb[fid] = []
413                    self.accessdb[fid].append((project, user))
414                    continue
415
416                m = user_line.match(line)
417                if m:
418                    fid = fedid(hexstr=m.group(1))
419                    project = None
420                    user = m.group(2)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425                self.log.warn("[experiment_control] Error parsing access " +\
426                        "db %s at line %d" %  (accessdb_file, lineno))
427        except EnvironmentError:
428            raise service_error(service_error.internal,
429                    ("Error opening/reading %s as experiment " +\
430                            "control accessdb") %  accessdb_file)
431        f.close()
432
433        # Initialize the authorization attributes
434        # XXX: legacy
435        if self.auth_type == 'legacy':
436            for fid in self.accessdb.keys():
437                self.auth.set_attribute(fid, 'create')
438                self.auth.set_attribute(fid, 'new')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except EnvironmentError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def read_store(self):
467        try:
468            self.synch_store = synch_store()
469            self.synch_store.load(self.store_filename)
470            self.log.debug("[read_store]: Read store from %s" % \
471                    self.store_filename)
472        except EnvironmentError, e:
473            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
474                    % (self.state_filename, e))
475            self.synch_store = synch_store()
476
477        # Set the initial permissions on data in the store.  XXX: This ad hoc
478        # authorization attribute initialization is getting out of hand.
479        # XXX: legacy
480        if self.auth_type == 'legacy':
481            for k in self.synch_store.all_keys():
482                try:
483                    if k.startswith('fedid:'):
484                        fid = fedid(hexstr=k[6:46])
485                        if self.state.has_key(fid):
486                            for a in self.get_alloc_ids(self.state[fid]):
487                                self.auth.set_attribute(a, k)
488                except ValueError, e:
489                    self.log.warn("Cannot deduce permissions for %s" % k)
490
491
492    def write_store(self):
493        """
494        Write a new copy of synch_store after writing current state
495        to a backup.  We use the internal synch_store pickle method to avoid
496        incinsistent data.
497
498        State format is a simple pickling of the store.
499        """
500        if os.access(self.store_filename, os.W_OK):
501            copy_file(self.store_filename, \
502                    "%s.bak" % self.store_filename)
503        try:
504            self.synch_store.save(self.store_filename)
505        except EnvironmentError, e:
506            self.log.error("Can't write file %s: %s" % \
507                    (self.store_filename, e))
508        except TypeError, e:
509            self.log.error("Pickling problem (TypeError): %s" % e)
510
511       
512    def generate_ssh_keys(self, dest, type="rsa" ):
513        """
514        Generate a set of keys for the gateways to use to talk.
515
516        Keys are of type type and are stored in the required dest file.
517        """
518        valid_types = ("rsa", "dsa")
519        t = type.lower();
520        if t not in valid_types: raise ValueError
521        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
522
523        try:
524            trace = open("/dev/null", "w")
525        except EnvironmentError:
526            raise service_error(service_error.internal,
527                    "Cannot open /dev/null??");
528
529        # May raise CalledProcessError
530        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
531        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
532        if rv != 0:
533            raise service_error(service_error.internal, 
534                    "Cannot generate nonce ssh keys.  %s return code %d" \
535                            % (self.ssh_keygen, rv))
536
537    def gentopo(self, str):
538        """
539        Generate the topology data structure from the splitter's XML
540        representation of it.
541
542        The topology XML looks like:
543            <experiment>
544                <nodes>
545                    <node><vname></vname><ips>ip1:ip2</ips></node>
546                </nodes>
547                <lans>
548                    <lan>
549                        <vname></vname><vnode></vnode><ip></ip>
550                        <bandwidth></bandwidth><member>node:port</member>
551                    </lan>
552                </lans>
553        """
554        class topo_parse:
555            """
556            Parse the topology XML and create the dats structure.
557            """
558            def __init__(self):
559                # Typing of the subelements for data conversion
560                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
561                self.int_subelements = ( 'bandwidth',)
562                self.float_subelements = ( 'delay',)
563                # The final data structure
564                self.nodes = [ ]
565                self.lans =  [ ]
566                self.topo = { \
567                        'node': self.nodes,\
568                        'lan' : self.lans,\
569                    }
570                self.element = { }  # Current element being created
571                self.chars = ""     # Last text seen
572
573            def end_element(self, name):
574                # After each sub element the contents is added to the current
575                # element or to the appropriate list.
576                if name == 'node':
577                    self.nodes.append(self.element)
578                    self.element = { }
579                elif name == 'lan':
580                    self.lans.append(self.element)
581                    self.element = { }
582                elif name in self.str_subelements:
583                    self.element[name] = self.chars
584                    self.chars = ""
585                elif name in self.int_subelements:
586                    self.element[name] = int(self.chars)
587                    self.chars = ""
588                elif name in self.float_subelements:
589                    self.element[name] = float(self.chars)
590                    self.chars = ""
591
592            def found_chars(self, data):
593                self.chars += data.rstrip()
594
595
596        tp = topo_parse();
597        parser = xml.parsers.expat.ParserCreate()
598        parser.EndElementHandler = tp.end_element
599        parser.CharacterDataHandler = tp.found_chars
600
601        parser.Parse(str)
602
603        return tp.topo
604       
605
606    def genviz(self, topo):
607        """
608        Generate the visualization the virtual topology
609        """
610
611        neato = "/usr/local/bin/neato"
612        # These are used to parse neato output and to create the visualization
613        # file.
614        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
615        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
616                "%s</type></node>"
617
618        try:
619            # Node names
620            nodes = [ n['vname'] for n in topo['node'] ]
621            topo_lans = topo['lan']
622        except KeyError, e:
623            raise service_error(service_error.internal, "Bad topology: %s" %e)
624
625        lans = { }
626        links = { }
627
628        # Walk through the virtual topology, organizing the connections into
629        # 2-node connections (links) and more-than-2-node connections (lans).
630        # When a lan is created, it's added to the list of nodes (there's a
631        # node in the visualization for the lan).
632        for l in topo_lans:
633            if links.has_key(l['vname']):
634                if len(links[l['vname']]) < 2:
635                    links[l['vname']].append(l['vnode'])
636                else:
637                    nodes.append(l['vname'])
638                    lans[l['vname']] = links[l['vname']]
639                    del links[l['vname']]
640                    lans[l['vname']].append(l['vnode'])
641            elif lans.has_key(l['vname']):
642                lans[l['vname']].append(l['vnode'])
643            else:
644                links[l['vname']] = [ l['vnode'] ]
645
646
647        # Open up a temporary file for dot to turn into a visualization
648        try:
649            df, dotname = tempfile.mkstemp()
650            dotfile = os.fdopen(df, 'w')
651        except EnvironmentError:
652            raise service_error(service_error.internal,
653                    "Failed to open file in genviz")
654
655        try:
656            dnull = open('/dev/null', 'w')
657        except EnvironmentError:
658            service_error(service_error.internal,
659                    "Failed to open /dev/null in genviz")
660
661        # Generate a dot/neato input file from the links, nodes and lans
662        try:
663            print >>dotfile, "graph G {"
664            for n in nodes:
665                print >>dotfile, '\t"%s"' % n
666            for l in links.keys():
667                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
668            for l in lans.keys():
669                for n in lans[l]:
670                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
671            print >>dotfile, "}"
672            dotfile.close()
673        except TypeError:
674            raise service_error(service_error.internal,
675                    "Single endpoint link in vtopo")
676        except EnvironmentError:
677            raise service_error(service_error.internal, "Cannot write dot file")
678
679        # Use dot to create a visualization
680        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
681                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
682                close_fds=True)
683        dnull.close()
684
685        # Translate dot to vis format
686        vis_nodes = [ ]
687        vis = { 'node': vis_nodes }
688        for line in dot.stdout:
689            m = vis_re.match(line)
690            if m:
691                vn = m.group(1)
692                vis_node = {'name': vn, \
693                        'x': float(m.group(2)),\
694                        'y' : float(m.group(3)),\
695                    }
696                if vn in links.keys() or vn in lans.keys():
697                    vis_node['type'] = 'lan'
698                else:
699                    vis_node['type'] = 'node'
700                vis_nodes.append(vis_node)
701        rv = dot.wait()
702
703        os.remove(dotname)
704        if rv == 0 : return vis
705        else: return None
706
707
708    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
709            cert_pwd=None):
710        """
711        Release access to testbed through fedd
712        """
713
714        if not uri and tbmap:
715            uri = tbmap.get(tb, None)
716        if not uri:
717            raise service_error(service_error.server_config, 
718                    "Unknown testbed: %s" % tb)
719
720        if self.local_access.has_key(uri):
721            resp = self.local_access[uri].ReleaseAccess(\
722                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
723                    fedid(file=cert_file))
724            resp = { 'ReleaseAccessResponseBody': resp } 
725        else:
726            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
727                    cert_file, cert_pwd, self.trusted_certs)
728
729        # better error coding
730
731    def remote_ns2topdl(self, uri, desc):
732
733        req = {
734                'description' : { 'ns2description': desc },
735            }
736
737        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
738                self.trusted_certs)
739
740        if r.has_key('Ns2TopdlResponseBody'):
741            r = r['Ns2TopdlResponseBody']
742            ed = r.get('experimentdescription', None)
743            if ed.has_key('topdldescription'):
744                return topdl.Topology(**ed['topdldescription'])
745            else:
746                raise service_error(service_error.protocol, 
747                        "Bad splitter response (no output)")
748        else:
749            raise service_error(service_error.protocol, "Bad splitter response")
750
751    class start_segment:
752        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
753                cert_pwd=None, trusted_certs=None, caller=None,
754                log_collector=None):
755            self.log = log
756            self.debug = debug
757            self.cert_file = cert_file
758            self.cert_pwd = cert_pwd
759            self.trusted_certs = None
760            self.caller = caller
761            self.testbed = testbed
762            self.log_collector = log_collector
763            self.response = None
764            self.node = { }
765
766        def make_map(self, resp):
767            for e in resp.get('embedding', []):
768                if 'toponame' in e and 'physname' in e:
769                    self.node[e['toponame']] = e['physname'][0]
770
771        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
772            req = {
773                    'allocID': { 'fedid' : aid }, 
774                    'segmentdescription': { 
775                        'topdldescription': topo.to_dict(),
776                    },
777                }
778
779            if connInfo:
780                req['connection'] = connInfo
781
782            import_svcs = [ s for m in masters.values() \
783                    for s in m if self.testbed in s.importers]
784
785            if import_svcs or self.testbed in masters:
786                req['service'] = []
787
788            for s in import_svcs:
789                for r in s.reqs:
790                    sr = copy.deepcopy(r)
791                    sr['visibility'] = 'import';
792                    req['service'].append(sr)
793
794            for s in masters.get(self.testbed, []):
795                for r in s.reqs:
796                    sr = copy.deepcopy(r)
797                    sr['visibility'] = 'export';
798                    req['service'].append(sr)
799
800            if attrs:
801                req['fedAttr'] = attrs
802
803            try:
804                self.log.debug("Calling StartSegment at %s " % uri)
805                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
806                        self.trusted_certs)
807                if r.has_key('StartSegmentResponseBody'):
808                    lval = r['StartSegmentResponseBody'].get('allocationLog',
809                            None)
810                    if lval and self.log_collector:
811                        for line in  lval.splitlines(True):
812                            self.log_collector.write(line)
813                    self.make_map(r['StartSegmentResponseBody'])
814                    self.response = r
815                else:
816                    raise service_error(service_error.internal, 
817                            "Bad response!?: %s" %r)
818                return True
819            except service_error, e:
820                self.log.error("Start segment failed on %s: %s" % \
821                        (self.testbed, e))
822                return False
823
824
825
826    class terminate_segment:
827        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
828                cert_pwd=None, trusted_certs=None, caller=None):
829            self.log = log
830            self.debug = debug
831            self.cert_file = cert_file
832            self.cert_pwd = cert_pwd
833            self.trusted_certs = None
834            self.caller = caller
835            self.testbed = testbed
836
837        def __call__(self, uri, aid ):
838            req = {
839                    'allocID': aid , 
840                }
841            try:
842                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
843                        self.trusted_certs)
844                return True
845            except service_error, e:
846                self.log.error("Terminate segment failed on %s: %s" % \
847                        (self.testbed, e))
848                return False
849   
850
851    def allocate_resources(self, allocated, masters, eid, expid, 
852            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
853            attrs=None, connInfo={}, tbmap=None, expcert=None):
854
855        started = { }           # Testbeds where a sub-experiment started
856                                # successfully
857
858        # XXX
859        fail_soft = False
860
861        if tbmap is None: tbmap = { }
862
863        log = alloc_log or self.log
864
865        tp = thread_pool(self.nthreads)
866        threads = [ ]
867        starters = [ ]
868
869        if expcert:
870            cert = expcert
871            pw = None
872        else:
873            cert = self.cert_file
874            pw = self.cert_pwd
875
876        for tb in allocated.keys():
877            # Create and start a thread to start the segment, and save it
878            # to get the return value later
879            tb_attrs = copy.copy(attrs)
880            tp.wait_for_slot()
881            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
882            base, suffix = split_testbed(tb)
883            if suffix:
884                tb_attrs.append({'attribute': 'experiment_name', 
885                    'value': "%s-%s" % (eid, suffix)})
886            else:
887                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
888            if not uri:
889                raise service_error(service_error.internal, 
890                        "Unknown testbed %s !?" % tb)
891
892            if tbparams[tb].has_key('allocID') and \
893                    tbparams[tb]['allocID'].has_key('fedid'):
894                aid = tbparams[tb]['allocID']['fedid']
895            else:
896                raise service_error(service_error.internal, 
897                        "No alloc id for testbed %s !?" % tb)
898
899            s = self.start_segment(log=log, debug=self.debug,
900                    testbed=tb, cert_file=cert,
901                    cert_pwd=pw, trusted_certs=self.trusted_certs,
902                    caller=self.call_StartSegment,
903                    log_collector=log_collector)
904            starters.append(s)
905            t  = pooled_thread(\
906                    target=s, name=tb,
907                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
908                    pdata=tp, trace_file=self.trace_file)
909            threads.append(t)
910            t.start()
911
912        # Wait until all finish (keep pinging the log, though)
913        mins = 0
914        revoked = False
915        while not tp.wait_for_all_done(60.0):
916            mins += 1
917            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
918                    % mins)
919            if not revoked and \
920                    len([ t.getName() for t in threads if t.rv == False]) > 0:
921                # a testbed has failed.  Revoke this experiment's
922                # synchronizarion values so that sub experiments will not
923                # deadlock waiting for synchronization that will never happen
924                self.log.info("A subexperiment has failed to swap in, " + \
925                        "revoking synch keys")
926                var_key = "fedid:%s" % expid
927                for k in self.synch_store.all_keys():
928                    if len(k) > 45 and k[0:46] == var_key:
929                        self.synch_store.revoke_key(k)
930                revoked = True
931
932        failed = [ t.getName() for t in threads if not t.rv ]
933        succeeded = [tb for tb in allocated.keys() if tb not in failed]
934
935        # If one failed clean up, unless fail_soft is set
936        if failed:
937            if not fail_soft:
938                tp.clear()
939                for tb in succeeded:
940                    # Create and start a thread to stop the segment
941                    tp.wait_for_slot()
942                    uri = tbparams[tb]['uri']
943                    t  = pooled_thread(\
944                            target=self.terminate_segment(log=log,
945                                testbed=tb,
946                                cert_file=cert, 
947                                cert_pwd=pw,
948                                trusted_certs=self.trusted_certs,
949                                caller=self.call_TerminateSegment),
950                            args=(uri, tbparams[tb]['federant']['allocID']),
951                            name=tb,
952                            pdata=tp, trace_file=self.trace_file)
953                    t.start()
954                # Wait until all finish (if any are being stopped)
955                if succeeded:
956                    tp.wait_for_all_done()
957
958                # release the allocations
959                for tb in tbparams.keys():
960                    try:
961                        self.release_access(tb, tbparams[tb]['allocID'], 
962                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
963                                cert_file=cert, cert_pwd=pw)
964                    except service_error, e:
965                        self.log.warn("Error releasing access: %s" % e.desc)
966                # Remove the placeholder
967                self.state_lock.acquire()
968                self.state[eid]['experimentStatus'] = 'failed'
969                if self.state_filename: self.write_state()
970                self.state_lock.release()
971                # Remove the repo dir
972                self.remove_dirs("%s/%s" %(self.repodir, expid))
973                # Walk up tmpdir, deleting as we go
974                if self.cleanup:
975                    self.remove_dirs(tmpdir)
976                else:
977                    log.debug("[start_experiment]: not removing %s" % tmpdir)
978
979
980                log.error("Swap in failed on %s" % ",".join(failed))
981                return
982        else:
983            # Walk through the successes and gather the virtual to physical
984            # mapping.
985            embedding = [ ]
986            for s in starters:
987                for k, v in s.node.items():
988                    embedding.append({
989                        'toponame': k, 
990                        'physname': [ v],
991                        'testbed': s.testbed
992                        })
993            log.info("[start_segment]: Experiment %s active" % eid)
994
995
996        # Walk up tmpdir, deleting as we go
997        if self.cleanup:
998            self.remove_dirs(tmpdir)
999        else:
1000            log.debug("[start_experiment]: not removing %s" % tmpdir)
1001
1002        # Insert the experiment into our state and update the disk copy.
1003        self.state_lock.acquire()
1004        self.state[expid]['experimentStatus'] = 'active'
1005        self.state[eid] = self.state[expid]
1006        self.state[eid]['experimentdescription']['topdldescription'] = \
1007                top.to_dict()
1008        self.state[eid]['embedding'] = embedding
1009        if self.state_filename: self.write_state()
1010        self.state_lock.release()
1011        return
1012
1013
1014    def add_kit(self, e, kit):
1015        """
1016        Add a Software object created from the list of (install, location)
1017        tuples passed as kit  to the software attribute of an object e.  We
1018        do this enough to break out the code, but it's kind of a hack to
1019        avoid changing the old tuple rep.
1020        """
1021
1022        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1023
1024        if isinstance(e.software, list): e.software.extend(s)
1025        else: e.software = s
1026
1027
1028    def create_experiment_state(self, fid, req, expid, expcert,
1029            state='starting'):
1030        """
1031        Create the initial entry in the experiment's state.  The expid and
1032        expcert are the experiment's fedid and certifacte that represents that
1033        ID, which are installed in the experiment state.  If the request
1034        includes a suggested local name that is used if possible.  If the local
1035        name is already taken by an experiment owned by this user that has
1036        failed, it is overwritten.  Otherwise new letters are added until a
1037        valid localname is found.  The generated local name is returned.
1038        """
1039
1040        if req.has_key('experimentID') and \
1041                req['experimentID'].has_key('localname'):
1042            overwrite = False
1043            eid = req['experimentID']['localname']
1044            # If there's an old failed experiment here with the same local name
1045            # and accessible by this user, we'll overwrite it, otherwise we'll
1046            # fall through and do the collision avoidance.
1047            old_expid = self.get_experiment_fedid(eid)
1048            if old_expid and self.check_experiment_access(fid, old_expid):
1049                self.state_lock.acquire()
1050                status = self.state[eid].get('experimentStatus', None)
1051                if status and status == 'failed':
1052                    # remove the old access attribute
1053                    self.auth.unset_attribute(fid, old_expid)
1054                    self.auth.save()
1055                    overwrite = True
1056                    del self.state[eid]
1057                    del self.state[old_expid]
1058                self.state_lock.release()
1059            self.state_lock.acquire()
1060            while (self.state.has_key(eid) and not overwrite):
1061                eid += random.choice(string.ascii_letters)
1062            # Initial state
1063            self.state[eid] = {
1064                    'experimentID' : \
1065                            [ { 'localname' : eid }, {'fedid': expid } ],
1066                    'experimentStatus': state,
1067                    'experimentAccess': { 'X509' : expcert },
1068                    'owner': fid,
1069                    'log' : [],
1070                }
1071            self.state[expid] = self.state[eid]
1072            if self.state_filename: self.write_state()
1073            self.state_lock.release()
1074        else:
1075            eid = self.exp_stem
1076            for i in range(0,5):
1077                eid += random.choice(string.ascii_letters)
1078            self.state_lock.acquire()
1079            while (self.state.has_key(eid)):
1080                eid = self.exp_stem
1081                for i in range(0,5):
1082                    eid += random.choice(string.ascii_letters)
1083            # Initial state
1084            self.state[eid] = {
1085                    'experimentID' : \
1086                            [ { 'localname' : eid }, {'fedid': expid } ],
1087                    'experimentStatus': state,
1088                    'experimentAccess': { 'X509' : expcert },
1089                    'owner': fid,
1090                    'log' : [],
1091                }
1092            self.state[expid] = self.state[eid]
1093            if self.state_filename: self.write_state()
1094            self.state_lock.release()
1095
1096        return eid
1097
1098
1099    def allocate_ips_to_topo(self, top):
1100        """
1101        Add an ip4_address attribute to all the hosts in the topology, based on
1102        the shared substrates on which they sit.  An /etc/hosts file is also
1103        created and returned as a list of hostfiles entries.  We also return
1104        the allocator, because we may need to allocate IPs to portals
1105        (specifically DRAGON portals).
1106        """
1107        subs = sorted(top.substrates, 
1108                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1109                reverse=True)
1110        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1111        ifs = { }
1112        hosts = [ ]
1113
1114        for idx, s in enumerate(subs):
1115            net_size = len(s.interfaces)+2
1116
1117            a = ips.allocate(net_size)
1118            if a :
1119                base, num = a
1120                if num < net_size: 
1121                    raise service_error(service_error.internal,
1122                            "Allocator returned wrong number of IPs??")
1123            else:
1124                raise service_error(service_error.req, 
1125                        "Cannot allocate IP addresses")
1126            mask = ips.min_alloc
1127            while mask < net_size:
1128                mask *= 2
1129
1130            netmask = ((2**32-1) ^ (mask-1))
1131
1132            base += 1
1133            for i in s.interfaces:
1134                i.attribute.append(
1135                        topdl.Attribute('ip4_address', 
1136                            "%s" % ip_addr(base)))
1137                i.attribute.append(
1138                        topdl.Attribute('ip4_netmask', 
1139                            "%s" % ip_addr(int(netmask))))
1140
1141                hname = i.element.name
1142                if ifs.has_key(hname):
1143                    hosts.append("%s\t%s-%s %s-%d" % \
1144                            (ip_addr(base), hname, s.name, hname,
1145                                ifs[hname]))
1146                else:
1147                    ifs[hname] = 0
1148                    hosts.append("%s\t%s-%s %s-%d %s" % \
1149                            (ip_addr(base), hname, s.name, hname,
1150                                ifs[hname], hname))
1151
1152                ifs[hname] += 1
1153                base += 1
1154        return hosts, ips
1155
1156    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1157            tbparam, masters, tbmap, expid=None, expcert=None):
1158        for tb in testbeds:
1159            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1160                    expcert)
1161            allocated[tb] = 1
1162
1163    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1164            expcert=None):
1165        """
1166        Get access to testbed through fedd and set the parameters for that tb
1167        """
1168        def get_export_project(svcs):
1169            """
1170            Look through for the list of federated_service for this testbed
1171            objects for a project_export service, and extract the project
1172            parameter.
1173            """
1174
1175            pe = [s for s in svcs if s.name=='project_export']
1176            if len(pe) == 1:
1177                return pe[0].params.get('project', None)
1178            elif len(pe) == 0:
1179                return None
1180            else:
1181                raise service_error(service_error.req,
1182                        "More than one project export is not supported")
1183
1184        def add_services(svcs, type, slist):
1185            """
1186            Add the given services to slist.  type is import or export.
1187            """
1188            for i, s in enumerate(svcs):
1189                idx = '%s%d' % (type, i)
1190                sr = {'id': idx, 'name': s.name, 'visibility': type }
1191                if s.params:
1192                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1193                            for k, v in s.params.items()]
1194                slist.append(sr)
1195
1196        uri = tbmap.get(testbed_base(tb), None)
1197        if not uri:
1198            raise service_error(service_error.server_config, 
1199                    "Unknown testbed: %s" % tb)
1200
1201        export_svcs = masters.get(tb,[])
1202        import_svcs = [ s for m in masters.values() \
1203                for s in m \
1204                    if tb in s.importers ]
1205
1206        export_project = get_export_project(export_svcs)
1207        # Compose the credential list so that IDs come before attributes
1208        creds = set()
1209        keys = set()
1210        certs = self.auth.get_creds_for_principal(fid)
1211        if expid:
1212            certs.update(self.auth.get_creds_for_principal(expid))
1213        for c in certs:
1214            keys.add(c.issuer_cert())
1215            creds.add(c.attribute_cert())
1216        creds = list(keys) + list(creds)
1217
1218        if expcert: cert, pw = expcert, None
1219        else: cert, pw = self.cert_file, self.cert_pw
1220
1221        # Request credentials
1222        req = {
1223                'abac_credential': creds,
1224            }
1225        # Make the service request from the services we're importing and
1226        # exporting.  Keep track of the export request ids so we can
1227        # collect the resulting info from the access response.
1228        e_keys = { }
1229        if import_svcs or export_svcs:
1230            slist = []
1231            add_services(import_svcs, 'import', slist)
1232            add_services(export_svcs, 'export', slist)
1233            req['service'] = slist
1234
1235        if self.local_access.has_key(uri):
1236            # Local access call
1237            req = { 'RequestAccessRequestBody' : req }
1238            r = self.local_access[uri].RequestAccess(req, 
1239                    fedid(file=self.cert_file))
1240            r = { 'RequestAccessResponseBody' : r }
1241        else:
1242            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1243
1244        if r.has_key('RequestAccessResponseBody'):
1245            # Through to here we have a valid response, not a fault.
1246            # Access denied is a fault, so something better or worse than
1247            # access denied has happened.
1248            r = r['RequestAccessResponseBody']
1249            self.log.debug("[get_access] Access granted")
1250        else:
1251            raise service_error(service_error.protocol,
1252                        "Bad proxy response")
1253       
1254        tbparam[tb] = { 
1255                "allocID" : r['allocID'],
1256                "uri": uri,
1257                }
1258
1259        # Collect the responses corresponding to the services this testbed
1260        # exports.  These will be the service requests that we will include in
1261        # the start segment requests (with appropriate visibility values) to
1262        # import and export the segments.
1263        for s in r.get('service', []):
1264            id = s.get('id', None)
1265            if id and id in e_keys:
1266                e_keys[id].reqs.append(s)
1267
1268        # Add attributes to parameter space.  We don't allow attributes to
1269        # overlay any parameters already installed.
1270        for a in r.get('fedAttr', []):
1271            try:
1272                if a['attribute'] and \
1273                        isinstance(a['attribute'], basestring)\
1274                        and not tbparam[tb].has_key(a['attribute'].lower()):
1275                    tbparam[tb][a['attribute'].lower()] = a['value']
1276            except KeyError:
1277                self.log.error("Bad attribute in response: %s" % a)
1278
1279
1280    def split_topology(self, top, topo, testbeds):
1281        """
1282        Create the sub-topologies that are needed for experiment instantiation.
1283        """
1284        for tb in testbeds:
1285            topo[tb] = top.clone()
1286            # copy in for loop allows deletions from the original
1287            for e in [ e for e in topo[tb].elements]:
1288                etb = e.get_attribute('testbed')
1289                # NB: elements without a testbed attribute won't appear in any
1290                # sub topologies. 
1291                if not etb or etb != tb:
1292                    for i in e.interface:
1293                        for s in i.subs:
1294                            try:
1295                                s.interfaces.remove(i)
1296                            except ValueError:
1297                                raise service_error(service_error.internal,
1298                                        "Can't remove interface??")
1299                    topo[tb].elements.remove(e)
1300            topo[tb].make_indices()
1301
1302    def wrangle_software(self, expid, top, topo, tbparams):
1303        """
1304        Copy software out to the repository directory, allocate permissions and
1305        rewrite the segment topologies to look for the software in local
1306        places.
1307        """
1308
1309        # Copy the rpms and tarfiles to a distribution directory from
1310        # which the federants can retrieve them
1311        linkpath = "%s/software" %  expid
1312        softdir ="%s/%s" % ( self.repodir, linkpath)
1313        softmap = { }
1314        # These are in a list of tuples format (each kit).  This comprehension
1315        # unwraps them into a single list of tuples that initilaizes the set of
1316        # tuples.
1317        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1318                for p, t in l ])
1319        pkgs.update([x.location for e in top.elements \
1320                for x in e.software])
1321        try:
1322            os.makedirs(softdir)
1323        except EnvironmentError, e:
1324            raise service_error(
1325                    "Cannot create software directory: %s" % e)
1326        # The actual copying.  Everything's converted into a url for copying.
1327        for pkg in pkgs:
1328            loc = pkg
1329
1330            scheme, host, path = urlparse(loc)[0:3]
1331            dest = os.path.basename(path)
1332            if not scheme:
1333                if not loc.startswith('/'):
1334                    loc = "/%s" % loc
1335                loc = "file://%s" %loc
1336            try:
1337                u = urlopen(loc)
1338            except Exception, e:
1339                raise service_error(service_error.req, 
1340                        "Cannot open %s: %s" % (loc, e))
1341            try:
1342                f = open("%s/%s" % (softdir, dest) , "w")
1343                self.log.debug("Writing %s/%s" % (softdir,dest) )
1344                data = u.read(4096)
1345                while data:
1346                    f.write(data)
1347                    data = u.read(4096)
1348                f.close()
1349                u.close()
1350            except Exception, e:
1351                raise service_error(service_error.internal,
1352                        "Could not copy %s: %s" % (loc, e))
1353            path = re.sub("/tmp", "", linkpath)
1354            # XXX
1355            softmap[pkg] = \
1356                    "%s/%s/%s" %\
1357                    ( self.repo_url, path, dest)
1358
1359            # Allow the individual segments to access the software.
1360            for tb in tbparams.keys():
1361                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1362                        "/%s/%s" % ( path, dest))
1363            self.auth.save()
1364
1365        # Convert the software locations in the segments into the local
1366        # copies on this host
1367        for soft in [ s for tb in topo.values() \
1368                for e in tb.elements \
1369                    if getattr(e, 'software', False) \
1370                        for s in e.software ]:
1371            if softmap.has_key(soft.location):
1372                soft.location = softmap[soft.location]
1373
1374
1375    def new_experiment(self, req, fid):
1376        """
1377        The external interface to empty initial experiment creation called from
1378        the dispatcher.
1379
1380        Creates a working directory, splits the incoming description using the
1381        splitter script and parses out the avrious subsections using the
1382        lcasses above.  Once each sub-experiment is created, use pooled threads
1383        to instantiate them and start it all up.
1384        """
1385        req = req.get('NewRequestBody', None)
1386        if not req:
1387            raise service_error(service_error.req,
1388                    "Bad request format (no NewRequestBody)")
1389
1390        if self.auth.import_credentials(data_list=req.get('credential', [])):
1391            self.auth.save()
1392
1393        if not self.auth.check_attribute(fid, 'new'):
1394            raise service_error(service_error.access, "New access denied")
1395
1396        try:
1397            tmpdir = tempfile.mkdtemp(prefix="split-")
1398        except EnvironmentError:
1399            raise service_error(service_error.internal, "Cannot create tmp dir")
1400
1401        try:
1402            access_user = self.accessdb[fid]
1403        except KeyError:
1404            raise service_error(service_error.internal,
1405                    "Access map and authorizer out of sync in " + \
1406                            "new_experiment for fedid %s"  % fid)
1407
1408        pid = "dummy"
1409        gid = "dummy"
1410
1411        # Generate an ID for the experiment (slice) and a certificate that the
1412        # allocator can use to prove they own it.  We'll ship it back through
1413        # the encrypted connection.  If the requester supplied one, use it.
1414        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1415            expcert = req['experimentAccess']['X509']
1416            expid = fedid(certstr=expcert)
1417            self.state_lock.acquire()
1418            if expid in self.state:
1419                self.state_lock.release()
1420                raise service_error(service_error.req, 
1421                        'fedid %s identifies an existing experiment' % expid)
1422            self.state_lock.release()
1423        else:
1424            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1425
1426        #now we're done with the tmpdir, and it should be empty
1427        if self.cleanup:
1428            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1429            os.rmdir(tmpdir)
1430        else:
1431            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1432
1433        eid = self.create_experiment_state(fid, req, expid, expcert, 
1434                state='empty')
1435
1436        # Let users touch the state
1437        self.auth.set_attribute(fid, expid)
1438        self.auth.set_attribute(expid, expid)
1439        # Override fedids can manipulate state as well
1440        for o in self.overrides:
1441            self.auth.set_attribute(o, expid)
1442        self.auth.save()
1443
1444        rv = {
1445                'experimentID': [
1446                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1447                ],
1448                'experimentStatus': 'empty',
1449                'experimentAccess': { 'X509' : expcert }
1450            }
1451
1452        return rv
1453
1454    @staticmethod
1455    def get_create_key(req):
1456        """
1457        Parse the experiment identifiers out of the request (the request body
1458        tag has been removed).  Specifically this pulls either the fedid or the
1459        localname out of the experimentID field.  A fedid is preferred.  If
1460        neither is present or the request does not contain the fields,
1461        service_errors are raised.
1462        """
1463        # Get the experiment access
1464        exp = req.get('experimentID', None)
1465        if exp:
1466            if exp.has_key('fedid'):
1467                key = exp['fedid']
1468            elif exp.has_key('localname'):
1469                key = exp['localname']
1470            else:
1471                raise service_error(service_error.req, "Unknown lookup type")
1472        else:
1473            raise service_error(service_error.req, "No request?")
1474
1475        return key
1476
1477    def get_experiment_ids_and_start(self, key, tmpdir):
1478        """
1479        Get the experiment name, id and access certificate from the state, and
1480        set the experiment state to 'starting'.  returns a triple (fedid,
1481        localname, access_cert_file). The access_cert_file is a copy of the
1482        contents of the access certificate, created in the tempdir with
1483        restricted permissions.  If things are confused, raise an exception.
1484        """
1485
1486        expid = eid = None
1487        self.state_lock.acquire()
1488        if self.state.has_key(key):
1489            self.state[key]['experimentStatus'] = "starting"
1490            for e in self.state[key].get('experimentID',[]):
1491                if not expid and e.has_key('fedid'):
1492                    expid = e['fedid']
1493                elif not eid and e.has_key('localname'):
1494                    eid = e['localname']
1495            if 'experimentAccess' in self.state[key] and \
1496                    'X509' in self.state[key]['experimentAccess']:
1497                expcert = self.state[key]['experimentAccess']['X509']
1498            else:
1499                expcert = None
1500        self.state_lock.release()
1501
1502        # make a protected copy of the access certificate so the experiment
1503        # controller can act as the experiment principal.
1504        if expcert:
1505            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1506            if not expcert_file:
1507                raise service_error(service_error.internal, 
1508                        "Cannot create temp cert file?")
1509        else:
1510            expcert_file = None
1511
1512        return (eid, expid, expcert_file)
1513
1514    def get_topology(self, req, tmpdir):
1515        """
1516        Get the ns2 content and put it into a file for parsing.  Call the local
1517        or remote parser and return the topdl.Topology.  Errors result in
1518        exceptions.  req is the request and tmpdir is a work directory.
1519        """
1520
1521        # The tcl parser needs to read a file so put the content into that file
1522        descr=req.get('experimentdescription', None)
1523        if descr:
1524            file_content=descr.get('ns2description', None)
1525        else:
1526            raise service_error(service_error.req, "No experiment description")
1527
1528
1529        if self.splitter_url:
1530            self.log.debug("Calling remote topdl translator at %s" % \
1531                    self.splitter_url)
1532            top = self.remote_ns2topdl(self.splitter_url, file_content)
1533        else:
1534            tclfile = os.path.join(tmpdir, "experiment.tcl")
1535            if file_content:
1536                try:
1537                    f = open(tclfile, 'w')
1538                    f.write(file_content)
1539                    f.close()
1540                except EnvironmentError:
1541                    raise service_error(service_error.internal,
1542                            "Cannot write temp experiment description")
1543            else:
1544                raise service_error(service_error.req, 
1545                        "Only ns2descriptions supported")
1546            pid = "dummy"
1547            gid = "dummy"
1548            eid = "dummy"
1549
1550            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1551                str(self.muxmax), '-m', 'dummy']
1552
1553            tclcmd.extend([pid, gid, eid, tclfile])
1554
1555            self.log.debug("running local splitter %s", " ".join(tclcmd))
1556            # This is just fantastic.  As a side effect the parser copies
1557            # tb_compat.tcl into the current directory, so that directory
1558            # must be writable by the fedd user.  Doing this in the
1559            # temporary subdir ensures this is the case.
1560            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1561                    cwd=tmpdir)
1562            split_data = tclparser.stdout
1563
1564            top = topdl.topology_from_xml(file=split_data, top="experiment")
1565            os.remove(tclfile)
1566
1567        return top
1568
1569    def get_testbed_services(self, req):
1570        """
1571        Parse the services section of the request into into two dicts mapping
1572        testbed to lists of federated_service objects.  The first lists all
1573        exporters of services, and the second all exporters of services that
1574        need control portals int the experiment.
1575        """
1576        masters = { }
1577        pmasters = { }
1578        for s in req.get('service', []):
1579            # If this is a service request with the importall field
1580            # set, fill it out.
1581
1582            if s.get('importall', False):
1583                s['import'] = [ tb for tb in testbeds \
1584                        if tb not in s.get('export',[])]
1585                del s['importall']
1586
1587            # Add the service to masters
1588            for tb in s.get('export', []):
1589                if s.get('name', None):
1590
1591                    params = { }
1592                    for a in s.get('fedAttr', []):
1593                        params[a.get('attribute', '')] = a.get('value','')
1594
1595                    fser = federated_service(name=s['name'],
1596                            exporter=tb, importers=s.get('import',[]),
1597                            params=params)
1598                    if fser.name == 'hide_hosts' \
1599                            and 'hosts' not in fser.params:
1600                        fser.params['hosts'] = \
1601                                ",".join(tb_hosts.get(fser.exporter, []))
1602                    if tb in masters: masters[tb].append(fser)
1603                    else: masters[tb] = [fser]
1604
1605                    if fser.portal:
1606                        if tb not in pmasters: pmasters[tb] = [ fser ]
1607                        else: pmasters[tb].append(fser)
1608                else:
1609                    self.log.error('Testbed service does not have name " + \
1610                            "and importers')
1611        return masters, pmasters
1612
1613
1614    def create_experiment(self, req, fid):
1615        """
1616        The external interface to experiment creation called from the
1617        dispatcher.
1618
1619        Creates a working directory, splits the incoming description using the
1620        splitter script and parses out the various subsections using the
1621        classes above.  Once each sub-experiment is created, use pooled threads
1622        to instantiate them and start it all up.
1623        """
1624
1625        req = req.get('CreateRequestBody', None)
1626        if req:
1627            key = self.get_create_key(req)
1628        else:
1629            raise service_error(service_error.req,
1630                    "Bad request format (no CreateRequestBody)")
1631
1632        # Import information from the requester
1633        if self.auth.import_credentials(data_list=req.get('credential', [])):
1634            self.auth.save()
1635
1636        # Make sure that the caller can talk to us
1637        self.check_experiment_access(fid, key)
1638
1639        # Install the testbed map entries supplied with the request into a copy
1640        # of the testbed map.
1641        tbmap = dict(self.tbmap)
1642        for m in req.get('testbedmap', []):
1643            if 'testbed' in m and 'uri' in m:
1644                tbmap[m['testbed']] = m['uri']
1645
1646        # a place to work
1647        try:
1648            tmpdir = tempfile.mkdtemp(prefix="split-")
1649            os.mkdir(tmpdir+"/keys")
1650        except EnvironmentError:
1651            raise service_error(service_error.internal, "Cannot create tmp dir")
1652
1653        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1654        gw_secretkey_base = "fed.%s" % self.ssh_type
1655        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1656        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1657        tbparams = { }
1658
1659        eid, expid, expcert_file = \
1660                self.get_experiment_ids_and_start(key, tmpdir)
1661
1662        # This catches exceptions to clear the placeholder if necessary
1663        try: 
1664            if not (eid and expid):
1665                raise service_error(service_error.internal, 
1666                        "Cannot find local experiment info!?")
1667            try:
1668                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1669            except ValueError:
1670                raise service_error(service_error.server_config, 
1671                        "Bad key type (%s)" % self.ssh_type)
1672
1673            top = self.get_topology(req, tmpdir)
1674
1675            # Assign the IPs
1676            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1677            # Find the testbeds to look up
1678            tb_hosts = { }
1679            testbeds = [ ]
1680            for e in top.elements:
1681                if isinstance(e, topdl.Computer):
1682                    tb = e.get_attribute('testbed') or 'default'
1683                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1684                    else: 
1685                        tb_hosts[tb] = [ e.name ]
1686                        testbeds.append(tb)
1687
1688            masters, pmasters = self.get_testbed_services(req)
1689            allocated = { }         # Testbeds we can access
1690            topo ={ }               # Sub topologies
1691            connInfo = { }          # Connection information
1692
1693            self.get_access_to_testbeds(testbeds, fid, allocated, 
1694                    tbparams, masters, tbmap, expid, expcert_file)
1695
1696            self.split_topology(top, topo, testbeds)
1697
1698            # Copy configuration files into the remote file store
1699            # The config urlpath
1700            configpath = "/%s/config" % expid
1701            # The config file system location
1702            configdir ="%s%s" % ( self.repodir, configpath)
1703            try:
1704                os.makedirs(configdir)
1705            except EnvironmentError, e:
1706                raise service_error(service_error.internal,
1707                        "Cannot create config directory: %s" % e)
1708            try:
1709                f = open("%s/hosts" % configdir, "w")
1710                f.write('\n'.join(hosts))
1711                f.close()
1712            except EnvironmentError, e:
1713                raise service_error(service_error.internal, 
1714                        "Cannot write hosts file: %s" % e)
1715            try:
1716                copy_file("%s" % gw_pubkey, "%s/%s" % \
1717                        (configdir, gw_pubkey_base))
1718                copy_file("%s" % gw_secretkey, "%s/%s" % \
1719                        (configdir, gw_secretkey_base))
1720            except EnvironmentError, e:
1721                raise service_error(service_error.internal, 
1722                        "Cannot copy keyfiles: %s" % e)
1723
1724            # Allow the individual testbeds to access the configuration files.
1725            for tb in tbparams.keys():
1726                asignee = tbparams[tb]['allocID']['fedid']
1727                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1728                    self.auth.set_attribute(asignee, "%s/%s" % \
1729                            (configpath, f))
1730
1731            part = experiment_partition(self.auth, self.store_url, tbmap,
1732                    self.muxmax, self.direct_transit)
1733            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1734                    connInfo, expid)
1735            # Now get access to the dynamic testbeds (those added above)
1736            for tb in [ t for t in topo if t not in allocated]:
1737                #XXX: ABAC
1738                if self.auth_type =='legacy':
1739                    self.get_legacy_access(tb, None, tbparams, access_user, 
1740                            masters, tbmap)
1741                elif self.auth_type == 'abac':
1742                    self.get_access(tb, tbparams, fid, masters, tbmap, 
1743                            expid, expcert_file)
1744                else:
1745                    raise service_error(service_error.internal, 
1746                            "Unknown auth_type %s" % self.auth_type)
1747                allocated[tb] = 1
1748                store_keys = topo[tb].get_attribute('store_keys')
1749                # Give the testbed access to keys it exports or imports
1750                if store_keys:
1751                    for sk in store_keys.split(" "):
1752                        self.auth.set_attribute(\
1753                                tbparams[tb]['allocID']['fedid'], sk)
1754            self.auth.save()
1755
1756            self.wrangle_software(expid, top, topo, tbparams)
1757
1758            vtopo = topdl.topology_to_vtopo(top)
1759            vis = self.genviz(vtopo)
1760
1761            # save federant information
1762            for k in allocated.keys():
1763                tbparams[k]['federant'] = {
1764                        'name': [ { 'localname' : eid} ],
1765                        'allocID' : tbparams[k]['allocID'],
1766                        'uri': tbparams[k]['uri'],
1767                    }
1768
1769            self.state_lock.acquire()
1770            self.state[eid]['vtopo'] = vtopo
1771            self.state[eid]['vis'] = vis
1772            self.state[eid]['experimentdescription'] = \
1773                    { 'topdldescription': top.to_dict() }
1774            self.state[eid]['federant'] = \
1775                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1776                        if tbparams[tb].has_key('federant') ]
1777            if self.state_filename: 
1778                self.write_state()
1779            self.state_lock.release()
1780        except service_error, e:
1781            # If something goes wrong in the parse (usually an access error)
1782            # clear the placeholder state.  From here on out the code delays
1783            # exceptions.  Failing at this point returns a fault to the remote
1784            # caller.
1785
1786            self.state_lock.acquire()
1787            del self.state[eid]
1788            del self.state[expid]
1789            if self.state_filename: self.write_state()
1790            self.state_lock.release()
1791            if tmpdir and self.cleanup:
1792                self.remove_dirs(tmpdir)
1793            raise e
1794
1795
1796        # Start the background swapper and return the starting state.  From
1797        # here on out, the state will stick around a while.
1798
1799        # Let users touch the state
1800        self.auth.set_attribute(fid, expid)
1801        self.auth.set_attribute(expid, expid)
1802        # Override fedids can manipulate state as well
1803        for o in self.overrides:
1804            self.auth.set_attribute(o, expid)
1805        self.auth.save()
1806
1807        # Create a logger that logs to the experiment's state object as well as
1808        # to the main log file.
1809        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1810        alloc_collector = self.list_log(self.state[eid]['log'])
1811        h = logging.StreamHandler(alloc_collector)
1812        # XXX: there should be a global one of these rather than repeating the
1813        # code.
1814        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1815                    '%d %b %y %H:%M:%S'))
1816        alloc_log.addHandler(h)
1817       
1818        attrs = [ 
1819                {
1820                    'attribute': 'ssh_pubkey', 
1821                    'value': '%s/%s/config/%s' % \
1822                            (self.repo_url, expid, gw_pubkey_base)
1823                },
1824                {
1825                    'attribute': 'ssh_secretkey', 
1826                    'value': '%s/%s/config/%s' % \
1827                            (self.repo_url, expid, gw_secretkey_base)
1828                },
1829                {
1830                    'attribute': 'hosts', 
1831                    'value': '%s/%s/config/hosts' % \
1832                            (self.repo_url, expid)
1833                },
1834            ]
1835
1836        # transit and disconnected testbeds may not have a connInfo entry.
1837        # Fill in the blanks.
1838        for t in allocated.keys():
1839            if not connInfo.has_key(t):
1840                connInfo[t] = { }
1841
1842        # Start a thread to do the resource allocation
1843        t  = Thread(target=self.allocate_resources,
1844                args=(allocated, masters, eid, expid, tbparams, 
1845                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1846                    connInfo, tbmap, expcert_file),
1847                name=eid)
1848        t.start()
1849
1850        rv = {
1851                'experimentID': [
1852                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1853                ],
1854                'experimentStatus': 'starting',
1855            }
1856
1857        return rv
1858   
1859    def get_experiment_fedid(self, key):
1860        """
1861        find the fedid associated with the localname key in the state database.
1862        """
1863
1864        rv = None
1865        self.state_lock.acquire()
1866        if self.state.has_key(key):
1867            if isinstance(self.state[key], dict):
1868                try:
1869                    kl = [ f['fedid'] for f in \
1870                            self.state[key]['experimentID']\
1871                                if f.has_key('fedid') ]
1872                except KeyError:
1873                    self.state_lock.release()
1874                    raise service_error(service_error.internal, 
1875                            "No fedid for experiment %s when getting "+\
1876                                    "fedid(!?)" % key)
1877                if len(kl) == 1:
1878                    rv = kl[0]
1879                else:
1880                    self.state_lock.release()
1881                    raise service_error(service_error.internal, 
1882                            "multiple fedids for experiment %s when " +\
1883                                    "getting fedid(!?)" % key)
1884            else:
1885                self.state_lock.release()
1886                raise service_error(service_error.internal, 
1887                        "Unexpected state for %s" % key)
1888        self.state_lock.release()
1889        return rv
1890
1891    def check_experiment_access(self, fid, key):
1892        """
1893        Confirm that the fid has access to the experiment.  Though a request
1894        may be made in terms of a local name, the access attribute is always
1895        the experiment's fedid.
1896        """
1897        if not isinstance(key, fedid):
1898            key = self.get_experiment_fedid(key)
1899
1900        if self.auth.check_attribute(fid, key):
1901            return True
1902        else:
1903            raise service_error(service_error.access, "Access Denied")
1904
1905
1906    def get_handler(self, path, fid):
1907        self.log.info("Get handler %s %s" % (path, fid))
1908        if self.auth.check_attribute(fid, path):
1909            return ("%s/%s" % (self.repodir, path), "application/binary")
1910        else:
1911            return (None, None)
1912
1913    def get_vtopo(self, req, fid):
1914        """
1915        Return the stored virtual topology for this experiment
1916        """
1917        rv = None
1918        state = None
1919
1920        req = req.get('VtopoRequestBody', None)
1921        if not req:
1922            raise service_error(service_error.req,
1923                    "Bad request format (no VtopoRequestBody)")
1924        exp = req.get('experiment', None)
1925        if exp:
1926            if exp.has_key('fedid'):
1927                key = exp['fedid']
1928                keytype = "fedid"
1929            elif exp.has_key('localname'):
1930                key = exp['localname']
1931                keytype = "localname"
1932            else:
1933                raise service_error(service_error.req, "Unknown lookup type")
1934        else:
1935            raise service_error(service_error.req, "No request?")
1936
1937        self.check_experiment_access(fid, key)
1938
1939        self.state_lock.acquire()
1940        if self.state.has_key(key):
1941            if self.state[key].has_key('vtopo'):
1942                rv = { 'experiment' : {keytype: key },\
1943                        'vtopo': self.state[key]['vtopo'],\
1944                    }
1945            else:
1946                state = self.state[key]['experimentStatus']
1947        self.state_lock.release()
1948
1949        if rv: return rv
1950        else: 
1951            if state:
1952                raise service_error(service_error.partial, 
1953                        "Not ready: %s" % state)
1954            else:
1955                raise service_error(service_error.req, "No such experiment")
1956
1957    def get_vis(self, req, fid):
1958        """
1959        Return the stored visualization for this experiment
1960        """
1961        rv = None
1962        state = None
1963
1964        req = req.get('VisRequestBody', None)
1965        if not req:
1966            raise service_error(service_error.req,
1967                    "Bad request format (no VisRequestBody)")
1968        exp = req.get('experiment', None)
1969        if exp:
1970            if exp.has_key('fedid'):
1971                key = exp['fedid']
1972                keytype = "fedid"
1973            elif exp.has_key('localname'):
1974                key = exp['localname']
1975                keytype = "localname"
1976            else:
1977                raise service_error(service_error.req, "Unknown lookup type")
1978        else:
1979            raise service_error(service_error.req, "No request?")
1980
1981        self.check_experiment_access(fid, key)
1982
1983        self.state_lock.acquire()
1984        if self.state.has_key(key):
1985            if self.state[key].has_key('vis'):
1986                rv =  { 'experiment' : {keytype: key },\
1987                        'vis': self.state[key]['vis'],\
1988                        }
1989            else:
1990                state = self.state[key]['experimentStatus']
1991        self.state_lock.release()
1992
1993        if rv: return rv
1994        else:
1995            if state:
1996                raise service_error(service_error.partial, 
1997                        "Not ready: %s" % state)
1998            else:
1999                raise service_error(service_error.req, "No such experiment")
2000
2001    def clean_info_response(self, rv):
2002        """
2003        Remove the information in the experiment's state object that is not in
2004        the info response.
2005        """
2006        # Remove the owner info (should always be there, but...)
2007        if rv.has_key('owner'): del rv['owner']
2008
2009        # Convert the log into the allocationLog parameter and remove the
2010        # log entry (with defensive programming)
2011        if rv.has_key('log'):
2012            rv['allocationLog'] = "".join(rv['log'])
2013            del rv['log']
2014        else:
2015            rv['allocationLog'] = ""
2016
2017        if rv['experimentStatus'] != 'active':
2018            if rv.has_key('federant'): del rv['federant']
2019        else:
2020            # remove the allocationID and uri info from each federant
2021            for f in rv.get('federant', []):
2022                if f.has_key('allocID'): del f['allocID']
2023                if f.has_key('uri'): del f['uri']
2024
2025        return rv
2026
2027    def get_info(self, req, fid):
2028        """
2029        Return all the stored info about this experiment
2030        """
2031        rv = None
2032
2033        req = req.get('InfoRequestBody', None)
2034        if not req:
2035            raise service_error(service_error.req,
2036                    "Bad request format (no InfoRequestBody)")
2037        exp = req.get('experiment', None)
2038        if exp:
2039            if exp.has_key('fedid'):
2040                key = exp['fedid']
2041                keytype = "fedid"
2042            elif exp.has_key('localname'):
2043                key = exp['localname']
2044                keytype = "localname"
2045            else:
2046                raise service_error(service_error.req, "Unknown lookup type")
2047        else:
2048            raise service_error(service_error.req, "No request?")
2049
2050        self.check_experiment_access(fid, key)
2051
2052        # The state may be massaged by the service function that called
2053        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2054        # state.
2055        self.state_lock.acquire()
2056        if self.state.has_key(key):
2057            rv = copy.deepcopy(self.state[key])
2058        self.state_lock.release()
2059
2060        if rv:
2061            return self.clean_info_response(rv)
2062        else:
2063            raise service_error(service_error.req, "No such experiment")
2064
2065    def get_multi_info(self, req, fid):
2066        """
2067        Return all the stored info that this fedid can access
2068        """
2069        rv = { 'info': [ ] }
2070
2071        self.state_lock.acquire()
2072        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2073            try:
2074                self.check_experiment_access(fid, key)
2075            except service_error, e:
2076                if e.code == service_error.access:
2077                    continue
2078                else:
2079                    self.state_lock.release()
2080                    raise e
2081
2082            if self.state.has_key(key):
2083                e = copy.deepcopy(self.state[key])
2084                e = self.clean_info_response(e)
2085                rv['info'].append(e)
2086        self.state_lock.release()
2087        return rv
2088
2089    def remove_dirs(self, dir):
2090        """
2091        Remove the directory tree and all files rooted at dir.  Log any errors,
2092        but continue.
2093        """
2094        self.log.debug("[removedirs]: removing %s" % dir)
2095        try:
2096            for path, dirs, files in os.walk(dir, topdown=False):
2097                for f in files:
2098                    os.remove(os.path.join(path, f))
2099                for d in dirs:
2100                    os.rmdir(os.path.join(path, d))
2101            os.rmdir(dir)
2102        except EnvironmentError, e:
2103            self.log.error("Error deleting directory tree in %s" % e);
2104
2105    @staticmethod
2106    def make_temp_certfile(expcert, tmpdir):
2107        """
2108        make a protected copy of the access certificate so the experiment
2109        controller can act as the experiment principal.  mkstemp is the most
2110        secure way to do that. The directory should be created by
2111        mkdtemp.  Return the filename.
2112        """
2113        if expcert and tmpdir:
2114            try:
2115                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
2116                f = os.fdopen(certf, 'w')
2117                print >> f, expcert
2118                f.close()
2119            except EnvironmentError, e:
2120                raise service_error(service_error.internal, 
2121                        "Cannot create temp cert file?")
2122            return certfn
2123        else:
2124            return None
2125
2126    def terminate_experiment(self, req, fid):
2127        """
2128        Swap this experiment out on the federants and delete the shared
2129        information
2130        """
2131        tbparams = { }
2132        req = req.get('TerminateRequestBody', None)
2133        if not req:
2134            raise service_error(service_error.req,
2135                    "Bad request format (no TerminateRequestBody)")
2136        force = req.get('force', False)
2137        exp = req.get('experiment', None)
2138        if exp:
2139            if exp.has_key('fedid'):
2140                key = exp['fedid']
2141                keytype = "fedid"
2142            elif exp.has_key('localname'):
2143                key = exp['localname']
2144                keytype = "localname"
2145            else:
2146                raise service_error(service_error.req, "Unknown lookup type")
2147        else:
2148            raise service_error(service_error.req, "No request?")
2149
2150        self.check_experiment_access(fid, key)
2151
2152        dealloc_list = [ ]
2153
2154
2155        # Create a logger that logs to the dealloc_list as well as to the main
2156        # log file.
2157        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2158        h = logging.StreamHandler(self.list_log(dealloc_list))
2159        # XXX: there should be a global one of these rather than repeating the
2160        # code.
2161        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2162                    '%d %b %y %H:%M:%S'))
2163        dealloc_log.addHandler(h)
2164
2165        self.state_lock.acquire()
2166        fed_exp = self.state.get(key, None)
2167        repo = None
2168
2169        if fed_exp:
2170            # This branch of the conditional holds the lock to generate a
2171            # consistent temporary tbparams variable to deallocate experiments.
2172            # It releases the lock to do the deallocations and reacquires it to
2173            # remove the experiment state when the termination is complete.
2174
2175            # First make sure that the experiment creation is complete.
2176            status = fed_exp.get('experimentStatus', None)
2177
2178            if status:
2179                if status in ('starting', 'terminating'):
2180                    if not force:
2181                        self.state_lock.release()
2182                        raise service_error(service_error.partial, 
2183                                'Experiment still being created or destroyed')
2184                    else:
2185                        self.log.warning('Experiment in %s state ' % status + \
2186                                'being terminated by force.')
2187            else:
2188                # No status??? trouble
2189                self.state_lock.release()
2190                raise service_error(service_error.internal,
2191                        "Experiment has no status!?")
2192
2193            ids = []
2194            #  experimentID is a list of dicts that are self-describing
2195            #  identifiers.  This finds all the fedids and localnames - the
2196            #  keys of self.state - and puts them into ids.
2197            for id in fed_exp.get('experimentID', []):
2198                if id.has_key('fedid'): 
2199                    ids.append(id['fedid'])
2200                    repo = "%s" % id['fedid']
2201                if id.has_key('localname'): ids.append(id['localname'])
2202
2203            # Get the experimentAccess - the principal for this experiment.  It
2204            # is this principal to which credentials have been delegated, and
2205            # as which the experiment controller must act.
2206            if 'experimentAccess' in self.state[key] and \
2207                    'X509' in self.state[key]['experimentAccess']:
2208                expcert = self.state[key]['experimentAccess']['X509']
2209            else:
2210                expcert = None
2211            # Collect the allocation/segment ids into a dict keyed by the fedid
2212            # of the allocation (or a monotonically increasing integer) that
2213            # contains a tuple of uri, aid (which is a dict...)
2214            for i, fed in enumerate(fed_exp.get('federant', [])):
2215                try:
2216                    uri = fed['uri']
2217                    aid = fed['allocID']
2218                    k = fed['allocID'].get('fedid', i)
2219                except KeyError, e:
2220                    continue
2221                tbparams[k] = (uri, aid)
2222            fed_exp['experimentStatus'] = 'terminating'
2223            if self.state_filename: self.write_state()
2224            self.state_lock.release()
2225
2226            try:
2227                tmpdir = tempfile.mkdtemp(prefix="split-")
2228            except EnvironmentError:
2229                raise service_error(service_error.internal, 
2230                        "Cannot create tmp dir")
2231            # This try block makes sure the tempdir is cleared
2232            try:
2233                # If no expcert, try the deallocation as the experiment
2234                # controller instance.
2235                if expcert and self.auth_type != 'legacy': 
2236                    cert_file = self.make_temp_certfile(expcert, tmpdir)
2237                    pw = None
2238                else: 
2239                    cert_file = self.cert_file
2240                    pw = self.cert_pwd
2241
2242                # Stop everyone.  NB, wait_for_all waits until a thread starts
2243                # and then completes, so we can't wait if nothing starts.  So,
2244                # no tbparams, no start.
2245                if len(tbparams) > 0:
2246                    tp = thread_pool(self.nthreads)
2247                    for k in tbparams.keys():
2248                        # Create and start a thread to stop the segment
2249                        tp.wait_for_slot()
2250                        uri, aid = tbparams[k]
2251                        t  = pooled_thread(\
2252                                target=self.terminate_segment(log=dealloc_log,
2253                                    testbed=uri,
2254                                    cert_file=cert_file, 
2255                                    cert_pwd=pw,
2256                                    trusted_certs=self.trusted_certs,
2257                                    caller=self.call_TerminateSegment),
2258                                args=(uri, aid), name=k,
2259                                pdata=tp, trace_file=self.trace_file)
2260                        t.start()
2261                    # Wait for completions
2262                    tp.wait_for_all_done()
2263
2264                # release the allocations (failed experiments have done this
2265                # already, and starting experiments may be in odd states, so we
2266                # ignore errors releasing those allocations
2267                try: 
2268                    for k in tbparams.keys():
2269                        # This releases access by uri
2270                        uri, aid = tbparams[k]
2271                        self.release_access(None, aid, uri=uri,
2272                                cert_file=cert_file, cert_pwd=pw)
2273                except service_error, e:
2274                    if status != 'failed' and not force:
2275                        raise e
2276
2277            # Clean up the tmpdir no matter what
2278            finally:
2279                self.remove_dirs(tmpdir)
2280
2281            # Remove the terminated experiment
2282            self.state_lock.acquire()
2283            for id in ids:
2284                if self.state.has_key(id): del self.state[id]
2285
2286            if self.state_filename: self.write_state()
2287            self.state_lock.release()
2288
2289            # Delete any synch points associated with this experiment.  All
2290            # synch points begin with the fedid of the experiment.
2291            fedid_keys = set(["fedid:%s" % f for f in ids \
2292                    if isinstance(f, fedid)])
2293            for k in self.synch_store.all_keys():
2294                try:
2295                    if len(k) > 45 and k[0:46] in fedid_keys:
2296                        self.synch_store.del_value(k)
2297                except synch_store.BadDeletionError:
2298                    pass
2299            self.write_store()
2300
2301            # Remove software and other cached stuff from the filesystem.
2302            if repo:
2303                self.remove_dirs("%s/%s" % (self.repodir, repo))
2304               
2305            return { 
2306                    'experiment': exp , 
2307                    'deallocationLog': "".join(dealloc_list),
2308                    }
2309        else:
2310            # Don't forget to release the lock
2311            self.state_lock.release()
2312            raise service_error(service_error.req, "No saved state")
2313
2314
2315    def GetValue(self, req, fid):
2316        """
2317        Get a value from the synchronized store
2318        """
2319        req = req.get('GetValueRequestBody', None)
2320        if not req:
2321            raise service_error(service_error.req,
2322                    "Bad request format (no GetValueRequestBody)")
2323       
2324        name = req['name']
2325        wait = req['wait']
2326        rv = { 'name': name }
2327
2328        if self.auth.check_attribute(fid, name):
2329            self.log.debug("[GetValue] asking for %s " % name)
2330            try:
2331                v = self.synch_store.get_value(name, wait)
2332            except synch_store.RevokedKeyError:
2333                # No more synch on this key
2334                raise service_error(service_error.federant, 
2335                        "Synch key %s revoked" % name)
2336            if v is not None:
2337                rv['value'] = v
2338            self.log.debug("[GetValue] got %s from %s" % (v, name))
2339            return rv
2340        else:
2341            raise service_error(service_error.access, "Access Denied")
2342       
2343
2344    def SetValue(self, req, fid):
2345        """
2346        Set a value in the synchronized store
2347        """
2348        req = req.get('SetValueRequestBody', None)
2349        if not req:
2350            raise service_error(service_error.req,
2351                    "Bad request format (no SetValueRequestBody)")
2352       
2353        name = req['name']
2354        v = req['value']
2355
2356        if self.auth.check_attribute(fid, name):
2357            try:
2358                self.synch_store.set_value(name, v)
2359                self.write_store()
2360                self.log.debug("[SetValue] set %s to %s" % (name, v))
2361            except synch_store.CollisionError:
2362                # Translate into a service_error
2363                raise service_error(service_error.req,
2364                        "Value already set: %s" %name)
2365            except synch_store.RevokedKeyError:
2366                # No more synch on this key
2367                raise service_error(service_error.federant, 
2368                        "Synch key %s revoked" % name)
2369            return { 'name': name, 'value': v }
2370        else:
2371            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.