source: fedd/federation/experiment_control.py @ 63c6664

axis_examplecompt_changesinfo-ops
Last change on this file since 63c6664 was 63c6664, checked in by Ted Faber <faber@…>, 13 years ago

Consolidate some code. More #10.

  • Property mode set to 100644
File size: 79.6 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205        elif self.auth_type == 'abac':
206            self.auth = abac_authorizer(load=self.auth_dir)
207        else:
208            raise service_error(service_error.internal, 
209                    "Unknown auth_type: %s" % self.auth_type)
210
211
212        if self.ssh_pubkey_file:
213            try:
214                f = open(self.ssh_pubkey_file, 'r')
215                self.ssh_pubkey = f.read()
216                f.close()
217            except EnvironmentError:
218                raise service_error(service_error.internal,
219                        "Cannot read sshpubkey")
220        else:
221            raise service_error(service_error.internal, 
222                    "No SSH public key file?")
223
224        if not self.ssh_privkey_file:
225            raise service_error(service_error.internal, 
226                    "No SSH public key file?")
227
228
229        if mapdb_file:
230            self.read_mapdb(mapdb_file)
231        else:
232            self.log.warn("[experiment_control] No testbed map, using defaults")
233            self.tbmap = { 
234                    'deter':'https://users.isi.deterlab.net:23235',
235                    'emulab':'https://users.isi.deterlab.net:23236',
236                    'ucb':'https://users.isi.deterlab.net:23237',
237                    }
238
239        if accessdb_file:
240                self.read_accessdb(accessdb_file)
241        else:
242            raise service_error(service_error.internal,
243                    "No accessdb specified in config")
244
245        # Grab saved state.  OK to do this w/o locking because it's read only
246        # and only one thread should be in existence that can see self.state at
247        # this point.
248        if self.state_filename:
249            self.read_state()
250
251        if self.store_filename:
252            self.read_store()
253        else:
254            self.log.warning("No saved synch store")
255            self.synch_store = synch_store
256
257        # Dispatch tables
258        self.soap_services = {\
259                'New': soap_handler('New', self.new_experiment),
260                'Create': soap_handler('Create', self.create_experiment),
261                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
262                'Vis': soap_handler('Vis', self.get_vis),
263                'Info': soap_handler('Info', self.get_info),
264                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
265                'Terminate': soap_handler('Terminate', 
266                    self.terminate_experiment),
267                'GetValue': soap_handler('GetValue', self.GetValue),
268                'SetValue': soap_handler('SetValue', self.SetValue),
269        }
270
271        self.xmlrpc_services = {\
272                'New': xmlrpc_handler('New', self.new_experiment),
273                'Create': xmlrpc_handler('Create', self.create_experiment),
274                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
275                'Vis': xmlrpc_handler('Vis', self.get_vis),
276                'Info': xmlrpc_handler('Info', self.get_info),
277                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
278                'Terminate': xmlrpc_handler('Terminate',
279                    self.terminate_experiment),
280                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
281                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
282        }
283
284    # Call while holding self.state_lock
285    def write_state(self):
286        """
287        Write a new copy of experiment state after copying the existing state
288        to a backup.
289
290        State format is a simple pickling of the state dictionary.
291        """
292        if os.access(self.state_filename, os.W_OK):
293            copy_file(self.state_filename, \
294                    "%s.bak" % self.state_filename)
295        try:
296            f = open(self.state_filename, 'w')
297            pickle.dump(self.state, f)
298        except EnvironmentError, e:
299            self.log.error("Can't write file %s: %s" % \
300                    (self.state_filename, e))
301        except pickle.PicklingError, e:
302            self.log.error("Pickling problem: %s" % e)
303        except TypeError, e:
304            self.log.error("Pickling problem (TypeError): %s" % e)
305
306    @staticmethod
307    def get_alloc_ids(state):
308        """
309        Pull the fedids of the identifiers of each allocation from the
310        state.  Again, a dict dive that's best isolated.
311
312        Used by read_store and read state
313        """
314
315        return [ f['allocID']['fedid'] 
316                for f in state.get('federant',[]) \
317                    if f.has_key('allocID') and \
318                        f['allocID'].has_key('fedid')]
319
320    # Call while holding self.state_lock
321    def read_state(self):
322        """
323        Read a new copy of experiment state.  Old state is overwritten.
324
325        State format is a simple pickling of the state dictionary.
326        """
327       
328        def get_experiment_id(state):
329            """
330            Pull the fedid experimentID out of the saved state.  This is kind
331            of a gross walk through the dict.
332            """
333
334            if state.has_key('experimentID'):
335                for e in state['experimentID']:
336                    if e.has_key('fedid'):
337                        return e['fedid']
338                else:
339                    return None
340            else:
341                return None
342
343        try:
344            f = open(self.state_filename, "r")
345            self.state = pickle.load(f)
346            self.log.debug("[read_state]: Read state from %s" % \
347                    self.state_filename)
348        except EnvironmentError, e:
349            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
350                    % (self.state_filename, e))
351        except pickle.UnpicklingError, e:
352            self.log.warning(("[read_state]: No saved state: " + \
353                    "Unpickling failed: %s") % e)
354       
355        for s in self.state.values():
356            try:
357
358                eid = get_experiment_id(s)
359                if eid : 
360                    if self.auth_type == 'legacy':
361                        # XXX: legacy
362                        # Give the owner rights to the experiment
363                        self.auth.set_attribute(s['owner'], eid)
364                        # And holders of the eid as well
365                        self.auth.set_attribute(eid, eid)
366                        # allow overrides to control experiments as well
367                        for o in self.overrides:
368                            self.auth.set_attribute(o, eid)
369                        # Set permissions to allow reading of the software
370                        # repo, if any, as well.
371                        for a in self.get_alloc_ids(s):
372                            self.auth.set_attribute(a, 'repo/%s' % eid)
373                else:
374                    raise KeyError("No experiment id")
375            except KeyError, e:
376                self.log.warning("[read_state]: State ownership or identity " +\
377                        "misformatted in %s: %s" % (self.state_filename, e))
378
379
380    def read_accessdb(self, accessdb_file):
381        """
382        Read the mapping from fedids that can create experiments to their name
383        in the 3-level access namespace.  All will be asserted from this
384        testbed and can include the local username and porject that will be
385        asserted on their behalf by this fedd.  Each fedid is also added to the
386        authorization system with the "create" attribute.
387        """
388        self.accessdb = {}
389        # These are the regexps for parsing the db
390        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
391        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
392                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
393        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
394                "\s*->\s*(" + name_expr + ")\s*$")
395        lineno = 0
396
397        # Parse the mappings and store in self.authdb, a dict of
398        # fedid -> (proj, user)
399        try:
400            f = open(accessdb_file, "r")
401            for line in f:
402                lineno += 1
403                line = line.strip()
404                if len(line) == 0 or line.startswith('#'):
405                    continue
406                m = project_line.match(line)
407                if m:
408                    fid = fedid(hexstr=m.group(1))
409                    project, user = m.group(2,3)
410                    if not self.accessdb.has_key(fid):
411                        self.accessdb[fid] = []
412                    self.accessdb[fid].append((project, user))
413                    continue
414
415                m = user_line.match(line)
416                if m:
417                    fid = fedid(hexstr=m.group(1))
418                    project = None
419                    user = m.group(2)
420                    if not self.accessdb.has_key(fid):
421                        self.accessdb[fid] = []
422                    self.accessdb[fid].append((project, user))
423                    continue
424                self.log.warn("[experiment_control] Error parsing access " +\
425                        "db %s at line %d" %  (accessdb_file, lineno))
426        except EnvironmentError:
427            raise service_error(service_error.internal,
428                    ("Error opening/reading %s as experiment " +\
429                            "control accessdb") %  accessdb_file)
430        f.close()
431
432        # Initialize the authorization attributes
433        # XXX: legacy
434        if self.auth_type == 'legacy':
435            for fid in self.accessdb.keys():
436                self.auth.set_attribute(fid, 'create')
437                self.auth.set_attribute(fid, 'new')
438
439    def read_mapdb(self, file):
440        """
441        Read a simple colon separated list of mappings for the
442        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
443        """
444
445        self.tbmap = { }
446        lineno =0
447        try:
448            f = open(file, "r")
449            for line in f:
450                lineno += 1
451                line = line.strip()
452                if line.startswith('#') or len(line) == 0:
453                    continue
454                try:
455                    label, url = line.split(':', 1)
456                    self.tbmap[label] = url
457                except ValueError, e:
458                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
459                            "map db: %s %s" % (lineno, line, e))
460        except EnvironmentError, e:
461            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
462                    "open %s: %s" % (file, e))
463        f.close()
464
465    def read_store(self):
466        try:
467            self.synch_store = synch_store()
468            self.synch_store.load(self.store_filename)
469            self.log.debug("[read_store]: Read store from %s" % \
470                    self.store_filename)
471        except EnvironmentError, e:
472            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
473                    % (self.state_filename, e))
474            self.synch_store = synch_store()
475
476        # Set the initial permissions on data in the store.  XXX: This ad hoc
477        # authorization attribute initialization is getting out of hand.
478        # XXX: legacy
479        if self.auth_type == 'legacy':
480            for k in self.synch_store.all_keys():
481                try:
482                    if k.startswith('fedid:'):
483                        fid = fedid(hexstr=k[6:46])
484                        if self.state.has_key(fid):
485                            for a in self.get_alloc_ids(self.state[fid]):
486                                self.auth.set_attribute(a, k)
487                except ValueError, e:
488                    self.log.warn("Cannot deduce permissions for %s" % k)
489
490
491    def write_store(self):
492        """
493        Write a new copy of synch_store after writing current state
494        to a backup.  We use the internal synch_store pickle method to avoid
495        incinsistent data.
496
497        State format is a simple pickling of the store.
498        """
499        if os.access(self.store_filename, os.W_OK):
500            copy_file(self.store_filename, \
501                    "%s.bak" % self.store_filename)
502        try:
503            self.synch_store.save(self.store_filename)
504        except EnvironmentError, e:
505            self.log.error("Can't write file %s: %s" % \
506                    (self.store_filename, e))
507        except TypeError, e:
508            self.log.error("Pickling problem (TypeError): %s" % e)
509
510       
511    def generate_ssh_keys(self, dest, type="rsa" ):
512        """
513        Generate a set of keys for the gateways to use to talk.
514
515        Keys are of type type and are stored in the required dest file.
516        """
517        valid_types = ("rsa", "dsa")
518        t = type.lower();
519        if t not in valid_types: raise ValueError
520        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
521
522        try:
523            trace = open("/dev/null", "w")
524        except EnvironmentError:
525            raise service_error(service_error.internal,
526                    "Cannot open /dev/null??");
527
528        # May raise CalledProcessError
529        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
530        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
531        if rv != 0:
532            raise service_error(service_error.internal, 
533                    "Cannot generate nonce ssh keys.  %s return code %d" \
534                            % (self.ssh_keygen, rv))
535
536    def gentopo(self, str):
537        """
538        Generate the topology data structure from the splitter's XML
539        representation of it.
540
541        The topology XML looks like:
542            <experiment>
543                <nodes>
544                    <node><vname></vname><ips>ip1:ip2</ips></node>
545                </nodes>
546                <lans>
547                    <lan>
548                        <vname></vname><vnode></vnode><ip></ip>
549                        <bandwidth></bandwidth><member>node:port</member>
550                    </lan>
551                </lans>
552        """
553        class topo_parse:
554            """
555            Parse the topology XML and create the dats structure.
556            """
557            def __init__(self):
558                # Typing of the subelements for data conversion
559                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
560                self.int_subelements = ( 'bandwidth',)
561                self.float_subelements = ( 'delay',)
562                # The final data structure
563                self.nodes = [ ]
564                self.lans =  [ ]
565                self.topo = { \
566                        'node': self.nodes,\
567                        'lan' : self.lans,\
568                    }
569                self.element = { }  # Current element being created
570                self.chars = ""     # Last text seen
571
572            def end_element(self, name):
573                # After each sub element the contents is added to the current
574                # element or to the appropriate list.
575                if name == 'node':
576                    self.nodes.append(self.element)
577                    self.element = { }
578                elif name == 'lan':
579                    self.lans.append(self.element)
580                    self.element = { }
581                elif name in self.str_subelements:
582                    self.element[name] = self.chars
583                    self.chars = ""
584                elif name in self.int_subelements:
585                    self.element[name] = int(self.chars)
586                    self.chars = ""
587                elif name in self.float_subelements:
588                    self.element[name] = float(self.chars)
589                    self.chars = ""
590
591            def found_chars(self, data):
592                self.chars += data.rstrip()
593
594
595        tp = topo_parse();
596        parser = xml.parsers.expat.ParserCreate()
597        parser.EndElementHandler = tp.end_element
598        parser.CharacterDataHandler = tp.found_chars
599
600        parser.Parse(str)
601
602        return tp.topo
603       
604
605    def genviz(self, topo):
606        """
607        Generate the visualization the virtual topology
608        """
609
610        neato = "/usr/local/bin/neato"
611        # These are used to parse neato output and to create the visualization
612        # file.
613        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
614        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
615                "%s</type></node>"
616
617        try:
618            # Node names
619            nodes = [ n['vname'] for n in topo['node'] ]
620            topo_lans = topo['lan']
621        except KeyError, e:
622            raise service_error(service_error.internal, "Bad topology: %s" %e)
623
624        lans = { }
625        links = { }
626
627        # Walk through the virtual topology, organizing the connections into
628        # 2-node connections (links) and more-than-2-node connections (lans).
629        # When a lan is created, it's added to the list of nodes (there's a
630        # node in the visualization for the lan).
631        for l in topo_lans:
632            if links.has_key(l['vname']):
633                if len(links[l['vname']]) < 2:
634                    links[l['vname']].append(l['vnode'])
635                else:
636                    nodes.append(l['vname'])
637                    lans[l['vname']] = links[l['vname']]
638                    del links[l['vname']]
639                    lans[l['vname']].append(l['vnode'])
640            elif lans.has_key(l['vname']):
641                lans[l['vname']].append(l['vnode'])
642            else:
643                links[l['vname']] = [ l['vnode'] ]
644
645
646        # Open up a temporary file for dot to turn into a visualization
647        try:
648            df, dotname = tempfile.mkstemp()
649            dotfile = os.fdopen(df, 'w')
650        except EnvironmentError:
651            raise service_error(service_error.internal,
652                    "Failed to open file in genviz")
653
654        try:
655            dnull = open('/dev/null', 'w')
656        except EnvironmentError:
657            service_error(service_error.internal,
658                    "Failed to open /dev/null in genviz")
659
660        # Generate a dot/neato input file from the links, nodes and lans
661        try:
662            print >>dotfile, "graph G {"
663            for n in nodes:
664                print >>dotfile, '\t"%s"' % n
665            for l in links.keys():
666                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
667            for l in lans.keys():
668                for n in lans[l]:
669                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
670            print >>dotfile, "}"
671            dotfile.close()
672        except TypeError:
673            raise service_error(service_error.internal,
674                    "Single endpoint link in vtopo")
675        except EnvironmentError:
676            raise service_error(service_error.internal, "Cannot write dot file")
677
678        # Use dot to create a visualization
679        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
680                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
681                close_fds=True)
682        dnull.close()
683
684        # Translate dot to vis format
685        vis_nodes = [ ]
686        vis = { 'node': vis_nodes }
687        for line in dot.stdout:
688            m = vis_re.match(line)
689            if m:
690                vn = m.group(1)
691                vis_node = {'name': vn, \
692                        'x': float(m.group(2)),\
693                        'y' : float(m.group(3)),\
694                    }
695                if vn in links.keys() or vn in lans.keys():
696                    vis_node['type'] = 'lan'
697                else:
698                    vis_node['type'] = 'node'
699                vis_nodes.append(vis_node)
700        rv = dot.wait()
701
702        os.remove(dotname)
703        if rv == 0 : return vis
704        else: return None
705
706
707    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
708            cert_pwd=None):
709        """
710        Release access to testbed through fedd
711        """
712
713        if not uri and tbmap:
714            uri = tbmap.get(tb, None)
715        if not uri:
716            raise service_error(service_error.server_config, 
717                    "Unknown testbed: %s" % tb)
718
719        if self.local_access.has_key(uri):
720            resp = self.local_access[uri].ReleaseAccess(\
721                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
722                    fedid(file=cert_file))
723            resp = { 'ReleaseAccessResponseBody': resp } 
724        else:
725            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
726                    cert_file, cert_pwd, self.trusted_certs)
727
728        # better error coding
729
730    def remote_ns2topdl(self, uri, desc):
731
732        req = {
733                'description' : { 'ns2description': desc },
734            }
735
736        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
737                self.trusted_certs)
738
739        if r.has_key('Ns2TopdlResponseBody'):
740            r = r['Ns2TopdlResponseBody']
741            ed = r.get('experimentdescription', None)
742            if ed.has_key('topdldescription'):
743                return topdl.Topology(**ed['topdldescription'])
744            else:
745                raise service_error(service_error.protocol, 
746                        "Bad splitter response (no output)")
747        else:
748            raise service_error(service_error.protocol, "Bad splitter response")
749
750    class start_segment:
751        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
752                cert_pwd=None, trusted_certs=None, caller=None,
753                log_collector=None):
754            self.log = log
755            self.debug = debug
756            self.cert_file = cert_file
757            self.cert_pwd = cert_pwd
758            self.trusted_certs = None
759            self.caller = caller
760            self.testbed = testbed
761            self.log_collector = log_collector
762            self.response = None
763            self.node = { }
764
765        def make_map(self, resp):
766            for e in resp.get('embedding', []):
767                if 'toponame' in e and 'physname' in e:
768                    self.node[e['toponame']] = e['physname'][0]
769
770        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
771            req = {
772                    'allocID': { 'fedid' : aid }, 
773                    'segmentdescription': { 
774                        'topdldescription': topo.to_dict(),
775                    },
776                }
777
778            if connInfo:
779                req['connection'] = connInfo
780
781            import_svcs = [ s for m in masters.values() \
782                    for s in m if self.testbed in s.importers]
783
784            if import_svcs or self.testbed in masters:
785                req['service'] = []
786
787            for s in import_svcs:
788                for r in s.reqs:
789                    sr = copy.deepcopy(r)
790                    sr['visibility'] = 'import';
791                    req['service'].append(sr)
792
793            for s in masters.get(self.testbed, []):
794                for r in s.reqs:
795                    sr = copy.deepcopy(r)
796                    sr['visibility'] = 'export';
797                    req['service'].append(sr)
798
799            if attrs:
800                req['fedAttr'] = attrs
801
802            try:
803                self.log.debug("Calling StartSegment at %s " % uri)
804                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
805                        self.trusted_certs)
806                if r.has_key('StartSegmentResponseBody'):
807                    lval = r['StartSegmentResponseBody'].get('allocationLog',
808                            None)
809                    if lval and self.log_collector:
810                        for line in  lval.splitlines(True):
811                            self.log_collector.write(line)
812                    self.make_map(r['StartSegmentResponseBody'])
813                    self.response = r
814                else:
815                    raise service_error(service_error.internal, 
816                            "Bad response!?: %s" %r)
817                return True
818            except service_error, e:
819                self.log.error("Start segment failed on %s: %s" % \
820                        (self.testbed, e))
821                return False
822
823
824
825    class terminate_segment:
826        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
827                cert_pwd=None, trusted_certs=None, caller=None):
828            self.log = log
829            self.debug = debug
830            self.cert_file = cert_file
831            self.cert_pwd = cert_pwd
832            self.trusted_certs = None
833            self.caller = caller
834            self.testbed = testbed
835
836        def __call__(self, uri, aid ):
837            req = {
838                    'allocID': aid , 
839                }
840            try:
841                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
842                        self.trusted_certs)
843                return True
844            except service_error, e:
845                self.log.error("Terminate segment failed on %s: %s" % \
846                        (self.testbed, e))
847                return False
848   
849
850    def allocate_resources(self, allocated, masters, eid, expid, 
851            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
852            attrs=None, connInfo={}, tbmap=None, expcert=None):
853
854        started = { }           # Testbeds where a sub-experiment started
855                                # successfully
856
857        # XXX
858        fail_soft = False
859
860        if tbmap is None: tbmap = { }
861
862        log = alloc_log or self.log
863
864        tp = thread_pool(self.nthreads)
865        threads = [ ]
866        starters = [ ]
867
868        if expcert:
869            cert = expcert
870            pw = None
871        else:
872            cert = self.cert_file
873            pw = self.cert_pwd
874
875        for tb in allocated.keys():
876            # Create and start a thread to start the segment, and save it
877            # to get the return value later
878            tb_attrs = copy.copy(attrs)
879            tp.wait_for_slot()
880            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
881            base, suffix = split_testbed(tb)
882            if suffix:
883                tb_attrs.append({'attribute': 'experiment_name', 
884                    'value': "%s-%s" % (eid, suffix)})
885            else:
886                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
887            if not uri:
888                raise service_error(service_error.internal, 
889                        "Unknown testbed %s !?" % tb)
890
891            if tbparams[tb].has_key('allocID') and \
892                    tbparams[tb]['allocID'].has_key('fedid'):
893                aid = tbparams[tb]['allocID']['fedid']
894            else:
895                raise service_error(service_error.internal, 
896                        "No alloc id for testbed %s !?" % tb)
897
898            s = self.start_segment(log=log, debug=self.debug,
899                    testbed=tb, cert_file=cert,
900                    cert_pwd=pw, trusted_certs=self.trusted_certs,
901                    caller=self.call_StartSegment,
902                    log_collector=log_collector)
903            starters.append(s)
904            t  = pooled_thread(\
905                    target=s, name=tb,
906                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
907                    pdata=tp, trace_file=self.trace_file)
908            threads.append(t)
909            t.start()
910
911        # Wait until all finish (keep pinging the log, though)
912        mins = 0
913        revoked = False
914        while not tp.wait_for_all_done(60.0):
915            mins += 1
916            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
917                    % mins)
918            if not revoked and \
919                    len([ t.getName() for t in threads if t.rv == False]) > 0:
920                # a testbed has failed.  Revoke this experiment's
921                # synchronizarion values so that sub experiments will not
922                # deadlock waiting for synchronization that will never happen
923                self.log.info("A subexperiment has failed to swap in, " + \
924                        "revoking synch keys")
925                var_key = "fedid:%s" % expid
926                for k in self.synch_store.all_keys():
927                    if len(k) > 45 and k[0:46] == var_key:
928                        self.synch_store.revoke_key(k)
929                revoked = True
930
931        failed = [ t.getName() for t in threads if not t.rv ]
932        succeeded = [tb for tb in allocated.keys() if tb not in failed]
933
934        # If one failed clean up, unless fail_soft is set
935        if failed:
936            if not fail_soft:
937                tp.clear()
938                for tb in succeeded:
939                    # Create and start a thread to stop the segment
940                    tp.wait_for_slot()
941                    uri = tbparams[tb]['uri']
942                    t  = pooled_thread(\
943                            target=self.terminate_segment(log=log,
944                                testbed=tb,
945                                cert_file=cert, 
946                                cert_pwd=pw,
947                                trusted_certs=self.trusted_certs,
948                                caller=self.call_TerminateSegment),
949                            args=(uri, tbparams[tb]['federant']['allocID']),
950                            name=tb,
951                            pdata=tp, trace_file=self.trace_file)
952                    t.start()
953                # Wait until all finish (if any are being stopped)
954                if succeeded:
955                    tp.wait_for_all_done()
956
957                # release the allocations
958                for tb in tbparams.keys():
959                    try:
960                        self.release_access(tb, tbparams[tb]['allocID'], 
961                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
962                                cert_file=cert, cert_pwd=pw)
963                    except service_error, e:
964                        self.log.warn("Error releasing access: %s" % e.desc)
965                # Remove the placeholder
966                self.state_lock.acquire()
967                self.state[eid]['experimentStatus'] = 'failed'
968                if self.state_filename: self.write_state()
969                self.state_lock.release()
970                # Remove the repo dir
971                self.remove_dirs("%s/%s" %(self.repodir, expid))
972                # Walk up tmpdir, deleting as we go
973                if self.cleanup:
974                    self.remove_dirs(tmpdir)
975                else:
976                    log.debug("[start_experiment]: not removing %s" % tmpdir)
977
978
979                log.error("Swap in failed on %s" % ",".join(failed))
980                return
981        else:
982            # Walk through the successes and gather the virtual to physical
983            # mapping.
984            embedding = [ ]
985            for s in starters:
986                for k, v in s.node.items():
987                    embedding.append({
988                        'toponame': k, 
989                        'physname': [ v],
990                        'testbed': s.testbed
991                        })
992            log.info("[start_segment]: Experiment %s active" % eid)
993
994
995        # Walk up tmpdir, deleting as we go
996        if self.cleanup:
997            self.remove_dirs(tmpdir)
998        else:
999            log.debug("[start_experiment]: not removing %s" % tmpdir)
1000
1001        # Insert the experiment into our state and update the disk copy.
1002        self.state_lock.acquire()
1003        self.state[expid]['experimentStatus'] = 'active'
1004        self.state[eid] = self.state[expid]
1005        self.state[eid]['experimentdescription']['topdldescription'] = \
1006                top.to_dict()
1007        self.state[eid]['embedding'] = embedding
1008        if self.state_filename: self.write_state()
1009        self.state_lock.release()
1010        return
1011
1012
1013    def add_kit(self, e, kit):
1014        """
1015        Add a Software object created from the list of (install, location)
1016        tuples passed as kit  to the software attribute of an object e.  We
1017        do this enough to break out the code, but it's kind of a hack to
1018        avoid changing the old tuple rep.
1019        """
1020
1021        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1022
1023        if isinstance(e.software, list): e.software.extend(s)
1024        else: e.software = s
1025
1026
1027    def create_experiment_state(self, fid, req, expid, expcert,
1028            state='starting'):
1029        """
1030        Create the initial entry in the experiment's state.  The expid and
1031        expcert are the experiment's fedid and certifacte that represents that
1032        ID, which are installed in the experiment state.  If the request
1033        includes a suggested local name that is used if possible.  If the local
1034        name is already taken by an experiment owned by this user that has
1035        failed, it is overwritten.  Otherwise new letters are added until a
1036        valid localname is found.  The generated local name is returned.
1037        """
1038
1039        if req.has_key('experimentID') and \
1040                req['experimentID'].has_key('localname'):
1041            overwrite = False
1042            eid = req['experimentID']['localname']
1043            # If there's an old failed experiment here with the same local name
1044            # and accessible by this user, we'll overwrite it, otherwise we'll
1045            # fall through and do the collision avoidance.
1046            old_expid = self.get_experiment_fedid(eid)
1047            if old_expid and self.check_experiment_access(fid, old_expid):
1048                self.state_lock.acquire()
1049                status = self.state[eid].get('experimentStatus', None)
1050                if status and status == 'failed':
1051                    # remove the old access attribute
1052                    self.auth.unset_attribute(fid, old_expid)
1053                    self.auth.save()
1054                    overwrite = True
1055                    del self.state[eid]
1056                    del self.state[old_expid]
1057                self.state_lock.release()
1058            self.state_lock.acquire()
1059            while (self.state.has_key(eid) and not overwrite):
1060                eid += random.choice(string.ascii_letters)
1061            # Initial state
1062            self.state[eid] = {
1063                    'experimentID' : \
1064                            [ { 'localname' : eid }, {'fedid': expid } ],
1065                    'experimentStatus': state,
1066                    'experimentAccess': { 'X509' : expcert },
1067                    'owner': fid,
1068                    'log' : [],
1069                }
1070            self.state[expid] = self.state[eid]
1071            if self.state_filename: self.write_state()
1072            self.state_lock.release()
1073        else:
1074            eid = self.exp_stem
1075            for i in range(0,5):
1076                eid += random.choice(string.ascii_letters)
1077            self.state_lock.acquire()
1078            while (self.state.has_key(eid)):
1079                eid = self.exp_stem
1080                for i in range(0,5):
1081                    eid += random.choice(string.ascii_letters)
1082            # Initial state
1083            self.state[eid] = {
1084                    'experimentID' : \
1085                            [ { 'localname' : eid }, {'fedid': expid } ],
1086                    'experimentStatus': state,
1087                    'experimentAccess': { 'X509' : expcert },
1088                    'owner': fid,
1089                    'log' : [],
1090                }
1091            self.state[expid] = self.state[eid]
1092            if self.state_filename: self.write_state()
1093            self.state_lock.release()
1094
1095        return eid
1096
1097
1098    def allocate_ips_to_topo(self, top):
1099        """
1100        Add an ip4_address attribute to all the hosts in the topology, based on
1101        the shared substrates on which they sit.  An /etc/hosts file is also
1102        created and returned as a list of hostfiles entries.  We also return
1103        the allocator, because we may need to allocate IPs to portals
1104        (specifically DRAGON portals).
1105        """
1106        subs = sorted(top.substrates, 
1107                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1108                reverse=True)
1109        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1110        ifs = { }
1111        hosts = [ ]
1112
1113        for idx, s in enumerate(subs):
1114            net_size = len(s.interfaces)+2
1115
1116            a = ips.allocate(net_size)
1117            if a :
1118                base, num = a
1119                if num < net_size: 
1120                    raise service_error(service_error.internal,
1121                            "Allocator returned wrong number of IPs??")
1122            else:
1123                raise service_error(service_error.req, 
1124                        "Cannot allocate IP addresses")
1125            mask = ips.min_alloc
1126            while mask < net_size:
1127                mask *= 2
1128
1129            netmask = ((2**32-1) ^ (mask-1))
1130
1131            base += 1
1132            for i in s.interfaces:
1133                i.attribute.append(
1134                        topdl.Attribute('ip4_address', 
1135                            "%s" % ip_addr(base)))
1136                i.attribute.append(
1137                        topdl.Attribute('ip4_netmask', 
1138                            "%s" % ip_addr(int(netmask))))
1139
1140                hname = i.element.name
1141                if ifs.has_key(hname):
1142                    hosts.append("%s\t%s-%s %s-%d" % \
1143                            (ip_addr(base), hname, s.name, hname,
1144                                ifs[hname]))
1145                else:
1146                    ifs[hname] = 0
1147                    hosts.append("%s\t%s-%s %s-%d %s" % \
1148                            (ip_addr(base), hname, s.name, hname,
1149                                ifs[hname], hname))
1150
1151                ifs[hname] += 1
1152                base += 1
1153        return hosts, ips
1154
1155    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1156            tbparam, masters, tbmap, expid=None, expcert=None):
1157        for tb in testbeds:
1158            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1159                    expcert)
1160            allocated[tb] = 1
1161
1162    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1163            expcert=None):
1164        """
1165        Get access to testbed through fedd and set the parameters for that tb
1166        """
1167        def get_export_project(svcs):
1168            """
1169            Look through for the list of federated_service for this testbed
1170            objects for a project_export service, and extract the project
1171            parameter.
1172            """
1173
1174            pe = [s for s in svcs if s.name=='project_export']
1175            if len(pe) == 1:
1176                return pe[0].params.get('project', None)
1177            elif len(pe) == 0:
1178                return None
1179            else:
1180                raise service_error(service_error.req,
1181                        "More than one project export is not supported")
1182
1183        def add_services(svcs, type, slist):
1184            """
1185            Add the given services to slist.  type is import or export.
1186            """
1187            for i, s in enumerate(svcs):
1188                idx = '%s%d' % (type, i)
1189                sr = {'id': idx, 'name': s.name, 'visibility': type }
1190                if s.params:
1191                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1192                            for k, v in s.params.items()]
1193                slist.append(sr)
1194
1195        uri = tbmap.get(testbed_base(tb), None)
1196        if not uri:
1197            raise service_error(service_error.server_config, 
1198                    "Unknown testbed: %s" % tb)
1199
1200        export_svcs = masters.get(tb,[])
1201        import_svcs = [ s for m in masters.values() \
1202                for s in m \
1203                    if tb in s.importers ]
1204
1205        export_project = get_export_project(export_svcs)
1206        # Compose the credential list so that IDs come before attributes
1207        creds = set()
1208        keys = set()
1209        certs = self.auth.get_creds_for_principal(fid)
1210        if expid:
1211            certs.update(self.auth.get_creds_for_principal(expid))
1212        for c in certs:
1213            keys.add(c.issuer_cert())
1214            creds.add(c.attribute_cert())
1215        creds = list(keys) + list(creds)
1216
1217        if expcert: cert, pw = expcert, None
1218        else: cert, pw = self.cert_file, self.cert_pw
1219
1220        # Request credentials
1221        req = {
1222                'abac_credential': creds,
1223            }
1224        # Make the service request from the services we're importing and
1225        # exporting.  Keep track of the export request ids so we can
1226        # collect the resulting info from the access response.
1227        e_keys = { }
1228        if import_svcs or export_svcs:
1229            slist = []
1230            add_services(import_svcs, 'import', slist)
1231            add_services(import_svcs, 'export', slist)
1232            req['service'] = slist
1233
1234        if self.local_access.has_key(uri):
1235            # Local access call
1236            req = { 'RequestAccessRequestBody' : req }
1237            r = self.local_access[uri].RequestAccess(req, 
1238                    fedid(file=self.cert_file))
1239            r = { 'RequestAccessResponseBody' : r }
1240        else:
1241            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1242
1243        if r.has_key('RequestAccessResponseBody'):
1244            # Through to here we have a valid response, not a fault.
1245            # Access denied is a fault, so something better or worse than
1246            # access denied has happened.
1247            r = r['RequestAccessResponseBody']
1248            self.log.debug("[get_access] Access granted")
1249        else:
1250            raise service_error(service_error.protocol,
1251                        "Bad proxy response")
1252       
1253        tbparam[tb] = { 
1254                "allocID" : r['allocID'],
1255                "uri": uri,
1256                }
1257
1258        # Collect the responses corresponding to the services this testbed
1259        # exports.  These will be the service requests that we will include in
1260        # the start segment requests (with appropriate visibility values) to
1261        # import and export the segments.
1262        for s in r.get('service', []):
1263            id = s.get('id', None)
1264            if id and id in e_keys:
1265                e_keys[id].reqs.append(s)
1266
1267        # Add attributes to parameter space.  We don't allow attributes to
1268        # overlay any parameters already installed.
1269        for a in r.get('fedAttr', []):
1270            try:
1271                if a['attribute'] and \
1272                        isinstance(a['attribute'], basestring)\
1273                        and not tbparam[tb].has_key(a['attribute'].lower()):
1274                    tbparam[tb][a['attribute'].lower()] = a['value']
1275            except KeyError:
1276                self.log.error("Bad attribute in response: %s" % a)
1277
1278
1279    def split_topology(self, top, topo, testbeds):
1280        """
1281        Create the sub-topologies that are needed for experiment instantiation.
1282        """
1283        for tb in testbeds:
1284            topo[tb] = top.clone()
1285            # copy in for loop allows deletions from the original
1286            for e in [ e for e in topo[tb].elements]:
1287                etb = e.get_attribute('testbed')
1288                # NB: elements without a testbed attribute won't appear in any
1289                # sub topologies. 
1290                if not etb or etb != tb:
1291                    for i in e.interface:
1292                        for s in i.subs:
1293                            try:
1294                                s.interfaces.remove(i)
1295                            except ValueError:
1296                                raise service_error(service_error.internal,
1297                                        "Can't remove interface??")
1298                    topo[tb].elements.remove(e)
1299            topo[tb].make_indices()
1300
1301    def wrangle_software(self, expid, top, topo, tbparams):
1302        """
1303        Copy software out to the repository directory, allocate permissions and
1304        rewrite the segment topologies to look for the software in local
1305        places.
1306        """
1307
1308        # Copy the rpms and tarfiles to a distribution directory from
1309        # which the federants can retrieve them
1310        linkpath = "%s/software" %  expid
1311        softdir ="%s/%s" % ( self.repodir, linkpath)
1312        softmap = { }
1313        # These are in a list of tuples format (each kit).  This comprehension
1314        # unwraps them into a single list of tuples that initilaizes the set of
1315        # tuples.
1316        pkgs = set([ t for l in [self.fedkit, self.gatewaykit] \
1317                for p, t in l ])
1318        pkgs.update([x.location for e in top.elements \
1319                for x in e.software])
1320        try:
1321            os.makedirs(softdir)
1322        except EnvironmentError, e:
1323            raise service_error(
1324                    "Cannot create software directory: %s" % e)
1325        # The actual copying.  Everything's converted into a url for copying.
1326        for pkg in pkgs:
1327            loc = pkg
1328
1329            scheme, host, path = urlparse(loc)[0:3]
1330            dest = os.path.basename(path)
1331            if not scheme:
1332                if not loc.startswith('/'):
1333                    loc = "/%s" % loc
1334                loc = "file://%s" %loc
1335            try:
1336                u = urlopen(loc)
1337            except Exception, e:
1338                raise service_error(service_error.req, 
1339                        "Cannot open %s: %s" % (loc, e))
1340            try:
1341                f = open("%s/%s" % (softdir, dest) , "w")
1342                self.log.debug("Writing %s/%s" % (softdir,dest) )
1343                data = u.read(4096)
1344                while data:
1345                    f.write(data)
1346                    data = u.read(4096)
1347                f.close()
1348                u.close()
1349            except Exception, e:
1350                raise service_error(service_error.internal,
1351                        "Could not copy %s: %s" % (loc, e))
1352            path = re.sub("/tmp", "", linkpath)
1353            # XXX
1354            softmap[pkg] = \
1355                    "%s/%s/%s" %\
1356                    ( self.repo_url, path, dest)
1357
1358            # Allow the individual segments to access the software.
1359            for tb in tbparams.keys():
1360                self.auth.set_attribute(tbparams[tb]['allocID']['fedid'], 
1361                        "/%s/%s" % ( path, dest))
1362            self.auth.save()
1363
1364        # Convert the software locations in the segments into the local
1365        # copies on this host
1366        for soft in [ s for tb in topo.values() \
1367                for e in tb.elements \
1368                    if getattr(e, 'software', False) \
1369                        for s in e.software ]:
1370            if softmap.has_key(soft.location):
1371                soft.location = softmap[soft.location]
1372
1373
1374    def new_experiment(self, req, fid):
1375        """
1376        The external interface to empty initial experiment creation called from
1377        the dispatcher.
1378
1379        Creates a working directory, splits the incoming description using the
1380        splitter script and parses out the avrious subsections using the
1381        lcasses above.  Once each sub-experiment is created, use pooled threads
1382        to instantiate them and start it all up.
1383        """
1384        req = req.get('NewRequestBody', None)
1385        if not req:
1386            raise service_error(service_error.req,
1387                    "Bad request format (no NewRequestBody)")
1388
1389        if self.auth.import_credentials(data_list=req.get('credential', [])):
1390            self.auth.save()
1391
1392        if not self.auth.check_attribute(fid, 'new'):
1393            raise service_error(service_error.access, "New access denied")
1394
1395        try:
1396            tmpdir = tempfile.mkdtemp(prefix="split-")
1397        except EnvironmentError:
1398            raise service_error(service_error.internal, "Cannot create tmp dir")
1399
1400        try:
1401            access_user = self.accessdb[fid]
1402        except KeyError:
1403            raise service_error(service_error.internal,
1404                    "Access map and authorizer out of sync in " + \
1405                            "new_experiment for fedid %s"  % fid)
1406
1407        pid = "dummy"
1408        gid = "dummy"
1409
1410        # Generate an ID for the experiment (slice) and a certificate that the
1411        # allocator can use to prove they own it.  We'll ship it back through
1412        # the encrypted connection.  If the requester supplied one, use it.
1413        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1414            expcert = req['experimentAccess']['X509']
1415            expid = fedid(certstr=expcert)
1416            self.state_lock.acquire()
1417            if expid in self.state:
1418                self.state_lock.release()
1419                raise service_error(service_error.req, 
1420                        'fedid %s identifies an existing experiment' % expid)
1421            self.state_lock.release()
1422        else:
1423            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1424
1425        #now we're done with the tmpdir, and it should be empty
1426        if self.cleanup:
1427            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1428            os.rmdir(tmpdir)
1429        else:
1430            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1431
1432        eid = self.create_experiment_state(fid, req, expid, expcert, 
1433                state='empty')
1434
1435        # Let users touch the state
1436        self.auth.set_attribute(fid, expid)
1437        self.auth.set_attribute(expid, expid)
1438        # Override fedids can manipulate state as well
1439        for o in self.overrides:
1440            self.auth.set_attribute(o, expid)
1441        self.auth.save()
1442
1443        rv = {
1444                'experimentID': [
1445                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1446                ],
1447                'experimentStatus': 'empty',
1448                'experimentAccess': { 'X509' : expcert }
1449            }
1450
1451        return rv
1452
1453    def create_experiment(self, req, fid):
1454        """
1455        The external interface to experiment creation called from the
1456        dispatcher.
1457
1458        Creates a working directory, splits the incoming description using the
1459        splitter script and parses out the various subsections using the
1460        classes above.  Once each sub-experiment is created, use pooled threads
1461        to instantiate them and start it all up.
1462        """
1463
1464        req = req.get('CreateRequestBody', None)
1465        if not req:
1466            raise service_error(service_error.req,
1467                    "Bad request format (no CreateRequestBody)")
1468
1469        # Get the experiment access
1470        exp = req.get('experimentID', None)
1471        if exp:
1472            if exp.has_key('fedid'):
1473                key = exp['fedid']
1474                expid = key
1475                eid = None
1476            elif exp.has_key('localname'):
1477                key = exp['localname']
1478                eid = key
1479                expid = None
1480            else:
1481                raise service_error(service_error.req, "Unknown lookup type")
1482        else:
1483            raise service_error(service_error.req, "No request?")
1484
1485        # Import information from the requester
1486        if self.auth.import_credentials(data_list=req.get('credential', [])):
1487            self.auth.save()
1488
1489        self.check_experiment_access(fid, key)
1490
1491        # Install the testbed map entries supplied with the request into a copy
1492        # of the testbed map.
1493        tbmap = dict(self.tbmap)
1494        for m in req.get('testbedmap', []):
1495            if 'testbed' in m and 'uri' in m:
1496                tbmap[m['testbed']] = m['uri']
1497
1498        try:
1499            tmpdir = tempfile.mkdtemp(prefix="split-")
1500            os.mkdir(tmpdir+"/keys")
1501        except EnvironmentError:
1502            raise service_error(service_error.internal, "Cannot create tmp dir")
1503
1504        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1505        gw_secretkey_base = "fed.%s" % self.ssh_type
1506        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1507        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1508        tclfile = tmpdir + "/experiment.tcl"
1509        tbparams = { }
1510        try:
1511            access_user = self.accessdb[fid]
1512        except KeyError:
1513            raise service_error(service_error.internal,
1514                    "Access map and authorizer out of sync in " + \
1515                            "create_experiment for fedid %s"  % fid)
1516
1517        pid = "dummy"
1518        gid = "dummy"
1519
1520        # The tcl parser needs to read a file so put the content into that file
1521        descr=req.get('experimentdescription', None)
1522        if descr:
1523            file_content=descr.get('ns2description', None)
1524            if file_content:
1525                try:
1526                    f = open(tclfile, 'w')
1527                    f.write(file_content)
1528                    f.close()
1529                except EnvironmentError:
1530                    raise service_error(service_error.internal,
1531                            "Cannot write temp experiment description")
1532            else:
1533                raise service_error(service_error.req, 
1534                        "Only ns2descriptions supported")
1535        else:
1536            raise service_error(service_error.req, "No experiment description")
1537
1538        self.state_lock.acquire()
1539        if self.state.has_key(key):
1540            self.state[key]['experimentStatus'] = "starting"
1541            for e in self.state[key].get('experimentID',[]):
1542                if not expid and e.has_key('fedid'):
1543                    expid = e['fedid']
1544                elif not eid and e.has_key('localname'):
1545                    eid = e['localname']
1546            if 'experimentAccess' in self.state[key] and \
1547                    'X509' in self.state[key]['experimentAccess']:
1548                expcert = self.state[key]['experimentAccess']['X509']
1549            else:
1550                expcert = None
1551        self.state_lock.release()
1552
1553        if not (eid and expid):
1554            raise service_error(service_error.internal, 
1555                    "Cannot find local experiment info!?")
1556
1557        # make a protected copy of the access certificate so the experiment
1558        # controller can act as the experiment principal.
1559        if expcert and self.auth_type != 'legacy':
1560            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1561            if not expcert_file:
1562                raise service_error(service_error.internal, 
1563                        "Cannot create temp cert file?")
1564        else:
1565            expcert_file = None
1566
1567        try: 
1568            # This catches exceptions to clear the placeholder if necessary
1569            try:
1570                self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1571            except ValueError:
1572                raise service_error(service_error.server_config, 
1573                        "Bad key type (%s)" % self.ssh_type)
1574
1575            # Copy the service request
1576            tb_services = [ s for s in req.get('service',[]) ]
1577            # Translate to topdl
1578            if self.splitter_url:
1579                self.log.debug("Calling remote topdl translator at %s" % \
1580                        self.splitter_url)
1581                top = self.remote_ns2topdl(self.splitter_url, file_content)
1582            else:
1583                tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1584                    str(self.muxmax), '-m', 'dummy']
1585
1586                tclcmd.extend([pid, gid, eid, tclfile])
1587
1588                self.log.debug("running local splitter %s", " ".join(tclcmd))
1589                # This is just fantastic.  As a side effect the parser copies
1590                # tb_compat.tcl into the current directory, so that directory
1591                # must be writable by the fedd user.  Doing this in the
1592                # temporary subdir ensures this is the case.
1593                tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1594                        cwd=tmpdir)
1595                split_data = tclparser.stdout
1596
1597                top = topdl.topology_from_xml(file=split_data, top="experiment")
1598
1599            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1600            # Find the testbeds to look up
1601            testbeds = set([ a.value for e in top.elements \
1602                    for a in e.attribute \
1603                        if a.attribute == 'testbed'])
1604
1605            tb_hosts = { }
1606            for tb in testbeds:
1607                tb_hosts[tb] = [ e.name for e in top.elements \
1608                        if isinstance(e, topdl.Computer) and \
1609                            e.get_attribute('testbed') and \
1610                            e.get_attribute('testbed') == tb]
1611
1612            masters = { }           # testbeds exporting services
1613            pmasters = { }          # Testbeds exporting services that
1614                                    # need portals
1615            for s in tb_services:
1616                # If this is a service request with the importall field
1617                # set, fill it out.
1618
1619                if s.get('importall', False):
1620                    s['import'] = [ tb for tb in testbeds \
1621                            if tb not in s.get('export',[])]
1622                    del s['importall']
1623
1624                # Add the service to masters
1625                for tb in s.get('export', []):
1626                    if s.get('name', None):
1627                        if tb not in masters:
1628                            masters[tb] = [ ]
1629
1630                        params = { }
1631                        if 'fedAttr' in s:
1632                            for a in s['fedAttr']:
1633                                params[a.get('attribute', '')] = \
1634                                        a.get('value','')
1635
1636                        fser = federated_service(name=s['name'],
1637                                exporter=tb, importers=s.get('import',[]),
1638                                params=params)
1639                        if fser.name == 'hide_hosts' \
1640                                and 'hosts' not in fser.params:
1641                            fser.params['hosts'] = \
1642                                    ",".join(tb_hosts.get(fser.exporter, []))
1643                        masters[tb].append(fser)
1644
1645                        if fser.portal:
1646                            if tb not in pmasters: pmasters[tb] = [ fser ]
1647                            else: pmasters[tb].append(fser)
1648                    else:
1649                        self.log.error('Testbed service does not have name " + \
1650                                "and importers')
1651
1652
1653            allocated = { }         # Testbeds we can access
1654            topo ={ }               # Sub topologies
1655            connInfo = { }          # Connection information
1656
1657            if self.auth_type == 'legacy':
1658                self.get_legcay_access_to_testbeds(testbeds, access_user, 
1659                        allocated, tbparams, masters, tbmap)
1660            elif self.auth_type == 'abac':
1661                self.get_access_to_testbeds(testbeds, fid, allocated, 
1662                        tbparams, masters, tbmap, expid, expcert_file)
1663            else:
1664                raise service_error(service_error.internal, 
1665                        "Unknown auth_type %s" % self.auth_type)
1666
1667            self.split_topology(top, topo, testbeds)
1668
1669            # Copy configuration files into the remote file store
1670            # The config urlpath
1671            configpath = "/%s/config" % expid
1672            # The config file system location
1673            configdir ="%s%s" % ( self.repodir, configpath)
1674            try:
1675                os.makedirs(configdir)
1676            except EnvironmentError, e:
1677                raise service_error(service_error.internal,
1678                        "Cannot create config directory: %s" % e)
1679            try:
1680                f = open("%s/hosts" % configdir, "w")
1681                f.write('\n'.join(hosts))
1682                f.close()
1683            except EnvironmentError, e:
1684                raise service_error(service_error.internal, 
1685                        "Cannot write hosts file: %s" % e)
1686            try:
1687                copy_file("%s" % gw_pubkey, "%s/%s" % \
1688                        (configdir, gw_pubkey_base))
1689                copy_file("%s" % gw_secretkey, "%s/%s" % \
1690                        (configdir, gw_secretkey_base))
1691            except EnvironmentError, e:
1692                raise service_error(service_error.internal, 
1693                        "Cannot copy keyfiles: %s" % e)
1694
1695            # Allow the individual testbeds to access the configuration files.
1696            for tb in tbparams.keys():
1697                asignee = tbparams[tb]['allocID']['fedid']
1698                for f in ("hosts", gw_secretkey_base, gw_pubkey_base):
1699                    self.auth.set_attribute(asignee, "%s/%s" % \
1700                            (configpath, f))
1701
1702            part = experiment_partition(self.auth, self.store_url, tbmap,
1703                    self.muxmax, self.direct_transit)
1704            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
1705                    connInfo, expid)
1706            # Now get access to the dynamic testbeds (those added above)
1707            for tb in [ t for t in topo if t not in allocated]:
1708                #XXX: ABAC
1709                if self.auth_type =='legacy':
1710                    self.get_legacy_access(tb, None, tbparams, access_user, 
1711                            masters, tbmap)
1712                elif self.auth_type == 'abac':
1713                    self.get_access(tb, tbparams, fid, masters, tbmap, 
1714                            expid, expcert_file)
1715                else:
1716                    raise service_error(service_error.internal, 
1717                            "Unknown auth_type %s" % self.auth_type)
1718                allocated[tb] = 1
1719                store_keys = topo[tb].get_attribute('store_keys')
1720                # Give the testbed access to keys it exports or imports
1721                if store_keys:
1722                    for sk in store_keys.split(" "):
1723                        self.auth.set_attribute(\
1724                                tbparams[tb]['allocID']['fedid'], sk)
1725            self.auth.save()
1726
1727            self.wrangle_software(expid, top, topo, tbparams)
1728
1729            vtopo = topdl.topology_to_vtopo(top)
1730            vis = self.genviz(vtopo)
1731
1732            # save federant information
1733            for k in allocated.keys():
1734                tbparams[k]['federant'] = {
1735                        'name': [ { 'localname' : eid} ],
1736                        'allocID' : tbparams[k]['allocID'],
1737                        'uri': tbparams[k]['uri'],
1738                    }
1739
1740            self.state_lock.acquire()
1741            self.state[eid]['vtopo'] = vtopo
1742            self.state[eid]['vis'] = vis
1743            self.state[eid]['experimentdescription'] = \
1744                    { 'topdldescription': top.to_dict() }
1745            self.state[eid]['federant'] = \
1746                    [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1747                        if tbparams[tb].has_key('federant') ]
1748            if self.state_filename: 
1749                self.write_state()
1750            self.state_lock.release()
1751        except service_error, e:
1752            # If something goes wrong in the parse (usually an access error)
1753            # clear the placeholder state.  From here on out the code delays
1754            # exceptions.  Failing at this point returns a fault to the remote
1755            # caller.
1756
1757            self.state_lock.acquire()
1758            del self.state[eid]
1759            del self.state[expid]
1760            if self.state_filename: self.write_state()
1761            self.state_lock.release()
1762            if tmpdir and self.cleanup:
1763                self.remove_dirs(tmpdir)
1764            raise e
1765
1766
1767        # Start the background swapper and return the starting state.  From
1768        # here on out, the state will stick around a while.
1769
1770        # Let users touch the state
1771        self.auth.set_attribute(fid, expid)
1772        self.auth.set_attribute(expid, expid)
1773        # Override fedids can manipulate state as well
1774        for o in self.overrides:
1775            self.auth.set_attribute(o, expid)
1776        self.auth.save()
1777
1778        # Create a logger that logs to the experiment's state object as well as
1779        # to the main log file.
1780        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
1781        alloc_collector = self.list_log(self.state[eid]['log'])
1782        h = logging.StreamHandler(alloc_collector)
1783        # XXX: there should be a global one of these rather than repeating the
1784        # code.
1785        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
1786                    '%d %b %y %H:%M:%S'))
1787        alloc_log.addHandler(h)
1788       
1789        attrs = [ 
1790                {
1791                    'attribute': 'ssh_pubkey', 
1792                    'value': '%s/%s/config/%s' % \
1793                            (self.repo_url, expid, gw_pubkey_base)
1794                },
1795                {
1796                    'attribute': 'ssh_secretkey', 
1797                    'value': '%s/%s/config/%s' % \
1798                            (self.repo_url, expid, gw_secretkey_base)
1799                },
1800                {
1801                    'attribute': 'hosts', 
1802                    'value': '%s/%s/config/hosts' % \
1803                            (self.repo_url, expid)
1804                },
1805            ]
1806
1807        # transit and disconnected testbeds may not have a connInfo entry.
1808        # Fill in the blanks.
1809        for t in allocated.keys():
1810            if not connInfo.has_key(t):
1811                connInfo[t] = { }
1812
1813        # Start a thread to do the resource allocation
1814        t  = Thread(target=self.allocate_resources,
1815                args=(allocated, masters, eid, expid, tbparams, 
1816                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
1817                    connInfo, tbmap, expcert_file),
1818                name=eid)
1819        t.start()
1820
1821        rv = {
1822                'experimentID': [
1823                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1824                ],
1825                'experimentStatus': 'starting',
1826            }
1827
1828        return rv
1829   
1830    def get_experiment_fedid(self, key):
1831        """
1832        find the fedid associated with the localname key in the state database.
1833        """
1834
1835        rv = None
1836        self.state_lock.acquire()
1837        if self.state.has_key(key):
1838            if isinstance(self.state[key], dict):
1839                try:
1840                    kl = [ f['fedid'] for f in \
1841                            self.state[key]['experimentID']\
1842                                if f.has_key('fedid') ]
1843                except KeyError:
1844                    self.state_lock.release()
1845                    raise service_error(service_error.internal, 
1846                            "No fedid for experiment %s when getting "+\
1847                                    "fedid(!?)" % key)
1848                if len(kl) == 1:
1849                    rv = kl[0]
1850                else:
1851                    self.state_lock.release()
1852                    raise service_error(service_error.internal, 
1853                            "multiple fedids for experiment %s when " +\
1854                                    "getting fedid(!?)" % key)
1855            else:
1856                self.state_lock.release()
1857                raise service_error(service_error.internal, 
1858                        "Unexpected state for %s" % key)
1859        self.state_lock.release()
1860        return rv
1861
1862    def check_experiment_access(self, fid, key):
1863        """
1864        Confirm that the fid has access to the experiment.  Though a request
1865        may be made in terms of a local name, the access attribute is always
1866        the experiment's fedid.
1867        """
1868        if not isinstance(key, fedid):
1869            key = self.get_experiment_fedid(key)
1870
1871        if self.auth.check_attribute(fid, key):
1872            return True
1873        else:
1874            raise service_error(service_error.access, "Access Denied")
1875
1876
1877    def get_handler(self, path, fid):
1878        self.log.info("Get handler %s %s" % (path, fid))
1879        if self.auth.check_attribute(fid, path):
1880            return ("%s/%s" % (self.repodir, path), "application/binary")
1881        else:
1882            return (None, None)
1883
1884    def get_vtopo(self, req, fid):
1885        """
1886        Return the stored virtual topology for this experiment
1887        """
1888        rv = None
1889        state = None
1890
1891        req = req.get('VtopoRequestBody', None)
1892        if not req:
1893            raise service_error(service_error.req,
1894                    "Bad request format (no VtopoRequestBody)")
1895        exp = req.get('experiment', None)
1896        if exp:
1897            if exp.has_key('fedid'):
1898                key = exp['fedid']
1899                keytype = "fedid"
1900            elif exp.has_key('localname'):
1901                key = exp['localname']
1902                keytype = "localname"
1903            else:
1904                raise service_error(service_error.req, "Unknown lookup type")
1905        else:
1906            raise service_error(service_error.req, "No request?")
1907
1908        self.check_experiment_access(fid, key)
1909
1910        self.state_lock.acquire()
1911        if self.state.has_key(key):
1912            if self.state[key].has_key('vtopo'):
1913                rv = { 'experiment' : {keytype: key },\
1914                        'vtopo': self.state[key]['vtopo'],\
1915                    }
1916            else:
1917                state = self.state[key]['experimentStatus']
1918        self.state_lock.release()
1919
1920        if rv: return rv
1921        else: 
1922            if state:
1923                raise service_error(service_error.partial, 
1924                        "Not ready: %s" % state)
1925            else:
1926                raise service_error(service_error.req, "No such experiment")
1927
1928    def get_vis(self, req, fid):
1929        """
1930        Return the stored visualization for this experiment
1931        """
1932        rv = None
1933        state = None
1934
1935        req = req.get('VisRequestBody', None)
1936        if not req:
1937            raise service_error(service_error.req,
1938                    "Bad request format (no VisRequestBody)")
1939        exp = req.get('experiment', None)
1940        if exp:
1941            if exp.has_key('fedid'):
1942                key = exp['fedid']
1943                keytype = "fedid"
1944            elif exp.has_key('localname'):
1945                key = exp['localname']
1946                keytype = "localname"
1947            else:
1948                raise service_error(service_error.req, "Unknown lookup type")
1949        else:
1950            raise service_error(service_error.req, "No request?")
1951
1952        self.check_experiment_access(fid, key)
1953
1954        self.state_lock.acquire()
1955        if self.state.has_key(key):
1956            if self.state[key].has_key('vis'):
1957                rv =  { 'experiment' : {keytype: key },\
1958                        'vis': self.state[key]['vis'],\
1959                        }
1960            else:
1961                state = self.state[key]['experimentStatus']
1962        self.state_lock.release()
1963
1964        if rv: return rv
1965        else:
1966            if state:
1967                raise service_error(service_error.partial, 
1968                        "Not ready: %s" % state)
1969            else:
1970                raise service_error(service_error.req, "No such experiment")
1971
1972    def clean_info_response(self, rv):
1973        """
1974        Remove the information in the experiment's state object that is not in
1975        the info response.
1976        """
1977        # Remove the owner info (should always be there, but...)
1978        if rv.has_key('owner'): del rv['owner']
1979
1980        # Convert the log into the allocationLog parameter and remove the
1981        # log entry (with defensive programming)
1982        if rv.has_key('log'):
1983            rv['allocationLog'] = "".join(rv['log'])
1984            del rv['log']
1985        else:
1986            rv['allocationLog'] = ""
1987
1988        if rv['experimentStatus'] != 'active':
1989            if rv.has_key('federant'): del rv['federant']
1990        else:
1991            # remove the allocationID and uri info from each federant
1992            for f in rv.get('federant', []):
1993                if f.has_key('allocID'): del f['allocID']
1994                if f.has_key('uri'): del f['uri']
1995
1996        return rv
1997
1998    def get_info(self, req, fid):
1999        """
2000        Return all the stored info about this experiment
2001        """
2002        rv = None
2003
2004        req = req.get('InfoRequestBody', None)
2005        if not req:
2006            raise service_error(service_error.req,
2007                    "Bad request format (no InfoRequestBody)")
2008        exp = req.get('experiment', None)
2009        if exp:
2010            if exp.has_key('fedid'):
2011                key = exp['fedid']
2012                keytype = "fedid"
2013            elif exp.has_key('localname'):
2014                key = exp['localname']
2015                keytype = "localname"
2016            else:
2017                raise service_error(service_error.req, "Unknown lookup type")
2018        else:
2019            raise service_error(service_error.req, "No request?")
2020
2021        self.check_experiment_access(fid, key)
2022
2023        # The state may be massaged by the service function that called
2024        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2025        # state.
2026        self.state_lock.acquire()
2027        if self.state.has_key(key):
2028            rv = copy.deepcopy(self.state[key])
2029        self.state_lock.release()
2030
2031        if rv:
2032            return self.clean_info_response(rv)
2033        else:
2034            raise service_error(service_error.req, "No such experiment")
2035
2036    def get_multi_info(self, req, fid):
2037        """
2038        Return all the stored info that this fedid can access
2039        """
2040        rv = { 'info': [ ] }
2041
2042        self.state_lock.acquire()
2043        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2044            try:
2045                self.check_experiment_access(fid, key)
2046            except service_error, e:
2047                if e.code == service_error.access:
2048                    continue
2049                else:
2050                    self.state_lock.release()
2051                    raise e
2052
2053            if self.state.has_key(key):
2054                e = copy.deepcopy(self.state[key])
2055                e = self.clean_info_response(e)
2056                rv['info'].append(e)
2057        self.state_lock.release()
2058        return rv
2059
2060    def remove_dirs(self, dir):
2061        """
2062        Remove the directory tree and all files rooted at dir.  Log any errors,
2063        but continue.
2064        """
2065        self.log.debug("[removedirs]: removing %s" % dir)
2066        try:
2067            for path, dirs, files in os.walk(dir, topdown=False):
2068                for f in files:
2069                    os.remove(os.path.join(path, f))
2070                for d in dirs:
2071                    os.rmdir(os.path.join(path, d))
2072            os.rmdir(dir)
2073        except EnvironmentError, e:
2074            self.log.error("Error deleting directory tree in %s" % e);
2075
2076    @staticmethod
2077    def make_temp_certfile(expcert, tmpdir):
2078        """
2079        make a protected copy of the access certificate so the experiment
2080        controller can act as the experiment principal.  mkstemp is the most
2081        secure way to do that. The directory should be created by
2082        mkdtemp.  Return the filename.
2083        """
2084        if expcert and tmpdir:
2085            try:
2086                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
2087                f = os.fdopen(certf, 'w')
2088                print >> f, expcert
2089                f.close()
2090            except EnvironmentError, e:
2091                raise service_error(service_error.internal, 
2092                        "Cannot create temp cert file?")
2093            return certfn
2094        else:
2095            return None
2096
2097    def terminate_experiment(self, req, fid):
2098        """
2099        Swap this experiment out on the federants and delete the shared
2100        information
2101        """
2102        tbparams = { }
2103        req = req.get('TerminateRequestBody', None)
2104        if not req:
2105            raise service_error(service_error.req,
2106                    "Bad request format (no TerminateRequestBody)")
2107        force = req.get('force', False)
2108        exp = req.get('experiment', None)
2109        if exp:
2110            if exp.has_key('fedid'):
2111                key = exp['fedid']
2112                keytype = "fedid"
2113            elif exp.has_key('localname'):
2114                key = exp['localname']
2115                keytype = "localname"
2116            else:
2117                raise service_error(service_error.req, "Unknown lookup type")
2118        else:
2119            raise service_error(service_error.req, "No request?")
2120
2121        self.check_experiment_access(fid, key)
2122
2123        dealloc_list = [ ]
2124
2125
2126        # Create a logger that logs to the dealloc_list as well as to the main
2127        # log file.
2128        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2129        h = logging.StreamHandler(self.list_log(dealloc_list))
2130        # XXX: there should be a global one of these rather than repeating the
2131        # code.
2132        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2133                    '%d %b %y %H:%M:%S'))
2134        dealloc_log.addHandler(h)
2135
2136        self.state_lock.acquire()
2137        fed_exp = self.state.get(key, None)
2138        repo = None
2139
2140        if fed_exp:
2141            # This branch of the conditional holds the lock to generate a
2142            # consistent temporary tbparams variable to deallocate experiments.
2143            # It releases the lock to do the deallocations and reacquires it to
2144            # remove the experiment state when the termination is complete.
2145
2146            # First make sure that the experiment creation is complete.
2147            status = fed_exp.get('experimentStatus', None)
2148
2149            if status:
2150                if status in ('starting', 'terminating'):
2151                    if not force:
2152                        self.state_lock.release()
2153                        raise service_error(service_error.partial, 
2154                                'Experiment still being created or destroyed')
2155                    else:
2156                        self.log.warning('Experiment in %s state ' % status + \
2157                                'being terminated by force.')
2158            else:
2159                # No status??? trouble
2160                self.state_lock.release()
2161                raise service_error(service_error.internal,
2162                        "Experiment has no status!?")
2163
2164            ids = []
2165            #  experimentID is a list of dicts that are self-describing
2166            #  identifiers.  This finds all the fedids and localnames - the
2167            #  keys of self.state - and puts them into ids.
2168            for id in fed_exp.get('experimentID', []):
2169                if id.has_key('fedid'): 
2170                    ids.append(id['fedid'])
2171                    repo = "%s" % id['fedid']
2172                if id.has_key('localname'): ids.append(id['localname'])
2173
2174            # Get the experimentAccess - the principal for this experiment.  It
2175            # is this principal to which credentials have been delegated, and
2176            # as which the experiment controller must act.
2177            if 'experimentAccess' in self.state[key] and \
2178                    'X509' in self.state[key]['experimentAccess']:
2179                expcert = self.state[key]['experimentAccess']['X509']
2180            else:
2181                expcert = None
2182            # Collect the allocation/segment ids into a dict keyed by the fedid
2183            # of the allocation (or a monotonically increasing integer) that
2184            # contains a tuple of uri, aid (which is a dict...)
2185            for i, fed in enumerate(fed_exp.get('federant', [])):
2186                try:
2187                    uri = fed['uri']
2188                    aid = fed['allocID']
2189                    k = fed['allocID'].get('fedid', i)
2190                except KeyError, e:
2191                    continue
2192                tbparams[k] = (uri, aid)
2193            fed_exp['experimentStatus'] = 'terminating'
2194            if self.state_filename: self.write_state()
2195            self.state_lock.release()
2196
2197            try:
2198                tmpdir = tempfile.mkdtemp(prefix="split-")
2199            except EnvironmentError:
2200                raise service_error(service_error.internal, 
2201                        "Cannot create tmp dir")
2202            # This try block makes sure the tempdir is cleared
2203            try:
2204                # If no expcert, try the deallocation as the experiment
2205                # controller instance.
2206                if expcert and self.auth_type != 'legacy': 
2207                    cert_file = self.make_temp_certfile(expcert, tmpdir)
2208                    pw = None
2209                else: 
2210                    cert_file = self.cert_file
2211                    pw = self.cert_pwd
2212
2213                # Stop everyone.  NB, wait_for_all waits until a thread starts
2214                # and then completes, so we can't wait if nothing starts.  So,
2215                # no tbparams, no start.
2216                if len(tbparams) > 0:
2217                    tp = thread_pool(self.nthreads)
2218                    for k in tbparams.keys():
2219                        # Create and start a thread to stop the segment
2220                        tp.wait_for_slot()
2221                        uri, aid = tbparams[k]
2222                        t  = pooled_thread(\
2223                                target=self.terminate_segment(log=dealloc_log,
2224                                    testbed=uri,
2225                                    cert_file=cert_file, 
2226                                    cert_pwd=pw,
2227                                    trusted_certs=self.trusted_certs,
2228                                    caller=self.call_TerminateSegment),
2229                                args=(uri, aid), name=k,
2230                                pdata=tp, trace_file=self.trace_file)
2231                        t.start()
2232                    # Wait for completions
2233                    tp.wait_for_all_done()
2234
2235                # release the allocations (failed experiments have done this
2236                # already, and starting experiments may be in odd states, so we
2237                # ignore errors releasing those allocations
2238                try: 
2239                    for k in tbparams.keys():
2240                        # This releases access by uri
2241                        uri, aid = tbparams[k]
2242                        self.release_access(None, aid, uri=uri,
2243                                cert_file=cert_file, cert_pwd=pw)
2244                except service_error, e:
2245                    if status != 'failed' and not force:
2246                        raise e
2247
2248            # Clean up the tmpdir no matter what
2249            finally:
2250                self.remove_dirs(tmpdir)
2251
2252            # Remove the terminated experiment
2253            self.state_lock.acquire()
2254            for id in ids:
2255                if self.state.has_key(id): del self.state[id]
2256
2257            if self.state_filename: self.write_state()
2258            self.state_lock.release()
2259
2260            # Delete any synch points associated with this experiment.  All
2261            # synch points begin with the fedid of the experiment.
2262            fedid_keys = set(["fedid:%s" % f for f in ids \
2263                    if isinstance(f, fedid)])
2264            for k in self.synch_store.all_keys():
2265                try:
2266                    if len(k) > 45 and k[0:46] in fedid_keys:
2267                        self.synch_store.del_value(k)
2268                except synch_store.BadDeletionError:
2269                    pass
2270            self.write_store()
2271
2272            # Remove software and other cached stuff from the filesystem.
2273            if repo:
2274                self.remove_dirs("%s/%s" % (self.repodir, repo))
2275               
2276            return { 
2277                    'experiment': exp , 
2278                    'deallocationLog': "".join(dealloc_list),
2279                    }
2280        else:
2281            # Don't forget to release the lock
2282            self.state_lock.release()
2283            raise service_error(service_error.req, "No saved state")
2284
2285
2286    def GetValue(self, req, fid):
2287        """
2288        Get a value from the synchronized store
2289        """
2290        req = req.get('GetValueRequestBody', None)
2291        if not req:
2292            raise service_error(service_error.req,
2293                    "Bad request format (no GetValueRequestBody)")
2294       
2295        name = req['name']
2296        wait = req['wait']
2297        rv = { 'name': name }
2298
2299        if self.auth.check_attribute(fid, name):
2300            self.log.debug("[GetValue] asking for %s " % name)
2301            try:
2302                v = self.synch_store.get_value(name, wait)
2303            except synch_store.RevokedKeyError:
2304                # No more synch on this key
2305                raise service_error(service_error.federant, 
2306                        "Synch key %s revoked" % name)
2307            if v is not None:
2308                rv['value'] = v
2309            self.log.debug("[GetValue] got %s from %s" % (v, name))
2310            return rv
2311        else:
2312            raise service_error(service_error.access, "Access Denied")
2313       
2314
2315    def SetValue(self, req, fid):
2316        """
2317        Set a value in the synchronized store
2318        """
2319        req = req.get('SetValueRequestBody', None)
2320        if not req:
2321            raise service_error(service_error.req,
2322                    "Bad request format (no SetValueRequestBody)")
2323       
2324        name = req['name']
2325        v = req['value']
2326
2327        if self.auth.check_attribute(fid, name):
2328            try:
2329                self.synch_store.set_value(name, v)
2330                self.write_store()
2331                self.log.debug("[SetValue] set %s to %s" % (name, v))
2332            except synch_store.CollisionError:
2333                # Translate into a service_error
2334                raise service_error(service_error.req,
2335                        "Value already set: %s" %name)
2336            except synch_store.RevokedKeyError:
2337                # No more synch on this key
2338                raise service_error(service_error.federant, 
2339                        "Synch key %s revoked" % name)
2340            return { 'name': name, 'value': v }
2341        else:
2342            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.