source: fedd/federation/experiment_control.py @ e83f2f2

axis_examplecompt_changesinfo-ops
Last change on this file since e83f2f2 was e83f2f2, checked in by Ted Faber <faber@…>, 13 years ago

Move proofs around. Lots of changes, including fault handling.

  • Property mode set to 100644
File size: 84.5 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205            self.get_access = self.legacy_get_access
206        elif self.auth_type == 'abac':
207            self.auth = abac_authorizer(load=self.auth_dir)
208        else:
209            raise service_error(service_error.internal, 
210                    "Unknown auth_type: %s" % self.auth_type)
211
212
213        if self.ssh_pubkey_file:
214            try:
215                f = open(self.ssh_pubkey_file, 'r')
216                self.ssh_pubkey = f.read()
217                f.close()
218            except EnvironmentError:
219                raise service_error(service_error.internal,
220                        "Cannot read sshpubkey")
221        else:
222            raise service_error(service_error.internal, 
223                    "No SSH public key file?")
224
225        if not self.ssh_privkey_file:
226            raise service_error(service_error.internal, 
227                    "No SSH public key file?")
228
229
230        if mapdb_file:
231            self.read_mapdb(mapdb_file)
232        else:
233            self.log.warn("[experiment_control] No testbed map, using defaults")
234            self.tbmap = { 
235                    'deter':'https://users.isi.deterlab.net:23235',
236                    'emulab':'https://users.isi.deterlab.net:23236',
237                    'ucb':'https://users.isi.deterlab.net:23237',
238                    }
239
240        if accessdb_file:
241                self.read_accessdb(accessdb_file)
242        else:
243            raise service_error(service_error.internal,
244                    "No accessdb specified in config")
245
246        # Grab saved state.  OK to do this w/o locking because it's read only
247        # and only one thread should be in existence that can see self.state at
248        # this point.
249        if self.state_filename:
250            self.read_state()
251
252        if self.store_filename:
253            self.read_store()
254        else:
255            self.log.warning("No saved synch store")
256            self.synch_store = synch_store
257
258        # Dispatch tables
259        self.soap_services = {\
260                'New': soap_handler('New', self.new_experiment),
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
266                'Terminate': soap_handler('Terminate', 
267                    self.terminate_experiment),
268                'GetValue': soap_handler('GetValue', self.GetValue),
269                'SetValue': soap_handler('SetValue', self.SetValue),
270        }
271
272        self.xmlrpc_services = {\
273                'New': xmlrpc_handler('New', self.new_experiment),
274                'Create': xmlrpc_handler('Create', self.create_experiment),
275                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
276                'Vis': xmlrpc_handler('Vis', self.get_vis),
277                'Info': xmlrpc_handler('Info', self.get_info),
278                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
279                'Terminate': xmlrpc_handler('Terminate',
280                    self.terminate_experiment),
281                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
282                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
283        }
284
285    # Call while holding self.state_lock
286    def write_state(self):
287        """
288        Write a new copy of experiment state after copying the existing state
289        to a backup.
290
291        State format is a simple pickling of the state dictionary.
292        """
293        if os.access(self.state_filename, os.W_OK):
294            copy_file(self.state_filename, \
295                    "%s.bak" % self.state_filename)
296        try:
297            f = open(self.state_filename, 'w')
298            pickle.dump(self.state, f)
299        except EnvironmentError, e:
300            self.log.error("Can't write file %s: %s" % \
301                    (self.state_filename, e))
302        except pickle.PicklingError, e:
303            self.log.error("Pickling problem: %s" % e)
304        except TypeError, e:
305            self.log.error("Pickling problem (TypeError): %s" % e)
306
307    @staticmethod
308    def get_alloc_ids(state):
309        """
310        Pull the fedids of the identifiers of each allocation from the
311        state.  Again, a dict dive that's best isolated.
312
313        Used by read_store and read state
314        """
315
316        return [ f['allocID']['fedid'] 
317                for f in state.get('federant',[]) \
318                    if f.has_key('allocID') and \
319                        f['allocID'].has_key('fedid')]
320
321    # Call while holding self.state_lock
322    def read_state(self):
323        """
324        Read a new copy of experiment state.  Old state is overwritten.
325
326        State format is a simple pickling of the state dictionary.
327        """
328       
329        def get_experiment_id(state):
330            """
331            Pull the fedid experimentID out of the saved state.  This is kind
332            of a gross walk through the dict.
333            """
334
335            if state.has_key('experimentID'):
336                for e in state['experimentID']:
337                    if e.has_key('fedid'):
338                        return e['fedid']
339                else:
340                    return None
341            else:
342                return None
343
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except EnvironmentError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355       
356        for s in self.state.values():
357            try:
358
359                eid = get_experiment_id(s)
360                if eid : 
361                    if self.auth_type == 'legacy':
362                        # XXX: legacy
363                        # Give the owner rights to the experiment
364                        self.auth.set_attribute(s['owner'], eid)
365                        # And holders of the eid as well
366                        self.auth.set_attribute(eid, eid)
367                        # allow overrides to control experiments as well
368                        for o in self.overrides:
369                            self.auth.set_attribute(o, eid)
370                        # Set permissions to allow reading of the software
371                        # repo, if any, as well.
372                        for a in self.get_alloc_ids(s):
373                            self.auth.set_attribute(a, 'repo/%s' % eid)
374                else:
375                    raise KeyError("No experiment id")
376            except KeyError, e:
377                self.log.warning("[read_state]: State ownership or identity " +\
378                        "misformatted in %s: %s" % (self.state_filename, e))
379
380
381    def read_accessdb(self, accessdb_file):
382        """
383        Read the mapping from fedids that can create experiments to their name
384        in the 3-level access namespace.  All will be asserted from this
385        testbed and can include the local username and porject that will be
386        asserted on their behalf by this fedd.  Each fedid is also added to the
387        authorization system with the "create" attribute.
388        """
389        self.accessdb = {}
390        # These are the regexps for parsing the db
391        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
392        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
394        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
395                "\s*->\s*(" + name_expr + ")\s*$")
396        lineno = 0
397
398        # Parse the mappings and store in self.authdb, a dict of
399        # fedid -> (proj, user)
400        try:
401            f = open(accessdb_file, "r")
402            for line in f:
403                lineno += 1
404                line = line.strip()
405                if len(line) == 0 or line.startswith('#'):
406                    continue
407                m = project_line.match(line)
408                if m:
409                    fid = fedid(hexstr=m.group(1))
410                    project, user = m.group(2,3)
411                    if not self.accessdb.has_key(fid):
412                        self.accessdb[fid] = []
413                    self.accessdb[fid].append((project, user))
414                    continue
415
416                m = user_line.match(line)
417                if m:
418                    fid = fedid(hexstr=m.group(1))
419                    project = None
420                    user = m.group(2)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425                self.log.warn("[experiment_control] Error parsing access " +\
426                        "db %s at line %d" %  (accessdb_file, lineno))
427        except EnvironmentError:
428            raise service_error(service_error.internal,
429                    ("Error opening/reading %s as experiment " +\
430                            "control accessdb") %  accessdb_file)
431        f.close()
432
433        # Initialize the authorization attributes
434        # XXX: legacy
435        if self.auth_type == 'legacy':
436            for fid in self.accessdb.keys():
437                self.auth.set_attribute(fid, 'create')
438                self.auth.set_attribute(fid, 'new')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except EnvironmentError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def read_store(self):
467        try:
468            self.synch_store = synch_store()
469            self.synch_store.load(self.store_filename)
470            self.log.debug("[read_store]: Read store from %s" % \
471                    self.store_filename)
472        except EnvironmentError, e:
473            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
474                    % (self.state_filename, e))
475            self.synch_store = synch_store()
476
477        # Set the initial permissions on data in the store.  XXX: This ad hoc
478        # authorization attribute initialization is getting out of hand.
479        # XXX: legacy
480        if self.auth_type == 'legacy':
481            for k in self.synch_store.all_keys():
482                try:
483                    if k.startswith('fedid:'):
484                        fid = fedid(hexstr=k[6:46])
485                        if self.state.has_key(fid):
486                            for a in self.get_alloc_ids(self.state[fid]):
487                                self.auth.set_attribute(a, k)
488                except ValueError, e:
489                    self.log.warn("Cannot deduce permissions for %s" % k)
490
491
492    def write_store(self):
493        """
494        Write a new copy of synch_store after writing current state
495        to a backup.  We use the internal synch_store pickle method to avoid
496        incinsistent data.
497
498        State format is a simple pickling of the store.
499        """
500        if os.access(self.store_filename, os.W_OK):
501            copy_file(self.store_filename, \
502                    "%s.bak" % self.store_filename)
503        try:
504            self.synch_store.save(self.store_filename)
505        except EnvironmentError, e:
506            self.log.error("Can't write file %s: %s" % \
507                    (self.store_filename, e))
508        except TypeError, e:
509            self.log.error("Pickling problem (TypeError): %s" % e)
510
511
512    def remove_dirs(self, dir):
513        """
514        Remove the directory tree and all files rooted at dir.  Log any errors,
515        but continue.
516        """
517        self.log.debug("[removedirs]: removing %s" % dir)
518        try:
519            for path, dirs, files in os.walk(dir, topdown=False):
520                for f in files:
521                    os.remove(os.path.join(path, f))
522                for d in dirs:
523                    os.rmdir(os.path.join(path, d))
524            os.rmdir(dir)
525        except EnvironmentError, e:
526            self.log.error("Error deleting directory tree in %s" % e);
527
528    @staticmethod
529    def make_temp_certfile(expcert, tmpdir):
530        """
531        make a protected copy of the access certificate so the experiment
532        controller can act as the experiment principal.  mkstemp is the most
533        secure way to do that. The directory should be created by
534        mkdtemp.  Return the filename.
535        """
536        if expcert and tmpdir:
537            try:
538                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
539                f = os.fdopen(certf, 'w')
540                print >> f, expcert
541                f.close()
542            except EnvironmentError, e:
543                raise service_error(service_error.internal, 
544                        "Cannot create temp cert file?")
545            return certfn
546        else:
547            return None
548
549       
550    def generate_ssh_keys(self, dest, type="rsa" ):
551        """
552        Generate a set of keys for the gateways to use to talk.
553
554        Keys are of type type and are stored in the required dest file.
555        """
556        valid_types = ("rsa", "dsa")
557        t = type.lower();
558        if t not in valid_types: raise ValueError
559        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
560
561        try:
562            trace = open("/dev/null", "w")
563        except EnvironmentError:
564            raise service_error(service_error.internal,
565                    "Cannot open /dev/null??");
566
567        # May raise CalledProcessError
568        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
569        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
570        if rv != 0:
571            raise service_error(service_error.internal, 
572                    "Cannot generate nonce ssh keys.  %s return code %d" \
573                            % (self.ssh_keygen, rv))
574
575    def gentopo(self, str):
576        """
577        Generate the topology data structure from the splitter's XML
578        representation of it.
579
580        The topology XML looks like:
581            <experiment>
582                <nodes>
583                    <node><vname></vname><ips>ip1:ip2</ips></node>
584                </nodes>
585                <lans>
586                    <lan>
587                        <vname></vname><vnode></vnode><ip></ip>
588                        <bandwidth></bandwidth><member>node:port</member>
589                    </lan>
590                </lans>
591        """
592        class topo_parse:
593            """
594            Parse the topology XML and create the dats structure.
595            """
596            def __init__(self):
597                # Typing of the subelements for data conversion
598                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
599                self.int_subelements = ( 'bandwidth',)
600                self.float_subelements = ( 'delay',)
601                # The final data structure
602                self.nodes = [ ]
603                self.lans =  [ ]
604                self.topo = { \
605                        'node': self.nodes,\
606                        'lan' : self.lans,\
607                    }
608                self.element = { }  # Current element being created
609                self.chars = ""     # Last text seen
610
611            def end_element(self, name):
612                # After each sub element the contents is added to the current
613                # element or to the appropriate list.
614                if name == 'node':
615                    self.nodes.append(self.element)
616                    self.element = { }
617                elif name == 'lan':
618                    self.lans.append(self.element)
619                    self.element = { }
620                elif name in self.str_subelements:
621                    self.element[name] = self.chars
622                    self.chars = ""
623                elif name in self.int_subelements:
624                    self.element[name] = int(self.chars)
625                    self.chars = ""
626                elif name in self.float_subelements:
627                    self.element[name] = float(self.chars)
628                    self.chars = ""
629
630            def found_chars(self, data):
631                self.chars += data.rstrip()
632
633
634        tp = topo_parse();
635        parser = xml.parsers.expat.ParserCreate()
636        parser.EndElementHandler = tp.end_element
637        parser.CharacterDataHandler = tp.found_chars
638
639        parser.Parse(str)
640
641        return tp.topo
642       
643
644    def genviz(self, topo):
645        """
646        Generate the visualization the virtual topology
647        """
648
649        neato = "/usr/local/bin/neato"
650        # These are used to parse neato output and to create the visualization
651        # file.
652        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
653        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
654                "%s</type></node>"
655
656        try:
657            # Node names
658            nodes = [ n['vname'] for n in topo['node'] ]
659            topo_lans = topo['lan']
660        except KeyError, e:
661            raise service_error(service_error.internal, "Bad topology: %s" %e)
662
663        lans = { }
664        links = { }
665
666        # Walk through the virtual topology, organizing the connections into
667        # 2-node connections (links) and more-than-2-node connections (lans).
668        # When a lan is created, it's added to the list of nodes (there's a
669        # node in the visualization for the lan).
670        for l in topo_lans:
671            if links.has_key(l['vname']):
672                if len(links[l['vname']]) < 2:
673                    links[l['vname']].append(l['vnode'])
674                else:
675                    nodes.append(l['vname'])
676                    lans[l['vname']] = links[l['vname']]
677                    del links[l['vname']]
678                    lans[l['vname']].append(l['vnode'])
679            elif lans.has_key(l['vname']):
680                lans[l['vname']].append(l['vnode'])
681            else:
682                links[l['vname']] = [ l['vnode'] ]
683
684
685        # Open up a temporary file for dot to turn into a visualization
686        try:
687            df, dotname = tempfile.mkstemp()
688            dotfile = os.fdopen(df, 'w')
689        except EnvironmentError:
690            raise service_error(service_error.internal,
691                    "Failed to open file in genviz")
692
693        try:
694            dnull = open('/dev/null', 'w')
695        except EnvironmentError:
696            service_error(service_error.internal,
697                    "Failed to open /dev/null in genviz")
698
699        # Generate a dot/neato input file from the links, nodes and lans
700        try:
701            print >>dotfile, "graph G {"
702            for n in nodes:
703                print >>dotfile, '\t"%s"' % n
704            for l in links.keys():
705                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
706            for l in lans.keys():
707                for n in lans[l]:
708                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
709            print >>dotfile, "}"
710            dotfile.close()
711        except TypeError:
712            raise service_error(service_error.internal,
713                    "Single endpoint link in vtopo")
714        except EnvironmentError:
715            raise service_error(service_error.internal, "Cannot write dot file")
716
717        # Use dot to create a visualization
718        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
719                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
720                close_fds=True)
721        dnull.close()
722
723        # Translate dot to vis format
724        vis_nodes = [ ]
725        vis = { 'node': vis_nodes }
726        for line in dot.stdout:
727            m = vis_re.match(line)
728            if m:
729                vn = m.group(1)
730                vis_node = {'name': vn, \
731                        'x': float(m.group(2)),\
732                        'y' : float(m.group(3)),\
733                    }
734                if vn in links.keys() or vn in lans.keys():
735                    vis_node['type'] = 'lan'
736                else:
737                    vis_node['type'] = 'node'
738                vis_nodes.append(vis_node)
739        rv = dot.wait()
740
741        os.remove(dotname)
742        if rv == 0 : return vis
743        else: return None
744
745
746    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
747            cert_pwd=None):
748        """
749        Release access to testbed through fedd
750        """
751
752        if not uri and tbmap:
753            uri = tbmap.get(tb, None)
754        if not uri:
755            raise service_error(service_error.server_config, 
756                    "Unknown testbed: %s" % tb)
757
758        if self.local_access.has_key(uri):
759            resp = self.local_access[uri].ReleaseAccess(\
760                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
761                    fedid(file=cert_file))
762            resp = { 'ReleaseAccessResponseBody': resp } 
763        else:
764            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
765                    cert_file, cert_pwd, self.trusted_certs)
766
767        # better error coding
768
769    def remote_ns2topdl(self, uri, desc):
770
771        req = {
772                'description' : { 'ns2description': desc },
773            }
774
775        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
776                self.trusted_certs)
777
778        if r.has_key('Ns2TopdlResponseBody'):
779            r = r['Ns2TopdlResponseBody']
780            ed = r.get('experimentdescription', None)
781            if ed.has_key('topdldescription'):
782                return topdl.Topology(**ed['topdldescription'])
783            else:
784                raise service_error(service_error.protocol, 
785                        "Bad splitter response (no output)")
786        else:
787            raise service_error(service_error.protocol, "Bad splitter response")
788
789    class start_segment:
790        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
791                cert_pwd=None, trusted_certs=None, caller=None,
792                log_collector=None):
793            self.log = log
794            self.debug = debug
795            self.cert_file = cert_file
796            self.cert_pwd = cert_pwd
797            self.trusted_certs = None
798            self.caller = caller
799            self.testbed = testbed
800            self.log_collector = log_collector
801            self.response = None
802            self.node = { }
803            self.proof = None
804
805        def make_map(self, resp):
806            for e in resp.get('embedding', []):
807                if 'toponame' in e and 'physname' in e:
808                    self.node[e['toponame']] = e['physname'][0]
809
810        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
811            req = {
812                    'allocID': { 'fedid' : aid }, 
813                    'segmentdescription': { 
814                        'topdldescription': topo.to_dict(),
815                    },
816                }
817
818            if connInfo:
819                req['connection'] = connInfo
820
821            import_svcs = [ s for m in masters.values() \
822                    for s in m if self.testbed in s.importers]
823
824            if import_svcs or self.testbed in masters:
825                req['service'] = []
826
827            for s in import_svcs:
828                for r in s.reqs:
829                    sr = copy.deepcopy(r)
830                    sr['visibility'] = 'import';
831                    req['service'].append(sr)
832
833            for s in masters.get(self.testbed, []):
834                for r in s.reqs:
835                    sr = copy.deepcopy(r)
836                    sr['visibility'] = 'export';
837                    req['service'].append(sr)
838
839            if attrs:
840                req['fedAttr'] = attrs
841
842            try:
843                self.log.debug("Calling StartSegment at %s " % uri)
844                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
845                        self.trusted_certs)
846                if r.has_key('StartSegmentResponseBody'):
847                    lval = r['StartSegmentResponseBody'].get('allocationLog',
848                            None)
849                    if lval and self.log_collector:
850                        for line in  lval.splitlines(True):
851                            self.log_collector.write(line)
852                    self.make_map(r['StartSegmentResponseBody'])
853                    if 'proof' in r: self.proof = r['proof']
854                    self.response = r
855                else:
856                    raise service_error(service_error.internal, 
857                            "Bad response!?: %s" %r)
858                return True
859            except service_error, e:
860                self.log.error("Start segment failed on %s: %s" % \
861                        (self.testbed, e))
862                return False
863
864
865
866    class terminate_segment:
867        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
868                cert_pwd=None, trusted_certs=None, caller=None):
869            self.log = log
870            self.debug = debug
871            self.cert_file = cert_file
872            self.cert_pwd = cert_pwd
873            self.trusted_certs = None
874            self.caller = caller
875            self.testbed = testbed
876
877        def __call__(self, uri, aid ):
878            req = {
879                    'allocID': aid , 
880                }
881            try:
882                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
883                        self.trusted_certs)
884                return True
885            except service_error, e:
886                self.log.error("Terminate segment failed on %s: %s" % \
887                        (self.testbed, e))
888                return False
889
890    def allocate_resources(self, allocated, masters, eid, expid, 
891            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
892            attrs=None, connInfo={}, tbmap=None, expcert=None):
893
894        started = { }           # Testbeds where a sub-experiment started
895                                # successfully
896
897        # XXX
898        fail_soft = False
899
900        if tbmap is None: tbmap = { }
901
902        log = alloc_log or self.log
903
904        tp = thread_pool(self.nthreads)
905        threads = [ ]
906        starters = [ ]
907
908        if expcert:
909            cert = expcert
910            pw = None
911        else:
912            cert = self.cert_file
913            pw = self.cert_pwd
914
915        for tb in allocated.keys():
916            # Create and start a thread to start the segment, and save it
917            # to get the return value later
918            tb_attrs = copy.copy(attrs)
919            tp.wait_for_slot()
920            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
921            base, suffix = split_testbed(tb)
922            if suffix:
923                tb_attrs.append({'attribute': 'experiment_name', 
924                    'value': "%s-%s" % (eid, suffix)})
925            else:
926                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
927            if not uri:
928                raise service_error(service_error.internal, 
929                        "Unknown testbed %s !?" % tb)
930
931            if tbparams[tb].has_key('allocID') and \
932                    tbparams[tb]['allocID'].has_key('fedid'):
933                aid = tbparams[tb]['allocID']['fedid']
934            else:
935                raise service_error(service_error.internal, 
936                        "No alloc id for testbed %s !?" % tb)
937
938            s = self.start_segment(log=log, debug=self.debug,
939                    testbed=tb, cert_file=cert,
940                    cert_pwd=pw, trusted_certs=self.trusted_certs,
941                    caller=self.call_StartSegment,
942                    log_collector=log_collector)
943            starters.append(s)
944            t  = pooled_thread(\
945                    target=s, name=tb,
946                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
947                    pdata=tp, trace_file=self.trace_file)
948            threads.append(t)
949            t.start()
950
951        # Wait until all finish (keep pinging the log, though)
952        mins = 0
953        revoked = False
954        while not tp.wait_for_all_done(60.0):
955            mins += 1
956            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
957                    % mins)
958            if not revoked and \
959                    len([ t.getName() for t in threads if t.rv == False]) > 0:
960                # a testbed has failed.  Revoke this experiment's
961                # synchronizarion values so that sub experiments will not
962                # deadlock waiting for synchronization that will never happen
963                self.log.info("A subexperiment has failed to swap in, " + \
964                        "revoking synch keys")
965                var_key = "fedid:%s" % expid
966                for k in self.synch_store.all_keys():
967                    if len(k) > 45 and k[0:46] == var_key:
968                        self.synch_store.revoke_key(k)
969                revoked = True
970
971        failed = [ t.getName() for t in threads if not t.rv ]
972        succeeded = [tb for tb in allocated.keys() if tb not in failed]
973
974        # If one failed clean up, unless fail_soft is set
975        if failed:
976            if not fail_soft:
977                tp.clear()
978                for tb in succeeded:
979                    # Create and start a thread to stop the segment
980                    tp.wait_for_slot()
981                    uri = tbparams[tb]['uri']
982                    t  = pooled_thread(\
983                            target=self.terminate_segment(log=log,
984                                testbed=tb,
985                                cert_file=cert, 
986                                cert_pwd=pw,
987                                trusted_certs=self.trusted_certs,
988                                caller=self.call_TerminateSegment),
989                            args=(uri, tbparams[tb]['federant']['allocID']),
990                            name=tb,
991                            pdata=tp, trace_file=self.trace_file)
992                    t.start()
993                # Wait until all finish (if any are being stopped)
994                if succeeded:
995                    tp.wait_for_all_done()
996
997                # release the allocations
998                for tb in tbparams.keys():
999                    try:
1000                        self.release_access(tb, tbparams[tb]['allocID'], 
1001                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1002                                cert_file=cert, cert_pwd=pw)
1003                    except service_error, e:
1004                        self.log.warn("Error releasing access: %s" % e.desc)
1005                # Remove the placeholder
1006                self.state_lock.acquire()
1007                self.state[eid]['experimentStatus'] = 'failed'
1008                if self.state_filename: self.write_state()
1009                self.state_lock.release()
1010                # Remove the repo dir
1011                self.remove_dirs("%s/%s" %(self.repodir, expid))
1012                # Walk up tmpdir, deleting as we go
1013                if self.cleanup:
1014                    self.remove_dirs(tmpdir)
1015                else:
1016                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1017
1018
1019                log.error("Swap in failed on %s" % ",".join(failed))
1020                return
1021        else:
1022            # Walk through the successes and gather the virtual to physical
1023            # mapping.
1024            embedding = [ ]
1025            proofs = { }
1026            for s in starters:
1027                for k, v in s.node.items():
1028                    embedding.append({
1029                        'toponame': k, 
1030                        'physname': [ v],
1031                        'testbed': s.testbed
1032                        })
1033                if s.proof:
1034                    proofs[s.testbed] = s.proof
1035            log.info("[start_segment]: Experiment %s active" % eid)
1036
1037
1038        # Walk up tmpdir, deleting as we go
1039        if self.cleanup:
1040            self.remove_dirs(tmpdir)
1041        else:
1042            log.debug("[start_experiment]: not removing %s" % tmpdir)
1043
1044        # Insert the experiment into our state and update the disk copy.
1045        self.state_lock.acquire()
1046        self.state[expid]['experimentStatus'] = 'active'
1047        self.state[eid] = self.state[expid]
1048        self.state[eid]['experimentdescription']['topdldescription'] = \
1049                top.to_dict()
1050        self.state[eid]['embedding'] = embedding
1051        # Append startup proofs
1052        for f in self.state[eid]['federant']:
1053            if 'name' in f and 'localname' in f['name']:
1054                if f['name']['localname'] in proofs:
1055                    f['proof'].append(proofs[f['name']['localname']])
1056
1057        if self.state_filename: self.write_state()
1058        self.state_lock.release()
1059        return
1060
1061
1062    def add_kit(self, e, kit):
1063        """
1064        Add a Software object created from the list of (install, location)
1065        tuples passed as kit  to the software attribute of an object e.  We
1066        do this enough to break out the code, but it's kind of a hack to
1067        avoid changing the old tuple rep.
1068        """
1069
1070        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1071
1072        if isinstance(e.software, list): e.software.extend(s)
1073        else: e.software = s
1074
1075    def append_experiment_authorization(self, expid, attrs, 
1076            need_state_lock=True):
1077        """
1078        Append the authorization information to system state
1079        """
1080
1081        for p, a in attrs:
1082            self.auth.set_attribute(p, a)
1083        self.auth.save()
1084
1085        if need_state_lock: self.state_lock.acquire()
1086        self.state[expid]['auth'].update(attrs)
1087        if self.state_filename: self.write_state()
1088        if need_state_lock: self.state_lock.release()
1089
1090    def clear_experiment_authorization(self, expid, need_state_lock=True):
1091        """
1092        Attrs is a set of attribute principal pairs that need to be removed
1093        from the authenticator.  Remove them and save the authenticator.
1094        """
1095
1096        if need_state_lock: self.state_lock.acquire()
1097        if expid in self.state and 'auth' in self.state[expid]:
1098            for p, a in self.state[expid]['auth']:
1099                self.auth.unset_attribute(p, a)
1100            self.state[expid]['auth'] = set()
1101        if self.state_filename: self.write_state()
1102        if need_state_lock: self.state_lock.release()
1103        self.auth.save()
1104
1105
1106    def create_experiment_state(self, fid, req, expid, expcert,
1107            state='starting'):
1108        """
1109        Create the initial entry in the experiment's state.  The expid and
1110        expcert are the experiment's fedid and certifacte that represents that
1111        ID, which are installed in the experiment state.  If the request
1112        includes a suggested local name that is used if possible.  If the local
1113        name is already taken by an experiment owned by this user that has
1114        failed, it is overwritten.  Otherwise new letters are added until a
1115        valid localname is found.  The generated local name is returned.
1116        """
1117
1118        if req.has_key('experimentID') and \
1119                req['experimentID'].has_key('localname'):
1120            overwrite = False
1121            eid = req['experimentID']['localname']
1122            # If there's an old failed experiment here with the same local name
1123            # and accessible by this user, we'll overwrite it, otherwise we'll
1124            # fall through and do the collision avoidance.
1125            old_expid = self.get_experiment_fedid(eid)
1126            if old_expid and self.check_experiment_access(fid, old_expid):
1127                self.state_lock.acquire()
1128                status = self.state[eid].get('experimentStatus', None)
1129                if status and status == 'failed':
1130                    # remove the old access attributes
1131                    self.clear_experiment_authorization(eid,
1132                            need_state_lock=False)
1133                    overwrite = True
1134                    del self.state[eid]
1135                    del self.state[old_expid]
1136                self.state_lock.release()
1137            self.state_lock.acquire()
1138            while (self.state.has_key(eid) and not overwrite):
1139                eid += random.choice(string.ascii_letters)
1140            # Initial state
1141            self.state[eid] = {
1142                    'experimentID' : \
1143                            [ { 'localname' : eid }, {'fedid': expid } ],
1144                    'experimentStatus': state,
1145                    'experimentAccess': { 'X509' : expcert },
1146                    'owner': fid,
1147                    'log' : [],
1148                    'auth': set(),
1149                }
1150            self.state[expid] = self.state[eid]
1151            if self.state_filename: self.write_state()
1152            self.state_lock.release()
1153        else:
1154            eid = self.exp_stem
1155            for i in range(0,5):
1156                eid += random.choice(string.ascii_letters)
1157            self.state_lock.acquire()
1158            while (self.state.has_key(eid)):
1159                eid = self.exp_stem
1160                for i in range(0,5):
1161                    eid += random.choice(string.ascii_letters)
1162            # Initial state
1163            self.state[eid] = {
1164                    'experimentID' : \
1165                            [ { 'localname' : eid }, {'fedid': expid } ],
1166                    'experimentStatus': state,
1167                    'experimentAccess': { 'X509' : expcert },
1168                    'owner': fid,
1169                    'log' : [],
1170                    'auth': set(),
1171                }
1172            self.state[expid] = self.state[eid]
1173            if self.state_filename: self.write_state()
1174            self.state_lock.release()
1175
1176        # Let users touch the state.  Authorize this fid and the expid itself
1177        # to touch the experiment, as well as allowing th eoverrides.
1178        self.append_experiment_authorization(eid, 
1179                set([(fid, expid), (expid,expid)] + \
1180                        [ (o, expid) for o in self.overrides]))
1181
1182        return eid
1183
1184
1185    def allocate_ips_to_topo(self, top):
1186        """
1187        Add an ip4_address attribute to all the hosts in the topology, based on
1188        the shared substrates on which they sit.  An /etc/hosts file is also
1189        created and returned as a list of hostfiles entries.  We also return
1190        the allocator, because we may need to allocate IPs to portals
1191        (specifically DRAGON portals).
1192        """
1193        subs = sorted(top.substrates, 
1194                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1195                reverse=True)
1196        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1197        ifs = { }
1198        hosts = [ ]
1199
1200        for idx, s in enumerate(subs):
1201            net_size = len(s.interfaces)+2
1202
1203            a = ips.allocate(net_size)
1204            if a :
1205                base, num = a
1206                if num < net_size: 
1207                    raise service_error(service_error.internal,
1208                            "Allocator returned wrong number of IPs??")
1209            else:
1210                raise service_error(service_error.req, 
1211                        "Cannot allocate IP addresses")
1212            mask = ips.min_alloc
1213            while mask < net_size:
1214                mask *= 2
1215
1216            netmask = ((2**32-1) ^ (mask-1))
1217
1218            base += 1
1219            for i in s.interfaces:
1220                i.attribute.append(
1221                        topdl.Attribute('ip4_address', 
1222                            "%s" % ip_addr(base)))
1223                i.attribute.append(
1224                        topdl.Attribute('ip4_netmask', 
1225                            "%s" % ip_addr(int(netmask))))
1226
1227                hname = i.element.name
1228                if ifs.has_key(hname):
1229                    hosts.append("%s\t%s-%s %s-%d" % \
1230                            (ip_addr(base), hname, s.name, hname,
1231                                ifs[hname]))
1232                else:
1233                    ifs[hname] = 0
1234                    hosts.append("%s\t%s-%s %s-%d %s" % \
1235                            (ip_addr(base), hname, s.name, hname,
1236                                ifs[hname], hname))
1237
1238                ifs[hname] += 1
1239                base += 1
1240        return hosts, ips
1241
1242    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1243            tbparam, masters, tbmap, expid=None, expcert=None):
1244        for tb in testbeds:
1245            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1246                    expcert)
1247            allocated[tb] = 1
1248
1249    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1250            expcert=None):
1251        """
1252        Get access to testbed through fedd and set the parameters for that tb
1253        """
1254        def get_export_project(svcs):
1255            """
1256            Look through for the list of federated_service for this testbed
1257            objects for a project_export service, and extract the project
1258            parameter.
1259            """
1260
1261            pe = [s for s in svcs if s.name=='project_export']
1262            if len(pe) == 1:
1263                return pe[0].params.get('project', None)
1264            elif len(pe) == 0:
1265                return None
1266            else:
1267                raise service_error(service_error.req,
1268                        "More than one project export is not supported")
1269
1270        def add_services(svcs, type, slist):
1271            """
1272            Add the given services to slist.  type is import or export.
1273            """
1274            for i, s in enumerate(svcs):
1275                idx = '%s%d' % (type, i)
1276                sr = {'id': idx, 'name': s.name, 'visibility': type }
1277                if s.params:
1278                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1279                            for k, v in s.params.items()]
1280                slist.append(sr)
1281
1282        uri = tbmap.get(testbed_base(tb), None)
1283        if not uri:
1284            raise service_error(service_error.server_config, 
1285                    "Unknown testbed: %s" % tb)
1286
1287        export_svcs = masters.get(tb,[])
1288        import_svcs = [ s for m in masters.values() \
1289                for s in m \
1290                    if tb in s.importers ]
1291
1292        export_project = get_export_project(export_svcs)
1293        # Compose the credential list so that IDs come before attributes
1294        creds = set()
1295        keys = set()
1296        certs = self.auth.get_creds_for_principal(fid)
1297        if expid:
1298            certs.update(self.auth.get_creds_for_principal(expid))
1299        for c in certs:
1300            keys.add(c.issuer_cert())
1301            creds.add(c.attribute_cert())
1302        creds = list(keys) + list(creds)
1303
1304        if expcert: cert, pw = expcert, None
1305        else: cert, pw = self.cert_file, self.cert_pw
1306
1307        # Request credentials
1308        req = {
1309                'abac_credential': creds,
1310            }
1311        # Make the service request from the services we're importing and
1312        # exporting.  Keep track of the export request ids so we can
1313        # collect the resulting info from the access response.
1314        e_keys = { }
1315        if import_svcs or export_svcs:
1316            slist = []
1317            add_services(import_svcs, 'import', slist)
1318            add_services(export_svcs, 'export', slist)
1319            req['service'] = slist
1320
1321        if self.local_access.has_key(uri):
1322            # Local access call
1323            req = { 'RequestAccessRequestBody' : req }
1324            r = self.local_access[uri].RequestAccess(req, 
1325                    fedid(file=self.cert_file))
1326            r = { 'RequestAccessResponseBody' : r }
1327        else:
1328            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1329
1330        if r.has_key('RequestAccessResponseBody'):
1331            # Through to here we have a valid response, not a fault.
1332            # Access denied is a fault, so something better or worse than
1333            # access denied has happened.
1334            r = r['RequestAccessResponseBody']
1335            self.log.debug("[get_access] Access granted")
1336        else:
1337            raise service_error(service_error.protocol,
1338                        "Bad proxy response")
1339        if 'proof' not in r:
1340            raise service_error(service_error.protocol,
1341                        "Bad access response (no access proof)")
1342       
1343        tbparam[tb] = { 
1344                "allocID" : r['allocID'],
1345                "uri": uri,
1346                "proof": [ r['proof'] ],
1347                }
1348
1349        # Collect the responses corresponding to the services this testbed
1350        # exports.  These will be the service requests that we will include in
1351        # the start segment requests (with appropriate visibility values) to
1352        # import and export the segments.
1353        for s in r.get('service', []):
1354            id = s.get('id', None)
1355            if id and id in e_keys:
1356                e_keys[id].reqs.append(s)
1357
1358        # Add attributes to parameter space.  We don't allow attributes to
1359        # overlay any parameters already installed.
1360        for a in r.get('fedAttr', []):
1361            try:
1362                if a['attribute'] and \
1363                        isinstance(a['attribute'], basestring)\
1364                        and not tbparam[tb].has_key(a['attribute'].lower()):
1365                    tbparam[tb][a['attribute'].lower()] = a['value']
1366            except KeyError:
1367                self.log.error("Bad attribute in response: %s" % a)
1368
1369
1370    def split_topology(self, top, topo, testbeds):
1371        """
1372        Create the sub-topologies that are needed for experiment instantiation.
1373        """
1374        for tb in testbeds:
1375            topo[tb] = top.clone()
1376            # copy in for loop allows deletions from the original
1377            for e in [ e for e in topo[tb].elements]:
1378                etb = e.get_attribute('testbed')
1379                # NB: elements without a testbed attribute won't appear in any
1380                # sub topologies. 
1381                if not etb or etb != tb:
1382                    for i in e.interface:
1383                        for s in i.subs:
1384                            try:
1385                                s.interfaces.remove(i)
1386                            except ValueError:
1387                                raise service_error(service_error.internal,
1388                                        "Can't remove interface??")
1389                    topo[tb].elements.remove(e)
1390            topo[tb].make_indices()
1391
1392    def confirm_software(self, top):
1393        """
1394        Make sure that the software to be loaded in the topo is all available
1395        before we begin making access requests, etc.  This is a subset of
1396        wrangle_software.
1397        """
1398        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1399        pkgs.update([x.location for e in top.elements for x in e.software])
1400
1401        for pkg in pkgs:
1402            loc = pkg
1403
1404            scheme, host, path = urlparse(loc)[0:3]
1405            dest = os.path.basename(path)
1406            if not scheme:
1407                if not loc.startswith('/'):
1408                    loc = "/%s" % loc
1409                loc = "file://%s" %loc
1410            # NB: if scheme was found, loc == pkg
1411            try:
1412                u = urlopen(loc)
1413                u.close()
1414            except Exception, e:
1415                raise service_error(service_error.req, 
1416                        "Cannot open %s: %s" % (loc, e))
1417        return True
1418
1419    def wrangle_software(self, expid, top, topo, tbparams):
1420        """
1421        Copy software out to the repository directory, allocate permissions and
1422        rewrite the segment topologies to look for the software in local
1423        places.
1424        """
1425
1426        # Copy the rpms and tarfiles to a distribution directory from
1427        # which the federants can retrieve them
1428        linkpath = "%s/software" %  expid
1429        softdir ="%s/%s" % ( self.repodir, linkpath)
1430        softmap = { }
1431
1432        # self.fedkit and self.gateway kit are lists of tuples of
1433        # (install_location, download_location) this extracts the download
1434        # locations.
1435        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1436        pkgs.update([x.location for e in top.elements for x in e.software])
1437        try:
1438            os.makedirs(softdir)
1439        except EnvironmentError, e:
1440            raise service_error(
1441                    "Cannot create software directory: %s" % e)
1442        # The actual copying.  Everything's converted into a url for copying.
1443        auth_attrs = set()
1444        for pkg in pkgs:
1445            loc = pkg
1446
1447            scheme, host, path = urlparse(loc)[0:3]
1448            dest = os.path.basename(path)
1449            if not scheme:
1450                if not loc.startswith('/'):
1451                    loc = "/%s" % loc
1452                loc = "file://%s" %loc
1453            # NB: if scheme was found, loc == pkg
1454            try:
1455                u = urlopen(loc)
1456            except Exception, e:
1457                raise service_error(service_error.req, 
1458                        "Cannot open %s: %s" % (loc, e))
1459            try:
1460                f = open("%s/%s" % (softdir, dest) , "w")
1461                self.log.debug("Writing %s/%s" % (softdir,dest) )
1462                data = u.read(4096)
1463                while data:
1464                    f.write(data)
1465                    data = u.read(4096)
1466                f.close()
1467                u.close()
1468            except Exception, e:
1469                raise service_error(service_error.internal,
1470                        "Could not copy %s: %s" % (loc, e))
1471            path = re.sub("/tmp", "", linkpath)
1472            # XXX
1473            softmap[pkg] = \
1474                    "%s/%s/%s" %\
1475                    ( self.repo_url, path, dest)
1476
1477            # Allow the individual segments to access the software by assigning
1478            # an attribute to each testbed allocation that encodes the data to
1479            # be released.  This expression collects the data for each run of
1480            # the loop.
1481            auth_attrs.update([
1482                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1483                        for tb in tbparams.keys()])
1484
1485        self.append_experiment_authorization(expid, auth_attrs)
1486
1487        # Convert the software locations in the segments into the local
1488        # copies on this host
1489        for soft in [ s for tb in topo.values() \
1490                for e in tb.elements \
1491                    if getattr(e, 'software', False) \
1492                        for s in e.software ]:
1493            if softmap.has_key(soft.location):
1494                soft.location = softmap[soft.location]
1495
1496
1497    def new_experiment(self, req, fid):
1498        """
1499        The external interface to empty initial experiment creation called from
1500        the dispatcher.
1501
1502        Creates a working directory, splits the incoming description using the
1503        splitter script and parses out the avrious subsections using the
1504        lcasses above.  Once each sub-experiment is created, use pooled threads
1505        to instantiate them and start it all up.
1506        """
1507        req = req.get('NewRequestBody', None)
1508        if not req:
1509            raise service_error(service_error.req,
1510                    "Bad request format (no NewRequestBody)")
1511
1512        if self.auth.import_credentials(data_list=req.get('credential', [])):
1513            self.auth.save()
1514
1515        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1516                with_proof=True)
1517
1518        if not access_ok:
1519            raise service_error(service_error.access, "New access denied",
1520                    proof=[proof])
1521
1522        try:
1523            tmpdir = tempfile.mkdtemp(prefix="split-")
1524        except EnvironmentError:
1525            raise service_error(service_error.internal, "Cannot create tmp dir")
1526
1527        try:
1528            access_user = self.accessdb[fid]
1529        except KeyError:
1530            raise service_error(service_error.internal,
1531                    "Access map and authorizer out of sync in " + \
1532                            "new_experiment for fedid %s"  % fid)
1533
1534        # Generate an ID for the experiment (slice) and a certificate that the
1535        # allocator can use to prove they own it.  We'll ship it back through
1536        # the encrypted connection.  If the requester supplied one, use it.
1537        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1538            expcert = req['experimentAccess']['X509']
1539            expid = fedid(certstr=expcert)
1540            self.state_lock.acquire()
1541            if expid in self.state:
1542                self.state_lock.release()
1543                raise service_error(service_error.req, 
1544                        'fedid %s identifies an existing experiment' % expid)
1545            self.state_lock.release()
1546        else:
1547            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1548
1549        #now we're done with the tmpdir, and it should be empty
1550        if self.cleanup:
1551            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1552            os.rmdir(tmpdir)
1553        else:
1554            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1555
1556        eid = self.create_experiment_state(fid, req, expid, expcert, 
1557                state='empty')
1558
1559        rv = {
1560                'experimentID': [
1561                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1562                ],
1563                'experimentStatus': 'empty',
1564                'experimentAccess': { 'X509' : expcert },
1565                'proof': proof.to_dict(),
1566            }
1567
1568        return rv
1569
1570    # create_experiment sub-functions
1571
1572    @staticmethod
1573    def get_experiment_key(req, field='experimentID'):
1574        """
1575        Parse the experiment identifiers out of the request (the request body
1576        tag has been removed).  Specifically this pulls either the fedid or the
1577        localname out of the experimentID field.  A fedid is preferred.  If
1578        neither is present or the request does not contain the fields,
1579        service_errors are raised.
1580        """
1581        # Get the experiment access
1582        exp = req.get(field, None)
1583        if exp:
1584            if exp.has_key('fedid'):
1585                key = exp['fedid']
1586            elif exp.has_key('localname'):
1587                key = exp['localname']
1588            else:
1589                raise service_error(service_error.req, "Unknown lookup type")
1590        else:
1591            raise service_error(service_error.req, "No request?")
1592
1593        return key
1594
1595    def get_experiment_ids_and_start(self, key, tmpdir):
1596        """
1597        Get the experiment name, id and access certificate from the state, and
1598        set the experiment state to 'starting'.  returns a triple (fedid,
1599        localname, access_cert_file). The access_cert_file is a copy of the
1600        contents of the access certificate, created in the tempdir with
1601        restricted permissions.  If things are confused, raise an exception.
1602        """
1603
1604        expid = eid = None
1605        self.state_lock.acquire()
1606        if self.state.has_key(key):
1607            self.state[key]['experimentStatus'] = "starting"
1608            for e in self.state[key].get('experimentID',[]):
1609                if not expid and e.has_key('fedid'):
1610                    expid = e['fedid']
1611                elif not eid and e.has_key('localname'):
1612                    eid = e['localname']
1613            if 'experimentAccess' in self.state[key] and \
1614                    'X509' in self.state[key]['experimentAccess']:
1615                expcert = self.state[key]['experimentAccess']['X509']
1616            else:
1617                expcert = None
1618        self.state_lock.release()
1619
1620        # make a protected copy of the access certificate so the experiment
1621        # controller can act as the experiment principal.
1622        if expcert:
1623            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1624            if not expcert_file:
1625                raise service_error(service_error.internal, 
1626                        "Cannot create temp cert file?")
1627        else:
1628            expcert_file = None
1629
1630        return (eid, expid, expcert_file)
1631
1632    def get_topology(self, req, tmpdir):
1633        """
1634        Get the ns2 content and put it into a file for parsing.  Call the local
1635        or remote parser and return the topdl.Topology.  Errors result in
1636        exceptions.  req is the request and tmpdir is a work directory.
1637        """
1638
1639        # The tcl parser needs to read a file so put the content into that file
1640        descr=req.get('experimentdescription', None)
1641        if descr:
1642            if 'ns2description' in descr:
1643                file_content=descr['ns2description']
1644            elif 'topdldescription' in descr:
1645                return topdl.Topology(**descr['topdldescription'])
1646            else:
1647                raise service_error(service_error.req, 
1648                        'Unknown experiment description type')
1649        else:
1650            raise service_error(service_error.req, "No experiment description")
1651
1652
1653        if self.splitter_url:
1654            self.log.debug("Calling remote topdl translator at %s" % \
1655                    self.splitter_url)
1656            top = self.remote_ns2topdl(self.splitter_url, file_content)
1657        else:
1658            tclfile = os.path.join(tmpdir, "experiment.tcl")
1659            if file_content:
1660                try:
1661                    f = open(tclfile, 'w')
1662                    f.write(file_content)
1663                    f.close()
1664                except EnvironmentError:
1665                    raise service_error(service_error.internal,
1666                            "Cannot write temp experiment description")
1667            else:
1668                raise service_error(service_error.req, 
1669                        "Only ns2descriptions supported")
1670            pid = "dummy"
1671            gid = "dummy"
1672            eid = "dummy"
1673
1674            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1675                str(self.muxmax), '-m', 'dummy']
1676
1677            tclcmd.extend([pid, gid, eid, tclfile])
1678
1679            self.log.debug("running local splitter %s", " ".join(tclcmd))
1680            # This is just fantastic.  As a side effect the parser copies
1681            # tb_compat.tcl into the current directory, so that directory
1682            # must be writable by the fedd user.  Doing this in the
1683            # temporary subdir ensures this is the case.
1684            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1685                    cwd=tmpdir)
1686            split_data = tclparser.stdout
1687
1688            top = topdl.topology_from_xml(file=split_data, top="experiment")
1689            os.remove(tclfile)
1690
1691        return top
1692
1693    def get_testbed_services(self, req, testbeds):
1694        """
1695        Parse the services section of the request into into two dicts mapping
1696        testbed to lists of federated_service objects.  The first lists all
1697        exporters of services, and the second all exporters of services that
1698        need control portals int the experiment.
1699        """
1700        masters = { }
1701        pmasters = { }
1702        for s in req.get('service', []):
1703            # If this is a service request with the importall field
1704            # set, fill it out.
1705
1706            if s.get('importall', False):
1707                s['import'] = [ tb for tb in testbeds \
1708                        if tb not in s.get('export',[])]
1709                del s['importall']
1710
1711            # Add the service to masters
1712            for tb in s.get('export', []):
1713                if s.get('name', None):
1714
1715                    params = { }
1716                    for a in s.get('fedAttr', []):
1717                        params[a.get('attribute', '')] = a.get('value','')
1718
1719                    fser = federated_service(name=s['name'],
1720                            exporter=tb, importers=s.get('import',[]),
1721                            params=params)
1722                    if fser.name == 'hide_hosts' \
1723                            and 'hosts' not in fser.params:
1724                        fser.params['hosts'] = \
1725                                ",".join(tb_hosts.get(fser.exporter, []))
1726                    if tb in masters: masters[tb].append(fser)
1727                    else: masters[tb] = [fser]
1728
1729                    if fser.portal:
1730                        if tb not in pmasters: pmasters[tb] = [ fser ]
1731                        else: pmasters[tb].append(fser)
1732                else:
1733                    self.log.error('Testbed service does not have name " + \
1734                            "and importers')
1735        return masters, pmasters
1736
1737    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1738        """
1739        Create the ssh keys necessary for interconnecting the potral nodes and
1740        the global hosts file for letting each segment know about the IP
1741        addresses in play.  Save these into the repo.  Add attributes to the
1742        autorizer allowing access controllers to download them and return a set
1743        of attributes that inform the segments where to find this stuff.  Mau
1744        raise service_errors in if there are problems.
1745        """
1746        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1747        gw_secretkey_base = "fed.%s" % self.ssh_type
1748        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1749        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1750
1751        try:
1752            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1753        except ValueError:
1754            raise service_error(service_error.server_config, 
1755                    "Bad key type (%s)" % self.ssh_type)
1756
1757
1758        # Copy configuration files into the remote file store
1759        # The config urlpath
1760        configpath = "/%s/config" % expid
1761        # The config file system location
1762        configdir ="%s%s" % ( self.repodir, configpath)
1763        try:
1764            os.makedirs(configdir)
1765        except EnvironmentError, e:
1766            raise service_error(service_error.internal,
1767                    "Cannot create config directory: %s" % e)
1768        try:
1769            f = open("%s/hosts" % configdir, "w")
1770            print >> f, string.join(hosts, '\n')
1771            f.close()
1772        except EnvironmentError, e:
1773            raise service_error(service_error.internal, 
1774                    "Cannot write hosts file: %s" % e)
1775        try:
1776            copy_file("%s" % gw_pubkey, "%s/%s" % \
1777                    (configdir, gw_pubkey_base))
1778            copy_file("%s" % gw_secretkey, "%s/%s" % \
1779                    (configdir, gw_secretkey_base))
1780        except EnvironmentError, e:
1781            raise service_error(service_error.internal, 
1782                    "Cannot copy keyfiles: %s" % e)
1783
1784        # Allow the individual testbeds to access the configuration files,
1785        # again by setting an attribute for the relevant pathnames on each
1786        # allocation principal.  Yeah, that's a long list comprehension.
1787        self.append_experiment_authorization(expid, set([
1788            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1789                    for tb in tbparams.keys() \
1790                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1791
1792        attrs = [ 
1793                {
1794                    'attribute': 'ssh_pubkey', 
1795                    'value': '%s/%s/config/%s' % \
1796                            (self.repo_url, expid, gw_pubkey_base)
1797                },
1798                {
1799                    'attribute': 'ssh_secretkey', 
1800                    'value': '%s/%s/config/%s' % \
1801                            (self.repo_url, expid, gw_secretkey_base)
1802                },
1803                {
1804                    'attribute': 'hosts', 
1805                    'value': '%s/%s/config/hosts' % \
1806                            (self.repo_url, expid)
1807                },
1808            ]
1809        return attrs
1810
1811
1812    def get_vtopo(self, req, fid):
1813        """
1814        Return the stored virtual topology for this experiment
1815        """
1816        rv = None
1817        state = None
1818
1819        req = req.get('VtopoRequestBody', None)
1820        if not req:
1821            raise service_error(service_error.req,
1822                    "Bad request format (no VtopoRequestBody)")
1823        exp = req.get('experiment', None)
1824        if exp:
1825            if exp.has_key('fedid'):
1826                key = exp['fedid']
1827                keytype = "fedid"
1828            elif exp.has_key('localname'):
1829                key = exp['localname']
1830                keytype = "localname"
1831            else:
1832                raise service_error(service_error.req, "Unknown lookup type")
1833        else:
1834            raise service_error(service_error.req, "No request?")
1835
1836        proof = self.check_experiment_access(fid, key)
1837
1838        self.state_lock.acquire()
1839        if self.state.has_key(key):
1840            if self.state[key].has_key('vtopo'):
1841                rv = { 'experiment' : {keytype: key },
1842                        'vtopo': self.state[key]['vtopo'],
1843                        'proof': proof.to_dict(), 
1844                    }
1845            else:
1846                state = self.state[key]['experimentStatus']
1847        self.state_lock.release()
1848
1849        if rv: return rv
1850        else: 
1851            if state:
1852                raise service_error(service_error.partial, 
1853                        "Not ready: %s" % state)
1854            else:
1855                raise service_error(service_error.req, "No such experiment")
1856
1857    def get_vis(self, req, fid):
1858        """
1859        Return the stored visualization for this experiment
1860        """
1861        rv = None
1862        state = None
1863
1864        req = req.get('VisRequestBody', None)
1865        if not req:
1866            raise service_error(service_error.req,
1867                    "Bad request format (no VisRequestBody)")
1868        exp = req.get('experiment', None)
1869        if exp:
1870            if exp.has_key('fedid'):
1871                key = exp['fedid']
1872                keytype = "fedid"
1873            elif exp.has_key('localname'):
1874                key = exp['localname']
1875                keytype = "localname"
1876            else:
1877                raise service_error(service_error.req, "Unknown lookup type")
1878        else:
1879            raise service_error(service_error.req, "No request?")
1880
1881        proof = self.check_experiment_access(fid, key)
1882
1883        self.state_lock.acquire()
1884        if self.state.has_key(key):
1885            if self.state[key].has_key('vis'):
1886                rv =  { 'experiment' : {keytype: key },
1887                        'vis': self.state[key]['vis'],
1888                        'proof': proof.to_dict(), 
1889                        }
1890            else:
1891                state = self.state[key]['experimentStatus']
1892        self.state_lock.release()
1893
1894        if rv: return rv
1895        else:
1896            if state:
1897                raise service_error(service_error.partial, 
1898                        "Not ready: %s" % state)
1899            else:
1900                raise service_error(service_error.req, "No such experiment")
1901
1902   
1903    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1904            top):
1905        """
1906        Store the various data that have changed in the experiment state
1907        between when it was started and the beginning of resource allocation.
1908        This is basically the information about each local allocation.  This
1909        fills in the values of the placeholder allocation in the state.  It
1910        also collects the access proofs and returns them as dicts for a
1911        response message.
1912        """
1913        # save federant information
1914        for k in allocated.keys():
1915            tbparams[k]['federant'] = {
1916                    'name': [ { 'localname' : eid} ],
1917                    'allocID' : tbparams[k]['allocID'],
1918                    'uri': tbparams[k]['uri'],
1919                    'proof': tbparams[k]['proof'],
1920                }
1921
1922        self.state_lock.acquire()
1923        self.state[eid]['vtopo'] = vtopo
1924        self.state[eid]['vis'] = vis
1925        self.state[eid]['experimentdescription'] = \
1926                { 'topdldescription': top.to_dict() }
1927        self.state[eid]['federant'] = \
1928                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1929                    if tbparams[tb].has_key('federant') ]
1930        # Access proofs for the response message
1931        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1932                    for p in tbparams[k]['federant']['proof']]
1933        if self.state_filename: 
1934            self.write_state()
1935        self.state_lock.release()
1936        return proofs
1937
1938    def clear_placeholder(self, eid, expid, tmpdir):
1939        """
1940        Clear the placeholder and remove any allocated temporary dir.
1941        """
1942
1943        self.state_lock.acquire()
1944        del self.state[eid]
1945        del self.state[expid]
1946        if self.state_filename: self.write_state()
1947        self.state_lock.release()
1948        if tmpdir and self.cleanup:
1949            self.remove_dirs(tmpdir)
1950
1951    # end of create_experiment sub-functions
1952
1953    def create_experiment(self, req, fid):
1954        """
1955        The external interface to experiment creation called from the
1956        dispatcher.
1957
1958        Creates a working directory, splits the incoming description using the
1959        splitter script and parses out the various subsections using the
1960        classes above.  Once each sub-experiment is created, use pooled threads
1961        to instantiate them and start it all up.
1962        """
1963
1964        req = req.get('CreateRequestBody', None)
1965        if req:
1966            key = self.get_experiment_key(req)
1967        else:
1968            raise service_error(service_error.req,
1969                    "Bad request format (no CreateRequestBody)")
1970
1971        # Import information from the requester
1972        if self.auth.import_credentials(data_list=req.get('credential', [])):
1973            self.auth.save()
1974
1975        # Make sure that the caller can talk to us
1976        proof = self.check_experiment_access(fid, key)
1977
1978        # Install the testbed map entries supplied with the request into a copy
1979        # of the testbed map.
1980        tbmap = dict(self.tbmap)
1981        for m in req.get('testbedmap', []):
1982            if 'testbed' in m and 'uri' in m:
1983                tbmap[m['testbed']] = m['uri']
1984
1985        # a place to work
1986        try:
1987            tmpdir = tempfile.mkdtemp(prefix="split-")
1988            os.mkdir(tmpdir+"/keys")
1989        except EnvironmentError:
1990            raise service_error(service_error.internal, "Cannot create tmp dir")
1991
1992        tbparams = { }
1993
1994        eid, expid, expcert_file = \
1995                self.get_experiment_ids_and_start(key, tmpdir)
1996
1997        # This catches exceptions to clear the placeholder if necessary
1998        try: 
1999            if not (eid and expid):
2000                raise service_error(service_error.internal, 
2001                        "Cannot find local experiment info!?")
2002
2003            top = self.get_topology(req, tmpdir)
2004            self.confirm_software(top)
2005            # Assign the IPs
2006            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2007            # Find the testbeds to look up
2008            tb_hosts = { }
2009            testbeds = [ ]
2010            for e in top.elements:
2011                if isinstance(e, topdl.Computer):
2012                    tb = e.get_attribute('testbed') or 'default'
2013                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2014                    else: 
2015                        tb_hosts[tb] = [ e.name ]
2016                        testbeds.append(tb)
2017
2018            masters, pmasters = self.get_testbed_services(req, testbeds)
2019            allocated = { }         # Testbeds we can access
2020            topo ={ }               # Sub topologies
2021            connInfo = { }          # Connection information
2022
2023            self.split_topology(top, topo, testbeds)
2024
2025            self.get_access_to_testbeds(testbeds, fid, allocated, 
2026                    tbparams, masters, tbmap, expid, expcert_file)
2027
2028            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2029
2030            part = experiment_partition(self.auth, self.store_url, tbmap,
2031                    self.muxmax, self.direct_transit)
2032            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2033                    connInfo, expid)
2034
2035            auth_attrs = set()
2036            # Now get access to the dynamic testbeds (those added above)
2037            for tb in [ t for t in topo if t not in allocated]:
2038                self.get_access(tb, tbparams, fid, masters, tbmap, 
2039                        expid, expcert_file)
2040                allocated[tb] = 1
2041                store_keys = topo[tb].get_attribute('store_keys')
2042                # Give the testbed access to keys it exports or imports
2043                if store_keys:
2044                    auth_attrs.update(set([
2045                        (tbparams[tb]['allocID']['fedid'], sk) \
2046                                for sk in store_keys.split(" ")]))
2047
2048            if auth_attrs:
2049                self.append_experiment_authorization(expid, auth_attrs)
2050
2051            # transit and disconnected testbeds may not have a connInfo entry.
2052            # Fill in the blanks.
2053            for t in allocated.keys():
2054                if not connInfo.has_key(t):
2055                    connInfo[t] = { }
2056
2057            self.wrangle_software(expid, top, topo, tbparams)
2058
2059            vtopo = topdl.topology_to_vtopo(top)
2060            vis = self.genviz(vtopo)
2061            proofs = self.save_federant_information(allocated, tbparams, 
2062                    eid, vtopo, vis, top)
2063        except service_error, e:
2064            # If something goes wrong in the parse (usually an access error)
2065            # clear the placeholder state.  From here on out the code delays
2066            # exceptions.  Failing at this point returns a fault to the remote
2067            # caller.
2068            self.clear_placeholder(eid, expid, tmpdir)
2069            raise e
2070
2071        # Start the background swapper and return the starting state.  From
2072        # here on out, the state will stick around a while.
2073
2074        # Create a logger that logs to the experiment's state object as well as
2075        # to the main log file.
2076        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2077        alloc_collector = self.list_log(self.state[eid]['log'])
2078        h = logging.StreamHandler(alloc_collector)
2079        # XXX: there should be a global one of these rather than repeating the
2080        # code.
2081        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2082                    '%d %b %y %H:%M:%S'))
2083        alloc_log.addHandler(h)
2084
2085        # Start a thread to do the resource allocation
2086        t  = Thread(target=self.allocate_resources,
2087                args=(allocated, masters, eid, expid, tbparams, 
2088                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2089                    connInfo, tbmap, expcert_file),
2090                name=eid)
2091        t.start()
2092
2093        rv = {
2094                'experimentID': [
2095                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2096                ],
2097                'experimentStatus': 'starting',
2098                'proof': [ proof.to_dict() ] + proofs,
2099            }
2100
2101        return rv
2102   
2103    def get_experiment_fedid(self, key):
2104        """
2105        find the fedid associated with the localname key in the state database.
2106        """
2107
2108        rv = None
2109        self.state_lock.acquire()
2110        if self.state.has_key(key):
2111            if isinstance(self.state[key], dict):
2112                try:
2113                    kl = [ f['fedid'] for f in \
2114                            self.state[key]['experimentID']\
2115                                if f.has_key('fedid') ]
2116                except KeyError:
2117                    self.state_lock.release()
2118                    raise service_error(service_error.internal, 
2119                            "No fedid for experiment %s when getting "+\
2120                                    "fedid(!?)" % key)
2121                if len(kl) == 1:
2122                    rv = kl[0]
2123                else:
2124                    self.state_lock.release()
2125                    raise service_error(service_error.internal, 
2126                            "multiple fedids for experiment %s when " +\
2127                                    "getting fedid(!?)" % key)
2128            else:
2129                self.state_lock.release()
2130                raise service_error(service_error.internal, 
2131                        "Unexpected state for %s" % key)
2132        self.state_lock.release()
2133        return rv
2134
2135    def check_experiment_access(self, fid, key):
2136        """
2137        Confirm that the fid has access to the experiment.  Though a request
2138        may be made in terms of a local name, the access attribute is always
2139        the experiment's fedid.
2140        """
2141        if not isinstance(key, fedid):
2142            key = self.get_experiment_fedid(key)
2143
2144        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2145
2146        if access_ok:
2147            return proof
2148        else:
2149            raise service_error(service_error.access, "Access Denied",
2150                proof)
2151
2152
2153    def get_handler(self, path, fid):
2154        """
2155        Perhaps surprisingly named, this function handles HTTP GET requests to
2156        this server (SOAP requests are POSTs).
2157        """
2158        self.log.info("Get handler %s %s" % (path, fid))
2159        # XXX: log proofs?
2160        if self.auth.check_attribute(fid, path):
2161            return ("%s/%s" % (self.repodir, path), "application/binary")
2162        else:
2163            return (None, None)
2164
2165    def clean_info_response(self, rv, proof):
2166        """
2167        Remove the information in the experiment's state object that is not in
2168        the info response.
2169        """
2170        # Remove the owner info (should always be there, but...)
2171        if rv.has_key('owner'): del rv['owner']
2172        if 'auth' in rv: del rv['auth']
2173
2174        # Convert the log into the allocationLog parameter and remove the
2175        # log entry (with defensive programming)
2176        if rv.has_key('log'):
2177            rv['allocationLog'] = "".join(rv['log'])
2178            del rv['log']
2179        else:
2180            rv['allocationLog'] = ""
2181
2182        if rv['experimentStatus'] != 'active':
2183            if rv.has_key('federant'): del rv['federant']
2184        else:
2185            # remove the allocationID and uri info from each federant
2186            for f in rv.get('federant', []):
2187                if f.has_key('allocID'): del f['allocID']
2188                if f.has_key('uri'): del f['uri']
2189        rv['proof'] = proof.to_dict()
2190
2191        return rv
2192
2193    def get_info(self, req, fid):
2194        """
2195        Return all the stored info about this experiment
2196        """
2197        rv = None
2198
2199        req = req.get('InfoRequestBody', None)
2200        if not req:
2201            raise service_error(service_error.req,
2202                    "Bad request format (no InfoRequestBody)")
2203        exp = req.get('experiment', None)
2204        if exp:
2205            if exp.has_key('fedid'):
2206                key = exp['fedid']
2207                keytype = "fedid"
2208            elif exp.has_key('localname'):
2209                key = exp['localname']
2210                keytype = "localname"
2211            else:
2212                raise service_error(service_error.req, "Unknown lookup type")
2213        else:
2214            raise service_error(service_error.req, "No request?")
2215
2216        proof = self.check_experiment_access(fid, key)
2217
2218        # The state may be massaged by the service function that called
2219        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2220        # state.
2221        self.state_lock.acquire()
2222        if self.state.has_key(key):
2223            rv = copy.deepcopy(self.state[key])
2224        self.state_lock.release()
2225
2226        if rv:
2227            return self.clean_info_response(rv, proof)
2228        else:
2229            raise service_error(service_error.req, "No such experiment")
2230
2231    def get_multi_info(self, req, fid):
2232        """
2233        Return all the stored info that this fedid can access
2234        """
2235        rv = { 'info': [ ], 'proof': [ ] }
2236
2237        self.state_lock.acquire()
2238        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2239            try:
2240                proof = self.check_experiment_access(fid, key)
2241            except service_error, e:
2242                if e.code == service_error.access:
2243                    continue
2244                else:
2245                    self.state_lock.release()
2246                    raise e
2247
2248            if self.state.has_key(key):
2249                e = copy.deepcopy(self.state[key])
2250                e = self.clean_info_response(e, proof)
2251                rv['info'].append(e)
2252                rv['proof'].append(proof.to_dict())
2253        self.state_lock.release()
2254        return rv
2255
2256    def check_termination_status(self, fed_exp, force):
2257        """
2258        Confirm that the experiment is sin a valid state to stop (or force it)
2259        return the state - invalid states for deletion and force settings cause
2260        exceptions.
2261        """
2262        self.state_lock.acquire()
2263        status = fed_exp.get('experimentStatus', None)
2264
2265        if status:
2266            if status in ('starting', 'terminating'):
2267                if not force:
2268                    self.state_lock.release()
2269                    raise service_error(service_error.partial, 
2270                            'Experiment still being created or destroyed')
2271                else:
2272                    self.log.warning('Experiment in %s state ' % status + \
2273                            'being terminated by force.')
2274            self.state_lock.release()
2275            return status
2276        else:
2277            # No status??? trouble
2278            self.state_lock.release()
2279            raise service_error(service_error.internal,
2280                    "Experiment has no status!?")
2281
2282
2283    def get_termination_info(self, fed_exp):
2284        ids = []
2285        term_params = { }
2286        self.state_lock.acquire()
2287        #  experimentID is a list of dicts that are self-describing
2288        #  identifiers.  This finds all the fedids and localnames - the
2289        #  keys of self.state - and puts them into ids, which is used to delete
2290        #  the state after everything is swapped out.
2291        for id in fed_exp.get('experimentID', []):
2292            if 'fedid' in id: 
2293                ids.append(id['fedid'])
2294                repo = "%s" % id['fedid']
2295            if 'localname' in id: ids.append(id['localname'])
2296
2297        # Get the experimentAccess - the principal for this experiment.  It
2298        # is this principal to which credentials have been delegated, and
2299        # as which the experiment controller must act.
2300        if 'experimentAccess' in fed_exp and \
2301                'X509' in fed_exp['experimentAccess']:
2302            expcert = fed_exp['experimentAccess']['X509']
2303        else:
2304            expcert = None
2305
2306        # Collect the allocation/segment ids into a dict keyed by the fedid
2307        # of the allocation (or a monotonically increasing integer) that
2308        # contains a tuple of uri, aid (which is a dict...)
2309        for i, fed in enumerate(fed_exp.get('federant', [])):
2310            try:
2311                uri = fed['uri']
2312                aid = fed['allocID']
2313                k = fed['allocID'].get('fedid', i)
2314            except KeyError, e:
2315                continue
2316            term_params[k] = (uri, aid)
2317        # Change the experiment state
2318        fed_exp['experimentStatus'] = 'terminating'
2319        if self.state_filename: self.write_state()
2320        self.state_lock.release()
2321
2322        return ids, term_params, expcert, repo
2323
2324
2325    def deallocate_resources(self, term_params, expcert, status, force, 
2326            dealloc_log):
2327        tmpdir = None
2328        # This try block makes sure the tempdir is cleared
2329        try:
2330            # If no expcert, try the deallocation as the experiment
2331            # controller instance.
2332            if expcert and self.auth_type != 'legacy': 
2333                try:
2334                    tmpdir = tempfile.mkdtemp(prefix="term-")
2335                except EnvironmentError:
2336                    raise service_error(service_error.internal, 
2337                            "Cannot create tmp dir")
2338                cert_file = self.make_temp_certfile(expcert, tmpdir)
2339                pw = None
2340            else: 
2341                cert_file = self.cert_file
2342                pw = self.cert_pwd
2343
2344            # Stop everyone.  NB, wait_for_all waits until a thread starts
2345            # and then completes, so we can't wait if nothing starts.  So,
2346            # no tbparams, no start.
2347            if len(term_params) > 0:
2348                tp = thread_pool(self.nthreads)
2349                for k, (uri, aid) in term_params.items():
2350                    # Create and start a thread to stop the segment
2351                    tp.wait_for_slot()
2352                    t  = pooled_thread(\
2353                            target=self.terminate_segment(log=dealloc_log,
2354                                testbed=uri,
2355                                cert_file=cert_file, 
2356                                cert_pwd=pw,
2357                                trusted_certs=self.trusted_certs,
2358                                caller=self.call_TerminateSegment),
2359                            args=(uri, aid), name=k,
2360                            pdata=tp, trace_file=self.trace_file)
2361                    t.start()
2362                # Wait for completions
2363                tp.wait_for_all_done()
2364
2365            # release the allocations (failed experiments have done this
2366            # already, and starting experiments may be in odd states, so we
2367            # ignore errors releasing those allocations
2368            try: 
2369                for k, (uri, aid)  in term_params.items():
2370                    self.release_access(None, aid, uri=uri,
2371                            cert_file=cert_file, cert_pwd=pw)
2372            except service_error, e:
2373                if status != 'failed' and not force:
2374                    raise e
2375
2376        # Clean up the tmpdir no matter what
2377        finally:
2378            if tmpdir: self.remove_dirs(tmpdir)
2379
2380    def terminate_experiment(self, req, fid):
2381        """
2382        Swap this experiment out on the federants and delete the shared
2383        information
2384        """
2385        tbparams = { }
2386        req = req.get('TerminateRequestBody', None)
2387        if not req:
2388            raise service_error(service_error.req,
2389                    "Bad request format (no TerminateRequestBody)")
2390
2391        key = self.get_experiment_key(req, 'experiment')
2392        proof = self.check_experiment_access(fid, key)
2393        exp = req.get('experiment', False)
2394        force = req.get('force', False)
2395
2396        dealloc_list = [ ]
2397
2398
2399        # Create a logger that logs to the dealloc_list as well as to the main
2400        # log file.
2401        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2402        h = logging.StreamHandler(self.list_log(dealloc_list))
2403        # XXX: there should be a global one of these rather than repeating the
2404        # code.
2405        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2406                    '%d %b %y %H:%M:%S'))
2407        dealloc_log.addHandler(h)
2408
2409        self.state_lock.acquire()
2410        fed_exp = self.state.get(key, None)
2411        self.state_lock.release()
2412        repo = None
2413
2414        if fed_exp:
2415            status = self.check_termination_status(fed_exp, force)
2416            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2417            self.deallocate_resources(term_params, expcert, status, force, 
2418                    dealloc_log)
2419
2420            # Remove the terminated experiment
2421            self.state_lock.acquire()
2422            for id in ids:
2423                self.clear_experiment_authorization(id, need_state_lock=False)
2424                if id in self.state: del self.state[id]
2425
2426            if self.state_filename: self.write_state()
2427            self.state_lock.release()
2428
2429            # Delete any synch points associated with this experiment.  All
2430            # synch points begin with the fedid of the experiment.
2431            fedid_keys = set(["fedid:%s" % f for f in ids \
2432                    if isinstance(f, fedid)])
2433            for k in self.synch_store.all_keys():
2434                try:
2435                    if len(k) > 45 and k[0:46] in fedid_keys:
2436                        self.synch_store.del_value(k)
2437                except synch_store.BadDeletionError:
2438                    pass
2439            self.write_store()
2440
2441            # Remove software and other cached stuff from the filesystem.
2442            if repo:
2443                self.remove_dirs("%s/%s" % (self.repodir, repo))
2444       
2445            return { 
2446                    'experiment': exp , 
2447                    'deallocationLog': string.join(dealloc_list, ''),
2448                    'proof': [proof.to_dict()],
2449                    }
2450        else:
2451            raise service_error(service_error.req, "No saved state")
2452
2453
2454    def GetValue(self, req, fid):
2455        """
2456        Get a value from the synchronized store
2457        """
2458        req = req.get('GetValueRequestBody', None)
2459        if not req:
2460            raise service_error(service_error.req,
2461                    "Bad request format (no GetValueRequestBody)")
2462       
2463        name = req.get('name', None)
2464        wait = req.get('wait', False)
2465        rv = { 'name': name }
2466
2467        if not name:
2468            raise service_error(service_error.req, "No name?")
2469
2470        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2471
2472        if access_ok:
2473            self.log.debug("[GetValue] asking for %s " % name)
2474            try:
2475                v = self.synch_store.get_value(name, wait)
2476            except synch_store.RevokedKeyError:
2477                # No more synch on this key
2478                raise service_error(service_error.federant, 
2479                        "Synch key %s revoked" % name)
2480            if v is not None:
2481                rv['value'] = v
2482            rv['proof'] = proof.to_dict()
2483            self.log.debug("[GetValue] got %s from %s" % (v, name))
2484            return rv
2485        else:
2486            raise service_error(service_error.access, "Access Denied",
2487                    proof=proof)
2488       
2489
2490    def SetValue(self, req, fid):
2491        """
2492        Set a value in the synchronized store
2493        """
2494        req = req.get('SetValueRequestBody', None)
2495        if not req:
2496            raise service_error(service_error.req,
2497                    "Bad request format (no SetValueRequestBody)")
2498       
2499        name = req.get('name', None)
2500        v = req.get('value', '')
2501
2502        if not name:
2503            raise service_error(service_error.req, "No name?")
2504
2505        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2506
2507        if access_ok:
2508            try:
2509                self.synch_store.set_value(name, v)
2510                self.write_store()
2511                self.log.debug("[SetValue] set %s to %s" % (name, v))
2512            except synch_store.CollisionError:
2513                # Translate into a service_error
2514                raise service_error(service_error.req,
2515                        "Value already set: %s" %name)
2516            except synch_store.RevokedKeyError:
2517                # No more synch on this key
2518                raise service_error(service_error.federant, 
2519                        "Synch key %s revoked" % name)
2520                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2521        else:
2522            raise service_error(service_error.access, "Access Denied",
2523                    proof=proof)
Note: See TracBrowser for help on using the repository browser.