source: fedd/federation/experiment_control.py @ 2627eb3

axis_examplecompt_changesinfo-ops
Last change on this file since 2627eb3 was 2627eb3, checked in by Ted Faber <faber@…>, 14 years ago

Fail if software not available before allocations are made.

  • Property mode set to 100644
File size: 83.3 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import traceback
17# For parsing visualization output and splitter output
18import xml.parsers.expat
19
20from threading import Lock, Thread, Condition
21from subprocess import call, Popen, PIPE
22from string import join
23
24from urlparse import urlparse
25from urllib2 import urlopen
26
27from util import *
28from fedid import fedid, generate_fedid
29from remote_service import xmlrpc_handler, soap_handler, service_caller
30from service_error import service_error
31from synch_store import synch_store
32from experiment_partition import experiment_partition
33from experiment_control_legacy import experiment_control_legacy
34from authorizer import abac_authorizer
35from thread_pool import thread_pool, pooled_thread
36
37import topdl
38import list_log
39from ip_allocator import ip_allocator
40from ip_addr import ip_addr
41
42
43class nullHandler(logging.Handler):
44    def emit(self, record): pass
45
46fl = logging.getLogger("fedd.experiment_control")
47fl.addHandler(nullHandler())
48
49
50# Right now, no support for composition.
51class federated_service:
52    def __init__(self, name, exporter=None, importers=None, params=None, 
53            reqs=None, portal=None):
54        self.name=name
55        self.exporter=exporter
56        if importers is None: self.importers = []
57        else: self.importers=importers
58        if params is None: self.params = { }
59        else: self.params = params
60        if reqs is None: self.reqs = []
61        else: self.reqs = reqs
62
63        if portal is not None:
64            self.portal = portal
65        else:
66            self.portal = (name in federated_service.needs_portal)
67
68    def __str__(self):
69        return "name %s export %s import %s params %s reqs %s" % \
70                (self.name, self.exporter, self.importers, self.params,
71                        [ (r['name'], r['visibility']) for r in self.reqs] )
72
73    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
74
75class experiment_control_local(experiment_control_legacy):
76    """
77    Control of experiments that this system can directly access.
78
79    Includes experiment creation, termination and information dissemination.
80    Thred safe.
81    """
82
83    class ssh_cmd_timeout(RuntimeError): pass
84   
85    call_RequestAccess = service_caller('RequestAccess')
86    call_ReleaseAccess = service_caller('ReleaseAccess')
87    call_StartSegment = service_caller('StartSegment')
88    call_TerminateSegment = service_caller('TerminateSegment')
89    call_Ns2Topdl = service_caller('Ns2Topdl')
90
91    def __init__(self, config=None, auth=None):
92        """
93        Intialize the various attributes, most from the config object
94        """
95
96        def parse_tarfile_list(tf):
97            """
98            Parse a tarfile list from the configuration.  This is a set of
99            paths and tarfiles separated by spaces.
100            """
101            rv = [ ]
102            if tf is not None:
103                tl = tf.split()
104                while len(tl) > 1:
105                    p, t = tl[0:2]
106                    del tl[0:2]
107                    rv.append((p, t))
108            return rv
109
110        self.list_log = list_log.list_log
111
112        self.cert_file = config.get("experiment_control", "cert_file")
113        if self.cert_file:
114            self.cert_pwd = config.get("experiment_control", "cert_pwd")
115        else:
116            self.cert_file = config.get("globals", "cert_file")
117            self.cert_pwd = config.get("globals", "cert_pwd")
118
119        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
120                or config.get("globals", "trusted_certs")
121
122        self.repodir = config.get("experiment_control", "repodir")
123        self.repo_url = config.get("experiment_control", "repo_url", 
124                "https://users.isi.deterlab.net:23235");
125
126        self.exp_stem = "fed-stem"
127        self.log = logging.getLogger("fedd.experiment_control")
128        set_log_level(config, "experiment_control", self.log)
129        self.muxmax = 2
130        self.nthreads = 10
131        self.randomize_experiments = False
132
133        self.splitter = None
134        self.ssh_keygen = "/usr/bin/ssh-keygen"
135        self.ssh_identity_file = None
136
137
138        self.debug = config.getboolean("experiment_control", "create_debug")
139        self.cleanup = not config.getboolean("experiment_control", 
140                "leave_tmpfiles")
141        self.state_filename = config.get("experiment_control", 
142                "experiment_state")
143        self.store_filename = config.get("experiment_control", 
144                "synch_store")
145        self.store_url = config.get("experiment_control", "store_url")
146        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
147        self.fedkit = parse_tarfile_list(\
148                config.get("experiment_control", "fedkit"))
149        self.gatewaykit = parse_tarfile_list(\
150                config.get("experiment_control", "gatewaykit"))
151        accessdb_file = config.get("experiment_control", "accessdb")
152
153        self.ssh_pubkey_file = config.get("experiment_control", 
154                "ssh_pubkey_file")
155        self.ssh_privkey_file = config.get("experiment_control",
156                "ssh_privkey_file")
157        dt = config.get("experiment_control", "direct_transit")
158        self.auth_type = config.get('experiment_control', 'auth_type') \
159                or 'legacy'
160        self.auth_dir = config.get('experiment_control', 'auth_dir')
161        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
162        else: self.direct_transit = [ ]
163        # NB for internal master/slave ops, not experiment setup
164        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
165
166        self.overrides = set([])
167        ovr = config.get('experiment_control', 'overrides')
168        if ovr:
169            for o in ovr.split(","):
170                o = o.strip()
171                if o.startswith('fedid:'): o = o[len('fedid:'):]
172                self.overrides.add(fedid(hexstr=o))
173
174        self.state = { }
175        self.state_lock = Lock()
176        self.tclsh = "/usr/local/bin/otclsh"
177        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
178                config.get("experiment_control", "tcl_splitter",
179                        "/usr/testbed/lib/ns2ir/parse.tcl")
180        mapdb_file = config.get("experiment_control", "mapdb")
181        self.trace_file = sys.stderr
182
183        self.def_expstart = \
184                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
185                "/tmp/federate";
186        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
187                "FEDDIR/hosts";
188        self.def_gwstart = \
189                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
190                "/tmp/bridge.log";
191        self.def_mgwstart = \
192                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
193                "/tmp/bridge.log";
194        self.def_gwimage = "FBSD61-TUNNEL2";
195        self.def_gwtype = "pc";
196        self.local_access = { }
197
198        if self.auth_type == 'legacy':
199            if auth:
200                self.auth = auth
201            else:
202                self.log.error( "[access]: No authorizer initialized, " +\
203                        "creating local one.")
204                auth = authorizer()
205            self.get_access = self.legacy_get_access
206        elif self.auth_type == 'abac':
207            self.auth = abac_authorizer(load=self.auth_dir)
208        else:
209            raise service_error(service_error.internal, 
210                    "Unknown auth_type: %s" % self.auth_type)
211
212
213        if self.ssh_pubkey_file:
214            try:
215                f = open(self.ssh_pubkey_file, 'r')
216                self.ssh_pubkey = f.read()
217                f.close()
218            except EnvironmentError:
219                raise service_error(service_error.internal,
220                        "Cannot read sshpubkey")
221        else:
222            raise service_error(service_error.internal, 
223                    "No SSH public key file?")
224
225        if not self.ssh_privkey_file:
226            raise service_error(service_error.internal, 
227                    "No SSH public key file?")
228
229
230        if mapdb_file:
231            self.read_mapdb(mapdb_file)
232        else:
233            self.log.warn("[experiment_control] No testbed map, using defaults")
234            self.tbmap = { 
235                    'deter':'https://users.isi.deterlab.net:23235',
236                    'emulab':'https://users.isi.deterlab.net:23236',
237                    'ucb':'https://users.isi.deterlab.net:23237',
238                    }
239
240        if accessdb_file:
241                self.read_accessdb(accessdb_file)
242        else:
243            raise service_error(service_error.internal,
244                    "No accessdb specified in config")
245
246        # Grab saved state.  OK to do this w/o locking because it's read only
247        # and only one thread should be in existence that can see self.state at
248        # this point.
249        if self.state_filename:
250            self.read_state()
251
252        if self.store_filename:
253            self.read_store()
254        else:
255            self.log.warning("No saved synch store")
256            self.synch_store = synch_store
257
258        # Dispatch tables
259        self.soap_services = {\
260                'New': soap_handler('New', self.new_experiment),
261                'Create': soap_handler('Create', self.create_experiment),
262                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
263                'Vis': soap_handler('Vis', self.get_vis),
264                'Info': soap_handler('Info', self.get_info),
265                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
266                'Terminate': soap_handler('Terminate', 
267                    self.terminate_experiment),
268                'GetValue': soap_handler('GetValue', self.GetValue),
269                'SetValue': soap_handler('SetValue', self.SetValue),
270        }
271
272        self.xmlrpc_services = {\
273                'New': xmlrpc_handler('New', self.new_experiment),
274                'Create': xmlrpc_handler('Create', self.create_experiment),
275                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
276                'Vis': xmlrpc_handler('Vis', self.get_vis),
277                'Info': xmlrpc_handler('Info', self.get_info),
278                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
279                'Terminate': xmlrpc_handler('Terminate',
280                    self.terminate_experiment),
281                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
282                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
283        }
284
285    # Call while holding self.state_lock
286    def write_state(self):
287        """
288        Write a new copy of experiment state after copying the existing state
289        to a backup.
290
291        State format is a simple pickling of the state dictionary.
292        """
293        if os.access(self.state_filename, os.W_OK):
294            copy_file(self.state_filename, \
295                    "%s.bak" % self.state_filename)
296        try:
297            f = open(self.state_filename, 'w')
298            pickle.dump(self.state, f)
299        except EnvironmentError, e:
300            self.log.error("Can't write file %s: %s" % \
301                    (self.state_filename, e))
302        except pickle.PicklingError, e:
303            self.log.error("Pickling problem: %s" % e)
304        except TypeError, e:
305            self.log.error("Pickling problem (TypeError): %s" % e)
306
307    @staticmethod
308    def get_alloc_ids(state):
309        """
310        Pull the fedids of the identifiers of each allocation from the
311        state.  Again, a dict dive that's best isolated.
312
313        Used by read_store and read state
314        """
315
316        return [ f['allocID']['fedid'] 
317                for f in state.get('federant',[]) \
318                    if f.has_key('allocID') and \
319                        f['allocID'].has_key('fedid')]
320
321    # Call while holding self.state_lock
322    def read_state(self):
323        """
324        Read a new copy of experiment state.  Old state is overwritten.
325
326        State format is a simple pickling of the state dictionary.
327        """
328       
329        def get_experiment_id(state):
330            """
331            Pull the fedid experimentID out of the saved state.  This is kind
332            of a gross walk through the dict.
333            """
334
335            if state.has_key('experimentID'):
336                for e in state['experimentID']:
337                    if e.has_key('fedid'):
338                        return e['fedid']
339                else:
340                    return None
341            else:
342                return None
343
344        try:
345            f = open(self.state_filename, "r")
346            self.state = pickle.load(f)
347            self.log.debug("[read_state]: Read state from %s" % \
348                    self.state_filename)
349        except EnvironmentError, e:
350            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
351                    % (self.state_filename, e))
352        except pickle.UnpicklingError, e:
353            self.log.warning(("[read_state]: No saved state: " + \
354                    "Unpickling failed: %s") % e)
355       
356        for s in self.state.values():
357            try:
358
359                eid = get_experiment_id(s)
360                if eid : 
361                    if self.auth_type == 'legacy':
362                        # XXX: legacy
363                        # Give the owner rights to the experiment
364                        self.auth.set_attribute(s['owner'], eid)
365                        # And holders of the eid as well
366                        self.auth.set_attribute(eid, eid)
367                        # allow overrides to control experiments as well
368                        for o in self.overrides:
369                            self.auth.set_attribute(o, eid)
370                        # Set permissions to allow reading of the software
371                        # repo, if any, as well.
372                        for a in self.get_alloc_ids(s):
373                            self.auth.set_attribute(a, 'repo/%s' % eid)
374                else:
375                    raise KeyError("No experiment id")
376            except KeyError, e:
377                self.log.warning("[read_state]: State ownership or identity " +\
378                        "misformatted in %s: %s" % (self.state_filename, e))
379
380
381    def read_accessdb(self, accessdb_file):
382        """
383        Read the mapping from fedids that can create experiments to their name
384        in the 3-level access namespace.  All will be asserted from this
385        testbed and can include the local username and porject that will be
386        asserted on their behalf by this fedd.  Each fedid is also added to the
387        authorization system with the "create" attribute.
388        """
389        self.accessdb = {}
390        # These are the regexps for parsing the db
391        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
392        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
393                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
394        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
395                "\s*->\s*(" + name_expr + ")\s*$")
396        lineno = 0
397
398        # Parse the mappings and store in self.authdb, a dict of
399        # fedid -> (proj, user)
400        try:
401            f = open(accessdb_file, "r")
402            for line in f:
403                lineno += 1
404                line = line.strip()
405                if len(line) == 0 or line.startswith('#'):
406                    continue
407                m = project_line.match(line)
408                if m:
409                    fid = fedid(hexstr=m.group(1))
410                    project, user = m.group(2,3)
411                    if not self.accessdb.has_key(fid):
412                        self.accessdb[fid] = []
413                    self.accessdb[fid].append((project, user))
414                    continue
415
416                m = user_line.match(line)
417                if m:
418                    fid = fedid(hexstr=m.group(1))
419                    project = None
420                    user = m.group(2)
421                    if not self.accessdb.has_key(fid):
422                        self.accessdb[fid] = []
423                    self.accessdb[fid].append((project, user))
424                    continue
425                self.log.warn("[experiment_control] Error parsing access " +\
426                        "db %s at line %d" %  (accessdb_file, lineno))
427        except EnvironmentError:
428            raise service_error(service_error.internal,
429                    ("Error opening/reading %s as experiment " +\
430                            "control accessdb") %  accessdb_file)
431        f.close()
432
433        # Initialize the authorization attributes
434        # XXX: legacy
435        if self.auth_type == 'legacy':
436            for fid in self.accessdb.keys():
437                self.auth.set_attribute(fid, 'create')
438                self.auth.set_attribute(fid, 'new')
439
440    def read_mapdb(self, file):
441        """
442        Read a simple colon separated list of mappings for the
443        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
444        """
445
446        self.tbmap = { }
447        lineno =0
448        try:
449            f = open(file, "r")
450            for line in f:
451                lineno += 1
452                line = line.strip()
453                if line.startswith('#') or len(line) == 0:
454                    continue
455                try:
456                    label, url = line.split(':', 1)
457                    self.tbmap[label] = url
458                except ValueError, e:
459                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
460                            "map db: %s %s" % (lineno, line, e))
461        except EnvironmentError, e:
462            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
463                    "open %s: %s" % (file, e))
464        f.close()
465
466    def read_store(self):
467        try:
468            self.synch_store = synch_store()
469            self.synch_store.load(self.store_filename)
470            self.log.debug("[read_store]: Read store from %s" % \
471                    self.store_filename)
472        except EnvironmentError, e:
473            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
474                    % (self.state_filename, e))
475            self.synch_store = synch_store()
476
477        # Set the initial permissions on data in the store.  XXX: This ad hoc
478        # authorization attribute initialization is getting out of hand.
479        # XXX: legacy
480        if self.auth_type == 'legacy':
481            for k in self.synch_store.all_keys():
482                try:
483                    if k.startswith('fedid:'):
484                        fid = fedid(hexstr=k[6:46])
485                        if self.state.has_key(fid):
486                            for a in self.get_alloc_ids(self.state[fid]):
487                                self.auth.set_attribute(a, k)
488                except ValueError, e:
489                    self.log.warn("Cannot deduce permissions for %s" % k)
490
491
492    def write_store(self):
493        """
494        Write a new copy of synch_store after writing current state
495        to a backup.  We use the internal synch_store pickle method to avoid
496        incinsistent data.
497
498        State format is a simple pickling of the store.
499        """
500        if os.access(self.store_filename, os.W_OK):
501            copy_file(self.store_filename, \
502                    "%s.bak" % self.store_filename)
503        try:
504            self.synch_store.save(self.store_filename)
505        except EnvironmentError, e:
506            self.log.error("Can't write file %s: %s" % \
507                    (self.store_filename, e))
508        except TypeError, e:
509            self.log.error("Pickling problem (TypeError): %s" % e)
510
511
512    def remove_dirs(self, dir):
513        """
514        Remove the directory tree and all files rooted at dir.  Log any errors,
515        but continue.
516        """
517        self.log.debug("[removedirs]: removing %s" % dir)
518        try:
519            for path, dirs, files in os.walk(dir, topdown=False):
520                for f in files:
521                    os.remove(os.path.join(path, f))
522                for d in dirs:
523                    os.rmdir(os.path.join(path, d))
524            os.rmdir(dir)
525        except EnvironmentError, e:
526            self.log.error("Error deleting directory tree in %s" % e);
527
528    @staticmethod
529    def make_temp_certfile(expcert, tmpdir):
530        """
531        make a protected copy of the access certificate so the experiment
532        controller can act as the experiment principal.  mkstemp is the most
533        secure way to do that. The directory should be created by
534        mkdtemp.  Return the filename.
535        """
536        if expcert and tmpdir:
537            try:
538                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
539                f = os.fdopen(certf, 'w')
540                print >> f, expcert
541                f.close()
542            except EnvironmentError, e:
543                raise service_error(service_error.internal, 
544                        "Cannot create temp cert file?")
545            return certfn
546        else:
547            return None
548
549       
550    def generate_ssh_keys(self, dest, type="rsa" ):
551        """
552        Generate a set of keys for the gateways to use to talk.
553
554        Keys are of type type and are stored in the required dest file.
555        """
556        valid_types = ("rsa", "dsa")
557        t = type.lower();
558        if t not in valid_types: raise ValueError
559        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
560
561        try:
562            trace = open("/dev/null", "w")
563        except EnvironmentError:
564            raise service_error(service_error.internal,
565                    "Cannot open /dev/null??");
566
567        # May raise CalledProcessError
568        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
569        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
570        if rv != 0:
571            raise service_error(service_error.internal, 
572                    "Cannot generate nonce ssh keys.  %s return code %d" \
573                            % (self.ssh_keygen, rv))
574
575    def gentopo(self, str):
576        """
577        Generate the topology data structure from the splitter's XML
578        representation of it.
579
580        The topology XML looks like:
581            <experiment>
582                <nodes>
583                    <node><vname></vname><ips>ip1:ip2</ips></node>
584                </nodes>
585                <lans>
586                    <lan>
587                        <vname></vname><vnode></vnode><ip></ip>
588                        <bandwidth></bandwidth><member>node:port</member>
589                    </lan>
590                </lans>
591        """
592        class topo_parse:
593            """
594            Parse the topology XML and create the dats structure.
595            """
596            def __init__(self):
597                # Typing of the subelements for data conversion
598                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
599                self.int_subelements = ( 'bandwidth',)
600                self.float_subelements = ( 'delay',)
601                # The final data structure
602                self.nodes = [ ]
603                self.lans =  [ ]
604                self.topo = { \
605                        'node': self.nodes,\
606                        'lan' : self.lans,\
607                    }
608                self.element = { }  # Current element being created
609                self.chars = ""     # Last text seen
610
611            def end_element(self, name):
612                # After each sub element the contents is added to the current
613                # element or to the appropriate list.
614                if name == 'node':
615                    self.nodes.append(self.element)
616                    self.element = { }
617                elif name == 'lan':
618                    self.lans.append(self.element)
619                    self.element = { }
620                elif name in self.str_subelements:
621                    self.element[name] = self.chars
622                    self.chars = ""
623                elif name in self.int_subelements:
624                    self.element[name] = int(self.chars)
625                    self.chars = ""
626                elif name in self.float_subelements:
627                    self.element[name] = float(self.chars)
628                    self.chars = ""
629
630            def found_chars(self, data):
631                self.chars += data.rstrip()
632
633
634        tp = topo_parse();
635        parser = xml.parsers.expat.ParserCreate()
636        parser.EndElementHandler = tp.end_element
637        parser.CharacterDataHandler = tp.found_chars
638
639        parser.Parse(str)
640
641        return tp.topo
642       
643
644    def genviz(self, topo):
645        """
646        Generate the visualization the virtual topology
647        """
648
649        neato = "/usr/local/bin/neato"
650        # These are used to parse neato output and to create the visualization
651        # file.
652        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
653        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
654                "%s</type></node>"
655
656        try:
657            # Node names
658            nodes = [ n['vname'] for n in topo['node'] ]
659            topo_lans = topo['lan']
660        except KeyError, e:
661            raise service_error(service_error.internal, "Bad topology: %s" %e)
662
663        lans = { }
664        links = { }
665
666        # Walk through the virtual topology, organizing the connections into
667        # 2-node connections (links) and more-than-2-node connections (lans).
668        # When a lan is created, it's added to the list of nodes (there's a
669        # node in the visualization for the lan).
670        for l in topo_lans:
671            if links.has_key(l['vname']):
672                if len(links[l['vname']]) < 2:
673                    links[l['vname']].append(l['vnode'])
674                else:
675                    nodes.append(l['vname'])
676                    lans[l['vname']] = links[l['vname']]
677                    del links[l['vname']]
678                    lans[l['vname']].append(l['vnode'])
679            elif lans.has_key(l['vname']):
680                lans[l['vname']].append(l['vnode'])
681            else:
682                links[l['vname']] = [ l['vnode'] ]
683
684
685        # Open up a temporary file for dot to turn into a visualization
686        try:
687            df, dotname = tempfile.mkstemp()
688            dotfile = os.fdopen(df, 'w')
689        except EnvironmentError:
690            raise service_error(service_error.internal,
691                    "Failed to open file in genviz")
692
693        try:
694            dnull = open('/dev/null', 'w')
695        except EnvironmentError:
696            service_error(service_error.internal,
697                    "Failed to open /dev/null in genviz")
698
699        # Generate a dot/neato input file from the links, nodes and lans
700        try:
701            print >>dotfile, "graph G {"
702            for n in nodes:
703                print >>dotfile, '\t"%s"' % n
704            for l in links.keys():
705                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
706            for l in lans.keys():
707                for n in lans[l]:
708                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
709            print >>dotfile, "}"
710            dotfile.close()
711        except TypeError:
712            raise service_error(service_error.internal,
713                    "Single endpoint link in vtopo")
714        except EnvironmentError:
715            raise service_error(service_error.internal, "Cannot write dot file")
716
717        # Use dot to create a visualization
718        dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005', '-Gmaxiter=2000',
719                '-Gpack=true', dotname], stdout=PIPE, stderr=dnull,
720                close_fds=True)
721        dnull.close()
722
723        # Translate dot to vis format
724        vis_nodes = [ ]
725        vis = { 'node': vis_nodes }
726        for line in dot.stdout:
727            m = vis_re.match(line)
728            if m:
729                vn = m.group(1)
730                vis_node = {'name': vn, \
731                        'x': float(m.group(2)),\
732                        'y' : float(m.group(3)),\
733                    }
734                if vn in links.keys() or vn in lans.keys():
735                    vis_node['type'] = 'lan'
736                else:
737                    vis_node['type'] = 'node'
738                vis_nodes.append(vis_node)
739        rv = dot.wait()
740
741        os.remove(dotname)
742        if rv == 0 : return vis
743        else: return None
744
745
746    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
747            cert_pwd=None):
748        """
749        Release access to testbed through fedd
750        """
751
752        if not uri and tbmap:
753            uri = tbmap.get(tb, None)
754        if not uri:
755            raise service_error(service_error.server_config, 
756                    "Unknown testbed: %s" % tb)
757
758        if self.local_access.has_key(uri):
759            resp = self.local_access[uri].ReleaseAccess(\
760                    { 'ReleaseAccessRequestBody' : {'allocID': aid},}, 
761                    fedid(file=cert_file))
762            resp = { 'ReleaseAccessResponseBody': resp } 
763        else:
764            resp = self.call_ReleaseAccess(uri, {'allocID': aid},
765                    cert_file, cert_pwd, self.trusted_certs)
766
767        # better error coding
768
769    def remote_ns2topdl(self, uri, desc):
770
771        req = {
772                'description' : { 'ns2description': desc },
773            }
774
775        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
776                self.trusted_certs)
777
778        if r.has_key('Ns2TopdlResponseBody'):
779            r = r['Ns2TopdlResponseBody']
780            ed = r.get('experimentdescription', None)
781            if ed.has_key('topdldescription'):
782                return topdl.Topology(**ed['topdldescription'])
783            else:
784                raise service_error(service_error.protocol, 
785                        "Bad splitter response (no output)")
786        else:
787            raise service_error(service_error.protocol, "Bad splitter response")
788
789    class start_segment:
790        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
791                cert_pwd=None, trusted_certs=None, caller=None,
792                log_collector=None):
793            self.log = log
794            self.debug = debug
795            self.cert_file = cert_file
796            self.cert_pwd = cert_pwd
797            self.trusted_certs = None
798            self.caller = caller
799            self.testbed = testbed
800            self.log_collector = log_collector
801            self.response = None
802            self.node = { }
803
804        def make_map(self, resp):
805            for e in resp.get('embedding', []):
806                if 'toponame' in e and 'physname' in e:
807                    self.node[e['toponame']] = e['physname'][0]
808
809        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
810            req = {
811                    'allocID': { 'fedid' : aid }, 
812                    'segmentdescription': { 
813                        'topdldescription': topo.to_dict(),
814                    },
815                }
816
817            if connInfo:
818                req['connection'] = connInfo
819
820            import_svcs = [ s for m in masters.values() \
821                    for s in m if self.testbed in s.importers]
822
823            if import_svcs or self.testbed in masters:
824                req['service'] = []
825
826            for s in import_svcs:
827                for r in s.reqs:
828                    sr = copy.deepcopy(r)
829                    sr['visibility'] = 'import';
830                    req['service'].append(sr)
831
832            for s in masters.get(self.testbed, []):
833                for r in s.reqs:
834                    sr = copy.deepcopy(r)
835                    sr['visibility'] = 'export';
836                    req['service'].append(sr)
837
838            if attrs:
839                req['fedAttr'] = attrs
840
841            try:
842                self.log.debug("Calling StartSegment at %s " % uri)
843                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
844                        self.trusted_certs)
845                if r.has_key('StartSegmentResponseBody'):
846                    lval = r['StartSegmentResponseBody'].get('allocationLog',
847                            None)
848                    if lval and self.log_collector:
849                        for line in  lval.splitlines(True):
850                            self.log_collector.write(line)
851                    self.make_map(r['StartSegmentResponseBody'])
852                    self.response = r
853                else:
854                    raise service_error(service_error.internal, 
855                            "Bad response!?: %s" %r)
856                return True
857            except service_error, e:
858                self.log.error("Start segment failed on %s: %s" % \
859                        (self.testbed, e))
860                return False
861
862
863
864    class terminate_segment:
865        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
866                cert_pwd=None, trusted_certs=None, caller=None):
867            self.log = log
868            self.debug = debug
869            self.cert_file = cert_file
870            self.cert_pwd = cert_pwd
871            self.trusted_certs = None
872            self.caller = caller
873            self.testbed = testbed
874
875        def __call__(self, uri, aid ):
876            req = {
877                    'allocID': aid , 
878                }
879            try:
880                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
881                        self.trusted_certs)
882                return True
883            except service_error, e:
884                self.log.error("Terminate segment failed on %s: %s" % \
885                        (self.testbed, e))
886                return False
887
888    def allocate_resources(self, allocated, masters, eid, expid, 
889            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
890            attrs=None, connInfo={}, tbmap=None, expcert=None):
891
892        started = { }           # Testbeds where a sub-experiment started
893                                # successfully
894
895        # XXX
896        fail_soft = False
897
898        if tbmap is None: tbmap = { }
899
900        log = alloc_log or self.log
901
902        tp = thread_pool(self.nthreads)
903        threads = [ ]
904        starters = [ ]
905
906        if expcert:
907            cert = expcert
908            pw = None
909        else:
910            cert = self.cert_file
911            pw = self.cert_pwd
912
913        for tb in allocated.keys():
914            # Create and start a thread to start the segment, and save it
915            # to get the return value later
916            tb_attrs = copy.copy(attrs)
917            tp.wait_for_slot()
918            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
919            base, suffix = split_testbed(tb)
920            if suffix:
921                tb_attrs.append({'attribute': 'experiment_name', 
922                    'value': "%s-%s" % (eid, suffix)})
923            else:
924                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
925            if not uri:
926                raise service_error(service_error.internal, 
927                        "Unknown testbed %s !?" % tb)
928
929            if tbparams[tb].has_key('allocID') and \
930                    tbparams[tb]['allocID'].has_key('fedid'):
931                aid = tbparams[tb]['allocID']['fedid']
932            else:
933                raise service_error(service_error.internal, 
934                        "No alloc id for testbed %s !?" % tb)
935
936            s = self.start_segment(log=log, debug=self.debug,
937                    testbed=tb, cert_file=cert,
938                    cert_pwd=pw, trusted_certs=self.trusted_certs,
939                    caller=self.call_StartSegment,
940                    log_collector=log_collector)
941            starters.append(s)
942            t  = pooled_thread(\
943                    target=s, name=tb,
944                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
945                    pdata=tp, trace_file=self.trace_file)
946            threads.append(t)
947            t.start()
948
949        # Wait until all finish (keep pinging the log, though)
950        mins = 0
951        revoked = False
952        while not tp.wait_for_all_done(60.0):
953            mins += 1
954            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
955                    % mins)
956            if not revoked and \
957                    len([ t.getName() for t in threads if t.rv == False]) > 0:
958                # a testbed has failed.  Revoke this experiment's
959                # synchronizarion values so that sub experiments will not
960                # deadlock waiting for synchronization that will never happen
961                self.log.info("A subexperiment has failed to swap in, " + \
962                        "revoking synch keys")
963                var_key = "fedid:%s" % expid
964                for k in self.synch_store.all_keys():
965                    if len(k) > 45 and k[0:46] == var_key:
966                        self.synch_store.revoke_key(k)
967                revoked = True
968
969        failed = [ t.getName() for t in threads if not t.rv ]
970        succeeded = [tb for tb in allocated.keys() if tb not in failed]
971
972        # If one failed clean up, unless fail_soft is set
973        if failed:
974            if not fail_soft:
975                tp.clear()
976                for tb in succeeded:
977                    # Create and start a thread to stop the segment
978                    tp.wait_for_slot()
979                    uri = tbparams[tb]['uri']
980                    t  = pooled_thread(\
981                            target=self.terminate_segment(log=log,
982                                testbed=tb,
983                                cert_file=cert, 
984                                cert_pwd=pw,
985                                trusted_certs=self.trusted_certs,
986                                caller=self.call_TerminateSegment),
987                            args=(uri, tbparams[tb]['federant']['allocID']),
988                            name=tb,
989                            pdata=tp, trace_file=self.trace_file)
990                    t.start()
991                # Wait until all finish (if any are being stopped)
992                if succeeded:
993                    tp.wait_for_all_done()
994
995                # release the allocations
996                for tb in tbparams.keys():
997                    try:
998                        self.release_access(tb, tbparams[tb]['allocID'], 
999                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1000                                cert_file=cert, cert_pwd=pw)
1001                    except service_error, e:
1002                        self.log.warn("Error releasing access: %s" % e.desc)
1003                # Remove the placeholder
1004                self.state_lock.acquire()
1005                self.state[eid]['experimentStatus'] = 'failed'
1006                if self.state_filename: self.write_state()
1007                self.state_lock.release()
1008                # Remove the repo dir
1009                self.remove_dirs("%s/%s" %(self.repodir, expid))
1010                # Walk up tmpdir, deleting as we go
1011                if self.cleanup:
1012                    self.remove_dirs(tmpdir)
1013                else:
1014                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1015
1016
1017                log.error("Swap in failed on %s" % ",".join(failed))
1018                return
1019        else:
1020            # Walk through the successes and gather the virtual to physical
1021            # mapping.
1022            embedding = [ ]
1023            for s in starters:
1024                for k, v in s.node.items():
1025                    embedding.append({
1026                        'toponame': k, 
1027                        'physname': [ v],
1028                        'testbed': s.testbed
1029                        })
1030            log.info("[start_segment]: Experiment %s active" % eid)
1031
1032
1033        # Walk up tmpdir, deleting as we go
1034        if self.cleanup:
1035            self.remove_dirs(tmpdir)
1036        else:
1037            log.debug("[start_experiment]: not removing %s" % tmpdir)
1038
1039        # Insert the experiment into our state and update the disk copy.
1040        self.state_lock.acquire()
1041        self.state[expid]['experimentStatus'] = 'active'
1042        self.state[eid] = self.state[expid]
1043        self.state[eid]['experimentdescription']['topdldescription'] = \
1044                top.to_dict()
1045        self.state[eid]['embedding'] = embedding
1046        if self.state_filename: self.write_state()
1047        self.state_lock.release()
1048        return
1049
1050
1051    def add_kit(self, e, kit):
1052        """
1053        Add a Software object created from the list of (install, location)
1054        tuples passed as kit  to the software attribute of an object e.  We
1055        do this enough to break out the code, but it's kind of a hack to
1056        avoid changing the old tuple rep.
1057        """
1058
1059        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1060
1061        if isinstance(e.software, list): e.software.extend(s)
1062        else: e.software = s
1063
1064    def append_experiment_authorization(self, expid, attrs, 
1065            need_state_lock=True):
1066        """
1067        Append the authorization information to system state
1068        """
1069
1070        for p, a in attrs:
1071            self.auth.set_attribute(p, a)
1072        self.auth.save()
1073
1074        if need_state_lock: self.state_lock.acquire()
1075        self.state[expid]['auth'].update(attrs)
1076        if self.state_filename: self.write_state()
1077        if need_state_lock: self.state_lock.release()
1078
1079    def clear_experiment_authorization(self, expid, need_state_lock=True):
1080        """
1081        Attrs is a set of attribute principal pairs that need to be removed
1082        from the authenticator.  Remove them and save the authenticator.
1083        """
1084
1085        if need_state_lock: self.state_lock.acquire()
1086        if expid in self.state and 'auth' in self.state[expid]:
1087            for p, a in self.state[expid]['auth']:
1088                self.auth.unset_attribute(p, a)
1089            self.state[expid]['auth'] = set()
1090        if self.state_filename: self.write_state()
1091        if need_state_lock: self.state_lock.release()
1092        self.auth.save()
1093
1094
1095    def create_experiment_state(self, fid, req, expid, expcert,
1096            state='starting'):
1097        """
1098        Create the initial entry in the experiment's state.  The expid and
1099        expcert are the experiment's fedid and certifacte that represents that
1100        ID, which are installed in the experiment state.  If the request
1101        includes a suggested local name that is used if possible.  If the local
1102        name is already taken by an experiment owned by this user that has
1103        failed, it is overwritten.  Otherwise new letters are added until a
1104        valid localname is found.  The generated local name is returned.
1105        """
1106
1107        if req.has_key('experimentID') and \
1108                req['experimentID'].has_key('localname'):
1109            overwrite = False
1110            eid = req['experimentID']['localname']
1111            # If there's an old failed experiment here with the same local name
1112            # and accessible by this user, we'll overwrite it, otherwise we'll
1113            # fall through and do the collision avoidance.
1114            old_expid = self.get_experiment_fedid(eid)
1115            if old_expid and self.check_experiment_access(fid, old_expid):
1116                self.state_lock.acquire()
1117                status = self.state[eid].get('experimentStatus', None)
1118                if status and status == 'failed':
1119                    # remove the old access attributes
1120                    self.clear_experiment_authorization(eid,
1121                            need_state_lock=False)
1122                    overwrite = True
1123                    del self.state[eid]
1124                    del self.state[old_expid]
1125                self.state_lock.release()
1126            self.state_lock.acquire()
1127            while (self.state.has_key(eid) and not overwrite):
1128                eid += random.choice(string.ascii_letters)
1129            # Initial state
1130            self.state[eid] = {
1131                    'experimentID' : \
1132                            [ { 'localname' : eid }, {'fedid': expid } ],
1133                    'experimentStatus': state,
1134                    'experimentAccess': { 'X509' : expcert },
1135                    'owner': fid,
1136                    'log' : [],
1137                    'auth': set(),
1138                }
1139            self.state[expid] = self.state[eid]
1140            if self.state_filename: self.write_state()
1141            self.state_lock.release()
1142        else:
1143            eid = self.exp_stem
1144            for i in range(0,5):
1145                eid += random.choice(string.ascii_letters)
1146            self.state_lock.acquire()
1147            while (self.state.has_key(eid)):
1148                eid = self.exp_stem
1149                for i in range(0,5):
1150                    eid += random.choice(string.ascii_letters)
1151            # Initial state
1152            self.state[eid] = {
1153                    'experimentID' : \
1154                            [ { 'localname' : eid }, {'fedid': expid } ],
1155                    'experimentStatus': state,
1156                    'experimentAccess': { 'X509' : expcert },
1157                    'owner': fid,
1158                    'log' : [],
1159                    'auth': set(),
1160                }
1161            self.state[expid] = self.state[eid]
1162            if self.state_filename: self.write_state()
1163            self.state_lock.release()
1164
1165        # Let users touch the state.  Authorize this fid and the expid itself
1166        # to touch the experiment, as well as allowing th eoverrides.
1167        self.append_experiment_authorization(eid, 
1168                set([(fid, expid), (expid,expid)] + \
1169                        [ (o, expid) for o in self.overrides]))
1170
1171        return eid
1172
1173
1174    def allocate_ips_to_topo(self, top):
1175        """
1176        Add an ip4_address attribute to all the hosts in the topology, based on
1177        the shared substrates on which they sit.  An /etc/hosts file is also
1178        created and returned as a list of hostfiles entries.  We also return
1179        the allocator, because we may need to allocate IPs to portals
1180        (specifically DRAGON portals).
1181        """
1182        subs = sorted(top.substrates, 
1183                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1184                reverse=True)
1185        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1186        ifs = { }
1187        hosts = [ ]
1188
1189        for idx, s in enumerate(subs):
1190            net_size = len(s.interfaces)+2
1191
1192            a = ips.allocate(net_size)
1193            if a :
1194                base, num = a
1195                if num < net_size: 
1196                    raise service_error(service_error.internal,
1197                            "Allocator returned wrong number of IPs??")
1198            else:
1199                raise service_error(service_error.req, 
1200                        "Cannot allocate IP addresses")
1201            mask = ips.min_alloc
1202            while mask < net_size:
1203                mask *= 2
1204
1205            netmask = ((2**32-1) ^ (mask-1))
1206
1207            base += 1
1208            for i in s.interfaces:
1209                i.attribute.append(
1210                        topdl.Attribute('ip4_address', 
1211                            "%s" % ip_addr(base)))
1212                i.attribute.append(
1213                        topdl.Attribute('ip4_netmask', 
1214                            "%s" % ip_addr(int(netmask))))
1215
1216                hname = i.element.name
1217                if ifs.has_key(hname):
1218                    hosts.append("%s\t%s-%s %s-%d" % \
1219                            (ip_addr(base), hname, s.name, hname,
1220                                ifs[hname]))
1221                else:
1222                    ifs[hname] = 0
1223                    hosts.append("%s\t%s-%s %s-%d %s" % \
1224                            (ip_addr(base), hname, s.name, hname,
1225                                ifs[hname], hname))
1226
1227                ifs[hname] += 1
1228                base += 1
1229        return hosts, ips
1230
1231    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1232            tbparam, masters, tbmap, expid=None, expcert=None):
1233        for tb in testbeds:
1234            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1235                    expcert)
1236            allocated[tb] = 1
1237
1238    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1239            expcert=None):
1240        """
1241        Get access to testbed through fedd and set the parameters for that tb
1242        """
1243        def get_export_project(svcs):
1244            """
1245            Look through for the list of federated_service for this testbed
1246            objects for a project_export service, and extract the project
1247            parameter.
1248            """
1249
1250            pe = [s for s in svcs if s.name=='project_export']
1251            if len(pe) == 1:
1252                return pe[0].params.get('project', None)
1253            elif len(pe) == 0:
1254                return None
1255            else:
1256                raise service_error(service_error.req,
1257                        "More than one project export is not supported")
1258
1259        def add_services(svcs, type, slist):
1260            """
1261            Add the given services to slist.  type is import or export.
1262            """
1263            for i, s in enumerate(svcs):
1264                idx = '%s%d' % (type, i)
1265                sr = {'id': idx, 'name': s.name, 'visibility': type }
1266                if s.params:
1267                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1268                            for k, v in s.params.items()]
1269                slist.append(sr)
1270
1271        uri = tbmap.get(testbed_base(tb), None)
1272        if not uri:
1273            raise service_error(service_error.server_config, 
1274                    "Unknown testbed: %s" % tb)
1275
1276        export_svcs = masters.get(tb,[])
1277        import_svcs = [ s for m in masters.values() \
1278                for s in m \
1279                    if tb in s.importers ]
1280
1281        export_project = get_export_project(export_svcs)
1282        # Compose the credential list so that IDs come before attributes
1283        creds = set()
1284        keys = set()
1285        certs = self.auth.get_creds_for_principal(fid)
1286        if expid:
1287            certs.update(self.auth.get_creds_for_principal(expid))
1288        for c in certs:
1289            keys.add(c.issuer_cert())
1290            creds.add(c.attribute_cert())
1291        creds = list(keys) + list(creds)
1292
1293        if expcert: cert, pw = expcert, None
1294        else: cert, pw = self.cert_file, self.cert_pw
1295
1296        # Request credentials
1297        req = {
1298                'abac_credential': creds,
1299            }
1300        # Make the service request from the services we're importing and
1301        # exporting.  Keep track of the export request ids so we can
1302        # collect the resulting info from the access response.
1303        e_keys = { }
1304        if import_svcs or export_svcs:
1305            slist = []
1306            add_services(import_svcs, 'import', slist)
1307            add_services(export_svcs, 'export', slist)
1308            req['service'] = slist
1309
1310        if self.local_access.has_key(uri):
1311            # Local access call
1312            req = { 'RequestAccessRequestBody' : req }
1313            r = self.local_access[uri].RequestAccess(req, 
1314                    fedid(file=self.cert_file))
1315            r = { 'RequestAccessResponseBody' : r }
1316        else:
1317            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1318
1319        if r.has_key('RequestAccessResponseBody'):
1320            # Through to here we have a valid response, not a fault.
1321            # Access denied is a fault, so something better or worse than
1322            # access denied has happened.
1323            r = r['RequestAccessResponseBody']
1324            self.log.debug("[get_access] Access granted")
1325        else:
1326            raise service_error(service_error.protocol,
1327                        "Bad proxy response")
1328       
1329        tbparam[tb] = { 
1330                "allocID" : r['allocID'],
1331                "uri": uri,
1332                }
1333
1334        # Collect the responses corresponding to the services this testbed
1335        # exports.  These will be the service requests that we will include in
1336        # the start segment requests (with appropriate visibility values) to
1337        # import and export the segments.
1338        for s in r.get('service', []):
1339            id = s.get('id', None)
1340            if id and id in e_keys:
1341                e_keys[id].reqs.append(s)
1342
1343        # Add attributes to parameter space.  We don't allow attributes to
1344        # overlay any parameters already installed.
1345        for a in r.get('fedAttr', []):
1346            try:
1347                if a['attribute'] and \
1348                        isinstance(a['attribute'], basestring)\
1349                        and not tbparam[tb].has_key(a['attribute'].lower()):
1350                    tbparam[tb][a['attribute'].lower()] = a['value']
1351            except KeyError:
1352                self.log.error("Bad attribute in response: %s" % a)
1353
1354
1355    def split_topology(self, top, topo, testbeds):
1356        """
1357        Create the sub-topologies that are needed for experiment instantiation.
1358        """
1359        for tb in testbeds:
1360            topo[tb] = top.clone()
1361            # copy in for loop allows deletions from the original
1362            for e in [ e for e in topo[tb].elements]:
1363                etb = e.get_attribute('testbed')
1364                # NB: elements without a testbed attribute won't appear in any
1365                # sub topologies. 
1366                if not etb or etb != tb:
1367                    for i in e.interface:
1368                        for s in i.subs:
1369                            try:
1370                                s.interfaces.remove(i)
1371                            except ValueError:
1372                                raise service_error(service_error.internal,
1373                                        "Can't remove interface??")
1374                    topo[tb].elements.remove(e)
1375            topo[tb].make_indices()
1376
1377    def confirm_software(self, top):
1378        """
1379        Make sure that the software to be loaded in the topo is all available
1380        before we begin making access requests, etc.  This is a subset of
1381        wrangle_software.
1382        """
1383        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1384        pkgs.update([x.location for e in top.elements for x in e.software])
1385
1386        for pkg in pkgs:
1387            loc = pkg
1388
1389            scheme, host, path = urlparse(loc)[0:3]
1390            dest = os.path.basename(path)
1391            if not scheme:
1392                if not loc.startswith('/'):
1393                    loc = "/%s" % loc
1394                loc = "file://%s" %loc
1395            # NB: if scheme was found, loc == pkg
1396            try:
1397                u = urlopen(loc)
1398                u.close()
1399            except Exception, e:
1400                raise service_error(service_error.req, 
1401                        "Cannot open %s: %s" % (loc, e))
1402        return True
1403
1404    def wrangle_software(self, expid, top, topo, tbparams):
1405        """
1406        Copy software out to the repository directory, allocate permissions and
1407        rewrite the segment topologies to look for the software in local
1408        places.
1409        """
1410
1411        # Copy the rpms and tarfiles to a distribution directory from
1412        # which the federants can retrieve them
1413        linkpath = "%s/software" %  expid
1414        softdir ="%s/%s" % ( self.repodir, linkpath)
1415        softmap = { }
1416
1417        # self.fedkit and self.gateway kit are lists of tuples of
1418        # (install_location, download_location) this extracts the download
1419        # locations.
1420        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1421        pkgs.update([x.location for e in top.elements for x in e.software])
1422        try:
1423            os.makedirs(softdir)
1424        except EnvironmentError, e:
1425            raise service_error(
1426                    "Cannot create software directory: %s" % e)
1427        # The actual copying.  Everything's converted into a url for copying.
1428        auth_attrs = set()
1429        for pkg in pkgs:
1430            loc = pkg
1431
1432            scheme, host, path = urlparse(loc)[0:3]
1433            dest = os.path.basename(path)
1434            if not scheme:
1435                if not loc.startswith('/'):
1436                    loc = "/%s" % loc
1437                loc = "file://%s" %loc
1438            # NB: if scheme was found, loc == pkg
1439            try:
1440                u = urlopen(loc)
1441            except Exception, e:
1442                raise service_error(service_error.req, 
1443                        "Cannot open %s: %s" % (loc, e))
1444            try:
1445                f = open("%s/%s" % (softdir, dest) , "w")
1446                self.log.debug("Writing %s/%s" % (softdir,dest) )
1447                data = u.read(4096)
1448                while data:
1449                    f.write(data)
1450                    data = u.read(4096)
1451                f.close()
1452                u.close()
1453            except Exception, e:
1454                raise service_error(service_error.internal,
1455                        "Could not copy %s: %s" % (loc, e))
1456            path = re.sub("/tmp", "", linkpath)
1457            # XXX
1458            softmap[pkg] = \
1459                    "%s/%s/%s" %\
1460                    ( self.repo_url, path, dest)
1461
1462            # Allow the individual segments to access the software by assigning
1463            # an attribute to each testbed allocation that encodes the data to
1464            # be released.  This expression collects the data for each run of
1465            # the loop.
1466            auth_attrs.update([
1467                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1468                        for tb in tbparams.keys()])
1469
1470        self.append_experiment_authorization(expid, auth_attrs)
1471
1472        # Convert the software locations in the segments into the local
1473        # copies on this host
1474        for soft in [ s for tb in topo.values() \
1475                for e in tb.elements \
1476                    if getattr(e, 'software', False) \
1477                        for s in e.software ]:
1478            if softmap.has_key(soft.location):
1479                soft.location = softmap[soft.location]
1480
1481
1482    def new_experiment(self, req, fid):
1483        """
1484        The external interface to empty initial experiment creation called from
1485        the dispatcher.
1486
1487        Creates a working directory, splits the incoming description using the
1488        splitter script and parses out the avrious subsections using the
1489        lcasses above.  Once each sub-experiment is created, use pooled threads
1490        to instantiate them and start it all up.
1491        """
1492        req = req.get('NewRequestBody', None)
1493        if not req:
1494            raise service_error(service_error.req,
1495                    "Bad request format (no NewRequestBody)")
1496
1497        if self.auth.import_credentials(data_list=req.get('credential', [])):
1498            self.auth.save()
1499
1500        if not self.auth.check_attribute(fid, 'new'):
1501            raise service_error(service_error.access, "New access denied")
1502
1503        try:
1504            tmpdir = tempfile.mkdtemp(prefix="split-")
1505        except EnvironmentError:
1506            raise service_error(service_error.internal, "Cannot create tmp dir")
1507
1508        try:
1509            access_user = self.accessdb[fid]
1510        except KeyError:
1511            raise service_error(service_error.internal,
1512                    "Access map and authorizer out of sync in " + \
1513                            "new_experiment for fedid %s"  % fid)
1514
1515        # Generate an ID for the experiment (slice) and a certificate that the
1516        # allocator can use to prove they own it.  We'll ship it back through
1517        # the encrypted connection.  If the requester supplied one, use it.
1518        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1519            expcert = req['experimentAccess']['X509']
1520            expid = fedid(certstr=expcert)
1521            self.state_lock.acquire()
1522            if expid in self.state:
1523                self.state_lock.release()
1524                raise service_error(service_error.req, 
1525                        'fedid %s identifies an existing experiment' % expid)
1526            self.state_lock.release()
1527        else:
1528            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1529
1530        #now we're done with the tmpdir, and it should be empty
1531        if self.cleanup:
1532            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1533            os.rmdir(tmpdir)
1534        else:
1535            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1536
1537        eid = self.create_experiment_state(fid, req, expid, expcert, 
1538                state='empty')
1539
1540        rv = {
1541                'experimentID': [
1542                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1543                ],
1544                'experimentStatus': 'empty',
1545                'experimentAccess': { 'X509' : expcert }
1546            }
1547
1548        return rv
1549
1550    # create_experiment sub-functions
1551
1552    @staticmethod
1553    def get_experiment_key(req, field='experimentID'):
1554        """
1555        Parse the experiment identifiers out of the request (the request body
1556        tag has been removed).  Specifically this pulls either the fedid or the
1557        localname out of the experimentID field.  A fedid is preferred.  If
1558        neither is present or the request does not contain the fields,
1559        service_errors are raised.
1560        """
1561        # Get the experiment access
1562        exp = req.get(field, None)
1563        if exp:
1564            if exp.has_key('fedid'):
1565                key = exp['fedid']
1566            elif exp.has_key('localname'):
1567                key = exp['localname']
1568            else:
1569                raise service_error(service_error.req, "Unknown lookup type")
1570        else:
1571            raise service_error(service_error.req, "No request?")
1572
1573        return key
1574
1575    def get_experiment_ids_and_start(self, key, tmpdir):
1576        """
1577        Get the experiment name, id and access certificate from the state, and
1578        set the experiment state to 'starting'.  returns a triple (fedid,
1579        localname, access_cert_file). The access_cert_file is a copy of the
1580        contents of the access certificate, created in the tempdir with
1581        restricted permissions.  If things are confused, raise an exception.
1582        """
1583
1584        expid = eid = None
1585        self.state_lock.acquire()
1586        if self.state.has_key(key):
1587            self.state[key]['experimentStatus'] = "starting"
1588            for e in self.state[key].get('experimentID',[]):
1589                if not expid and e.has_key('fedid'):
1590                    expid = e['fedid']
1591                elif not eid and e.has_key('localname'):
1592                    eid = e['localname']
1593            if 'experimentAccess' in self.state[key] and \
1594                    'X509' in self.state[key]['experimentAccess']:
1595                expcert = self.state[key]['experimentAccess']['X509']
1596            else:
1597                expcert = None
1598        self.state_lock.release()
1599
1600        # make a protected copy of the access certificate so the experiment
1601        # controller can act as the experiment principal.
1602        if expcert:
1603            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1604            if not expcert_file:
1605                raise service_error(service_error.internal, 
1606                        "Cannot create temp cert file?")
1607        else:
1608            expcert_file = None
1609
1610        return (eid, expid, expcert_file)
1611
1612    def get_topology(self, req, tmpdir):
1613        """
1614        Get the ns2 content and put it into a file for parsing.  Call the local
1615        or remote parser and return the topdl.Topology.  Errors result in
1616        exceptions.  req is the request and tmpdir is a work directory.
1617        """
1618
1619        # The tcl parser needs to read a file so put the content into that file
1620        descr=req.get('experimentdescription', None)
1621        if descr:
1622            if 'ns2description' in descr:
1623                file_content=descr['ns2description']
1624            elif 'topdldescription' in descr:
1625                return topdl.Topology(**descr['topdldescription'])
1626            else:
1627                raise service_error(service_error.req, 
1628                        'Unknown experiment description type')
1629        else:
1630            raise service_error(service_error.req, "No experiment description")
1631
1632
1633        if self.splitter_url:
1634            self.log.debug("Calling remote topdl translator at %s" % \
1635                    self.splitter_url)
1636            top = self.remote_ns2topdl(self.splitter_url, file_content)
1637        else:
1638            tclfile = os.path.join(tmpdir, "experiment.tcl")
1639            if file_content:
1640                try:
1641                    f = open(tclfile, 'w')
1642                    f.write(file_content)
1643                    f.close()
1644                except EnvironmentError:
1645                    raise service_error(service_error.internal,
1646                            "Cannot write temp experiment description")
1647            else:
1648                raise service_error(service_error.req, 
1649                        "Only ns2descriptions supported")
1650            pid = "dummy"
1651            gid = "dummy"
1652            eid = "dummy"
1653
1654            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1655                str(self.muxmax), '-m', 'dummy']
1656
1657            tclcmd.extend([pid, gid, eid, tclfile])
1658
1659            self.log.debug("running local splitter %s", " ".join(tclcmd))
1660            # This is just fantastic.  As a side effect the parser copies
1661            # tb_compat.tcl into the current directory, so that directory
1662            # must be writable by the fedd user.  Doing this in the
1663            # temporary subdir ensures this is the case.
1664            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1665                    cwd=tmpdir)
1666            split_data = tclparser.stdout
1667
1668            top = topdl.topology_from_xml(file=split_data, top="experiment")
1669            os.remove(tclfile)
1670
1671        return top
1672
1673    def get_testbed_services(self, req, testbeds):
1674        """
1675        Parse the services section of the request into into two dicts mapping
1676        testbed to lists of federated_service objects.  The first lists all
1677        exporters of services, and the second all exporters of services that
1678        need control portals int the experiment.
1679        """
1680        masters = { }
1681        pmasters = { }
1682        for s in req.get('service', []):
1683            # If this is a service request with the importall field
1684            # set, fill it out.
1685
1686            if s.get('importall', False):
1687                s['import'] = [ tb for tb in testbeds \
1688                        if tb not in s.get('export',[])]
1689                del s['importall']
1690
1691            # Add the service to masters
1692            for tb in s.get('export', []):
1693                if s.get('name', None):
1694
1695                    params = { }
1696                    for a in s.get('fedAttr', []):
1697                        params[a.get('attribute', '')] = a.get('value','')
1698
1699                    fser = federated_service(name=s['name'],
1700                            exporter=tb, importers=s.get('import',[]),
1701                            params=params)
1702                    if fser.name == 'hide_hosts' \
1703                            and 'hosts' not in fser.params:
1704                        fser.params['hosts'] = \
1705                                ",".join(tb_hosts.get(fser.exporter, []))
1706                    if tb in masters: masters[tb].append(fser)
1707                    else: masters[tb] = [fser]
1708
1709                    if fser.portal:
1710                        if tb not in pmasters: pmasters[tb] = [ fser ]
1711                        else: pmasters[tb].append(fser)
1712                else:
1713                    self.log.error('Testbed service does not have name " + \
1714                            "and importers')
1715        return masters, pmasters
1716
1717    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1718        """
1719        Create the ssh keys necessary for interconnecting the potral nodes and
1720        the global hosts file for letting each segment know about the IP
1721        addresses in play.  Save these into the repo.  Add attributes to the
1722        autorizer allowing access controllers to download them and return a set
1723        of attributes that inform the segments where to find this stuff.  Mau
1724        raise service_errors in if there are problems.
1725        """
1726        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1727        gw_secretkey_base = "fed.%s" % self.ssh_type
1728        gw_pubkey = tmpdir + "/keys/" + gw_pubkey_base
1729        gw_secretkey = tmpdir + "/keys/" + gw_secretkey_base
1730
1731        try:
1732            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1733        except ValueError:
1734            raise service_error(service_error.server_config, 
1735                    "Bad key type (%s)" % self.ssh_type)
1736
1737
1738        # Copy configuration files into the remote file store
1739        # The config urlpath
1740        configpath = "/%s/config" % expid
1741        # The config file system location
1742        configdir ="%s%s" % ( self.repodir, configpath)
1743        try:
1744            os.makedirs(configdir)
1745        except EnvironmentError, e:
1746            raise service_error(service_error.internal,
1747                    "Cannot create config directory: %s" % e)
1748        try:
1749            f = open("%s/hosts" % configdir, "w")
1750            print >> f, string.join(hosts, '\n')
1751            f.close()
1752        except EnvironmentError, e:
1753            raise service_error(service_error.internal, 
1754                    "Cannot write hosts file: %s" % e)
1755        try:
1756            copy_file("%s" % gw_pubkey, "%s/%s" % \
1757                    (configdir, gw_pubkey_base))
1758            copy_file("%s" % gw_secretkey, "%s/%s" % \
1759                    (configdir, gw_secretkey_base))
1760        except EnvironmentError, e:
1761            raise service_error(service_error.internal, 
1762                    "Cannot copy keyfiles: %s" % e)
1763
1764        # Allow the individual testbeds to access the configuration files,
1765        # again by setting an attribute for the relevant pathnames on each
1766        # allocation principal.  Yeah, that's a long list comprehension.
1767        self.append_experiment_authorization(expid, set([
1768            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1769                    for tb in tbparams.keys() \
1770                        for f in ("hosts", gw_secretkey_base, gw_pubkey_base)]))
1771
1772        attrs = [ 
1773                {
1774                    'attribute': 'ssh_pubkey', 
1775                    'value': '%s/%s/config/%s' % \
1776                            (self.repo_url, expid, gw_pubkey_base)
1777                },
1778                {
1779                    'attribute': 'ssh_secretkey', 
1780                    'value': '%s/%s/config/%s' % \
1781                            (self.repo_url, expid, gw_secretkey_base)
1782                },
1783                {
1784                    'attribute': 'hosts', 
1785                    'value': '%s/%s/config/hosts' % \
1786                            (self.repo_url, expid)
1787                },
1788            ]
1789        return attrs
1790
1791
1792    def get_vtopo(self, req, fid):
1793        """
1794        Return the stored virtual topology for this experiment
1795        """
1796        rv = None
1797        state = None
1798
1799        req = req.get('VtopoRequestBody', None)
1800        if not req:
1801            raise service_error(service_error.req,
1802                    "Bad request format (no VtopoRequestBody)")
1803        exp = req.get('experiment', None)
1804        if exp:
1805            if exp.has_key('fedid'):
1806                key = exp['fedid']
1807                keytype = "fedid"
1808            elif exp.has_key('localname'):
1809                key = exp['localname']
1810                keytype = "localname"
1811            else:
1812                raise service_error(service_error.req, "Unknown lookup type")
1813        else:
1814            raise service_error(service_error.req, "No request?")
1815
1816        self.check_experiment_access(fid, key)
1817
1818        self.state_lock.acquire()
1819        if self.state.has_key(key):
1820            if self.state[key].has_key('vtopo'):
1821                rv = { 'experiment' : {keytype: key },\
1822                        'vtopo': self.state[key]['vtopo'],\
1823                    }
1824            else:
1825                state = self.state[key]['experimentStatus']
1826        self.state_lock.release()
1827
1828        if rv: return rv
1829        else: 
1830            if state:
1831                raise service_error(service_error.partial, 
1832                        "Not ready: %s" % state)
1833            else:
1834                raise service_error(service_error.req, "No such experiment")
1835
1836    def get_vis(self, req, fid):
1837        """
1838        Return the stored visualization for this experiment
1839        """
1840        rv = None
1841        state = None
1842
1843        req = req.get('VisRequestBody', None)
1844        if not req:
1845            raise service_error(service_error.req,
1846                    "Bad request format (no VisRequestBody)")
1847        exp = req.get('experiment', None)
1848        if exp:
1849            if exp.has_key('fedid'):
1850                key = exp['fedid']
1851                keytype = "fedid"
1852            elif exp.has_key('localname'):
1853                key = exp['localname']
1854                keytype = "localname"
1855            else:
1856                raise service_error(service_error.req, "Unknown lookup type")
1857        else:
1858            raise service_error(service_error.req, "No request?")
1859
1860        self.check_experiment_access(fid, key)
1861
1862        self.state_lock.acquire()
1863        if self.state.has_key(key):
1864            if self.state[key].has_key('vis'):
1865                rv =  { 'experiment' : {keytype: key },\
1866                        'vis': self.state[key]['vis'],\
1867                        }
1868            else:
1869                state = self.state[key]['experimentStatus']
1870        self.state_lock.release()
1871
1872        if rv: return rv
1873        else:
1874            if state:
1875                raise service_error(service_error.partial, 
1876                        "Not ready: %s" % state)
1877            else:
1878                raise service_error(service_error.req, "No such experiment")
1879
1880   
1881    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1882            top):
1883        """
1884        Store the various data that have changed in the experiment state
1885        between when it was started and the beginning of resource allocation.
1886        This is basically the information about each local allocation.  This
1887        fills in the values of the placeholder allocation in the state.
1888        """
1889        # save federant information
1890        for k in allocated.keys():
1891            tbparams[k]['federant'] = {
1892                    'name': [ { 'localname' : eid} ],
1893                    'allocID' : tbparams[k]['allocID'],
1894                    'uri': tbparams[k]['uri'],
1895                }
1896
1897        self.state_lock.acquire()
1898        self.state[eid]['vtopo'] = vtopo
1899        self.state[eid]['vis'] = vis
1900        self.state[eid]['experimentdescription'] = \
1901                { 'topdldescription': top.to_dict() }
1902        self.state[eid]['federant'] = \
1903                [ tbparams[tb]['federant'] for tb in tbparams.keys() \
1904                    if tbparams[tb].has_key('federant') ]
1905        if self.state_filename: 
1906            self.write_state()
1907        self.state_lock.release()
1908
1909    def clear_placeholder(self, eid, expid, tmpdir):
1910        """
1911        Clear the placeholder and remove any allocated temporary dir.
1912        """
1913
1914        self.state_lock.acquire()
1915        del self.state[eid]
1916        del self.state[expid]
1917        if self.state_filename: self.write_state()
1918        self.state_lock.release()
1919        if tmpdir and self.cleanup:
1920            self.remove_dirs(tmpdir)
1921
1922    # end of create_experiment sub-functions
1923
1924    def create_experiment(self, req, fid):
1925        """
1926        The external interface to experiment creation called from the
1927        dispatcher.
1928
1929        Creates a working directory, splits the incoming description using the
1930        splitter script and parses out the various subsections using the
1931        classes above.  Once each sub-experiment is created, use pooled threads
1932        to instantiate them and start it all up.
1933        """
1934
1935        req = req.get('CreateRequestBody', None)
1936        if req:
1937            key = self.get_experiment_key(req)
1938        else:
1939            raise service_error(service_error.req,
1940                    "Bad request format (no CreateRequestBody)")
1941
1942        # Import information from the requester
1943        if self.auth.import_credentials(data_list=req.get('credential', [])):
1944            self.auth.save()
1945
1946        # Make sure that the caller can talk to us
1947        self.check_experiment_access(fid, key)
1948
1949        # Install the testbed map entries supplied with the request into a copy
1950        # of the testbed map.
1951        tbmap = dict(self.tbmap)
1952        for m in req.get('testbedmap', []):
1953            if 'testbed' in m and 'uri' in m:
1954                tbmap[m['testbed']] = m['uri']
1955
1956        # a place to work
1957        try:
1958            tmpdir = tempfile.mkdtemp(prefix="split-")
1959            os.mkdir(tmpdir+"/keys")
1960        except EnvironmentError:
1961            raise service_error(service_error.internal, "Cannot create tmp dir")
1962
1963        tbparams = { }
1964
1965        eid, expid, expcert_file = \
1966                self.get_experiment_ids_and_start(key, tmpdir)
1967
1968        # This catches exceptions to clear the placeholder if necessary
1969        try: 
1970            if not (eid and expid):
1971                raise service_error(service_error.internal, 
1972                        "Cannot find local experiment info!?")
1973
1974            top = self.get_topology(req, tmpdir)
1975            self.confirm_software(top)
1976            # Assign the IPs
1977            hosts, ip_allocator = self.allocate_ips_to_topo(top)
1978            # Find the testbeds to look up
1979            tb_hosts = { }
1980            testbeds = [ ]
1981            for e in top.elements:
1982                if isinstance(e, topdl.Computer):
1983                    tb = e.get_attribute('testbed') or 'default'
1984                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
1985                    else: 
1986                        tb_hosts[tb] = [ e.name ]
1987                        testbeds.append(tb)
1988
1989            masters, pmasters = self.get_testbed_services(req, testbeds)
1990            allocated = { }         # Testbeds we can access
1991            topo ={ }               # Sub topologies
1992            connInfo = { }          # Connection information
1993
1994            self.split_topology(top, topo, testbeds)
1995
1996            self.get_access_to_testbeds(testbeds, fid, allocated, 
1997                    tbparams, masters, tbmap, expid, expcert_file)
1998
1999            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2000
2001            part = experiment_partition(self.auth, self.store_url, tbmap,
2002                    self.muxmax, self.direct_transit)
2003            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2004                    connInfo, expid)
2005
2006            auth_attrs = set()
2007            # Now get access to the dynamic testbeds (those added above)
2008            for tb in [ t for t in topo if t not in allocated]:
2009                self.get_access(tb, tbparams, fid, masters, tbmap, 
2010                        expid, expcert_file)
2011                allocated[tb] = 1
2012                store_keys = topo[tb].get_attribute('store_keys')
2013                # Give the testbed access to keys it exports or imports
2014                if store_keys:
2015                    auth_attrs.update(set([
2016                        (tbparams[tb]['allocID']['fedid'], sk) \
2017                                for sk in store_keys.split(" ")]))
2018
2019            if auth_attrs:
2020                self.append_experiment_authorization(expid, auth_attrs)
2021
2022            # transit and disconnected testbeds may not have a connInfo entry.
2023            # Fill in the blanks.
2024            for t in allocated.keys():
2025                if not connInfo.has_key(t):
2026                    connInfo[t] = { }
2027
2028            self.wrangle_software(expid, top, topo, tbparams)
2029
2030            vtopo = topdl.topology_to_vtopo(top)
2031            vis = self.genviz(vtopo)
2032            self.save_federant_information(allocated, tbparams, eid, vtopo, 
2033                    vis, top)
2034        except service_error, e:
2035            # If something goes wrong in the parse (usually an access error)
2036            # clear the placeholder state.  From here on out the code delays
2037            # exceptions.  Failing at this point returns a fault to the remote
2038            # caller.
2039            self.clear_placeholder(eid, expid, tmpdir)
2040            raise e
2041
2042        # Start the background swapper and return the starting state.  From
2043        # here on out, the state will stick around a while.
2044
2045        # XXX: I think this is redundant
2046        # Let users touch the state
2047        # self.auth.set_attribute(fid, expid)
2048        # self.auth.set_attribute(expid, expid)
2049        # Override fedids can manipulate state as well
2050        # for o in self.overrides:
2051            # self.auth.set_attribute(o, expid)
2052        # self.auth.save()
2053
2054        # Create a logger that logs to the experiment's state object as well as
2055        # to the main log file.
2056        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2057        alloc_collector = self.list_log(self.state[eid]['log'])
2058        h = logging.StreamHandler(alloc_collector)
2059        # XXX: there should be a global one of these rather than repeating the
2060        # code.
2061        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2062                    '%d %b %y %H:%M:%S'))
2063        alloc_log.addHandler(h)
2064
2065        # Start a thread to do the resource allocation
2066        t  = Thread(target=self.allocate_resources,
2067                args=(allocated, masters, eid, expid, tbparams, 
2068                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2069                    connInfo, tbmap, expcert_file),
2070                name=eid)
2071        t.start()
2072
2073        rv = {
2074                'experimentID': [
2075                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2076                ],
2077                'experimentStatus': 'starting',
2078            }
2079
2080        return rv
2081   
2082    def get_experiment_fedid(self, key):
2083        """
2084        find the fedid associated with the localname key in the state database.
2085        """
2086
2087        rv = None
2088        self.state_lock.acquire()
2089        if self.state.has_key(key):
2090            if isinstance(self.state[key], dict):
2091                try:
2092                    kl = [ f['fedid'] for f in \
2093                            self.state[key]['experimentID']\
2094                                if f.has_key('fedid') ]
2095                except KeyError:
2096                    self.state_lock.release()
2097                    raise service_error(service_error.internal, 
2098                            "No fedid for experiment %s when getting "+\
2099                                    "fedid(!?)" % key)
2100                if len(kl) == 1:
2101                    rv = kl[0]
2102                else:
2103                    self.state_lock.release()
2104                    raise service_error(service_error.internal, 
2105                            "multiple fedids for experiment %s when " +\
2106                                    "getting fedid(!?)" % key)
2107            else:
2108                self.state_lock.release()
2109                raise service_error(service_error.internal, 
2110                        "Unexpected state for %s" % key)
2111        self.state_lock.release()
2112        return rv
2113
2114    def check_experiment_access(self, fid, key):
2115        """
2116        Confirm that the fid has access to the experiment.  Though a request
2117        may be made in terms of a local name, the access attribute is always
2118        the experiment's fedid.
2119        """
2120        if not isinstance(key, fedid):
2121            key = self.get_experiment_fedid(key)
2122
2123        if self.auth.check_attribute(fid, key):
2124            return True
2125        else:
2126            raise service_error(service_error.access, "Access Denied")
2127
2128
2129    def get_handler(self, path, fid):
2130        """
2131        Perhaps surprisingly named, this function handles HTTP GET requests to
2132        this server (SOAP requests are POSTs).
2133        """
2134        self.log.info("Get handler %s %s" % (path, fid))
2135        if self.auth.check_attribute(fid, path):
2136            return ("%s/%s" % (self.repodir, path), "application/binary")
2137        else:
2138            return (None, None)
2139
2140    def clean_info_response(self, rv):
2141        """
2142        Remove the information in the experiment's state object that is not in
2143        the info response.
2144        """
2145        # Remove the owner info (should always be there, but...)
2146        if rv.has_key('owner'): del rv['owner']
2147        if 'auth' in rv: del rv['auth']
2148
2149        # Convert the log into the allocationLog parameter and remove the
2150        # log entry (with defensive programming)
2151        if rv.has_key('log'):
2152            rv['allocationLog'] = "".join(rv['log'])
2153            del rv['log']
2154        else:
2155            rv['allocationLog'] = ""
2156
2157        if rv['experimentStatus'] != 'active':
2158            if rv.has_key('federant'): del rv['federant']
2159        else:
2160            # remove the allocationID and uri info from each federant
2161            for f in rv.get('federant', []):
2162                if f.has_key('allocID'): del f['allocID']
2163                if f.has_key('uri'): del f['uri']
2164
2165        return rv
2166
2167    def get_info(self, req, fid):
2168        """
2169        Return all the stored info about this experiment
2170        """
2171        rv = None
2172
2173        req = req.get('InfoRequestBody', None)
2174        if not req:
2175            raise service_error(service_error.req,
2176                    "Bad request format (no InfoRequestBody)")
2177        exp = req.get('experiment', None)
2178        if exp:
2179            if exp.has_key('fedid'):
2180                key = exp['fedid']
2181                keytype = "fedid"
2182            elif exp.has_key('localname'):
2183                key = exp['localname']
2184                keytype = "localname"
2185            else:
2186                raise service_error(service_error.req, "Unknown lookup type")
2187        else:
2188            raise service_error(service_error.req, "No request?")
2189
2190        self.check_experiment_access(fid, key)
2191
2192        # The state may be massaged by the service function that called
2193        # get_info (e.g., encoded for XMLRPC transport) so send a copy of the
2194        # state.
2195        self.state_lock.acquire()
2196        if self.state.has_key(key):
2197            rv = copy.deepcopy(self.state[key])
2198        self.state_lock.release()
2199
2200        if rv:
2201            return self.clean_info_response(rv)
2202        else:
2203            raise service_error(service_error.req, "No such experiment")
2204
2205    def get_multi_info(self, req, fid):
2206        """
2207        Return all the stored info that this fedid can access
2208        """
2209        rv = { 'info': [ ] }
2210
2211        self.state_lock.acquire()
2212        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2213            try:
2214                self.check_experiment_access(fid, key)
2215            except service_error, e:
2216                if e.code == service_error.access:
2217                    continue
2218                else:
2219                    self.state_lock.release()
2220                    raise e
2221
2222            if self.state.has_key(key):
2223                e = copy.deepcopy(self.state[key])
2224                e = self.clean_info_response(e)
2225                rv['info'].append(e)
2226        self.state_lock.release()
2227        return rv
2228
2229    def check_termination_status(self, fed_exp, force):
2230        """
2231        Confirm that the experiment is sin a valid state to stop (or force it)
2232        return the state - invalid states for deletion and force settings cause
2233        exceptions.
2234        """
2235        self.state_lock.acquire()
2236        status = fed_exp.get('experimentStatus', None)
2237
2238        if status:
2239            if status in ('starting', 'terminating'):
2240                if not force:
2241                    self.state_lock.release()
2242                    raise service_error(service_error.partial, 
2243                            'Experiment still being created or destroyed')
2244                else:
2245                    self.log.warning('Experiment in %s state ' % status + \
2246                            'being terminated by force.')
2247            self.state_lock.release()
2248            return status
2249        else:
2250            # No status??? trouble
2251            self.state_lock.release()
2252            raise service_error(service_error.internal,
2253                    "Experiment has no status!?")
2254
2255
2256    def get_termination_info(self, fed_exp):
2257        ids = []
2258        term_params = { }
2259        self.state_lock.acquire()
2260        #  experimentID is a list of dicts that are self-describing
2261        #  identifiers.  This finds all the fedids and localnames - the
2262        #  keys of self.state - and puts them into ids, which is used to delete
2263        #  the state after everything is swapped out.
2264        for id in fed_exp.get('experimentID', []):
2265            if 'fedid' in id: 
2266                ids.append(id['fedid'])
2267                repo = "%s" % id['fedid']
2268            if 'localname' in id: ids.append(id['localname'])
2269
2270        # Get the experimentAccess - the principal for this experiment.  It
2271        # is this principal to which credentials have been delegated, and
2272        # as which the experiment controller must act.
2273        if 'experimentAccess' in fed_exp and \
2274                'X509' in fed_exp['experimentAccess']:
2275            expcert = fed_exp['experimentAccess']['X509']
2276        else:
2277            expcert = None
2278
2279        # Collect the allocation/segment ids into a dict keyed by the fedid
2280        # of the allocation (or a monotonically increasing integer) that
2281        # contains a tuple of uri, aid (which is a dict...)
2282        for i, fed in enumerate(fed_exp.get('federant', [])):
2283            try:
2284                uri = fed['uri']
2285                aid = fed['allocID']
2286                k = fed['allocID'].get('fedid', i)
2287            except KeyError, e:
2288                continue
2289            term_params[k] = (uri, aid)
2290        # Change the experiment state
2291        fed_exp['experimentStatus'] = 'terminating'
2292        if self.state_filename: self.write_state()
2293        self.state_lock.release()
2294
2295        return ids, term_params, expcert, repo
2296
2297
2298    def deallocate_resources(self, term_params, expcert, status, force, 
2299            dealloc_log):
2300        tmpdir = None
2301        # This try block makes sure the tempdir is cleared
2302        try:
2303            # If no expcert, try the deallocation as the experiment
2304            # controller instance.
2305            if expcert and self.auth_type != 'legacy': 
2306                try:
2307                    tmpdir = tempfile.mkdtemp(prefix="term-")
2308                except EnvironmentError:
2309                    raise service_error(service_error.internal, 
2310                            "Cannot create tmp dir")
2311                cert_file = self.make_temp_certfile(expcert, tmpdir)
2312                pw = None
2313            else: 
2314                cert_file = self.cert_file
2315                pw = self.cert_pwd
2316
2317            # Stop everyone.  NB, wait_for_all waits until a thread starts
2318            # and then completes, so we can't wait if nothing starts.  So,
2319            # no tbparams, no start.
2320            if len(term_params) > 0:
2321                tp = thread_pool(self.nthreads)
2322                for k, (uri, aid) in term_params.items():
2323                    # Create and start a thread to stop the segment
2324                    tp.wait_for_slot()
2325                    t  = pooled_thread(\
2326                            target=self.terminate_segment(log=dealloc_log,
2327                                testbed=uri,
2328                                cert_file=cert_file, 
2329                                cert_pwd=pw,
2330                                trusted_certs=self.trusted_certs,
2331                                caller=self.call_TerminateSegment),
2332                            args=(uri, aid), name=k,
2333                            pdata=tp, trace_file=self.trace_file)
2334                    t.start()
2335                # Wait for completions
2336                tp.wait_for_all_done()
2337
2338            # release the allocations (failed experiments have done this
2339            # already, and starting experiments may be in odd states, so we
2340            # ignore errors releasing those allocations
2341            try: 
2342                for k, (uri, aid)  in term_params.items():
2343                    self.release_access(None, aid, uri=uri,
2344                            cert_file=cert_file, cert_pwd=pw)
2345            except service_error, e:
2346                if status != 'failed' and not force:
2347                    raise e
2348
2349        # Clean up the tmpdir no matter what
2350        finally:
2351            if tmpdir: self.remove_dirs(tmpdir)
2352
2353
2354    def terminate_experiment(self, req, fid):
2355        """
2356        Swap this experiment out on the federants and delete the shared
2357        information
2358        """
2359        tbparams = { }
2360        req = req.get('TerminateRequestBody', None)
2361        if not req:
2362            raise service_error(service_error.req,
2363                    "Bad request format (no TerminateRequestBody)")
2364
2365        key = self.get_experiment_key(req, 'experiment')
2366        self.check_experiment_access(fid, key)
2367        exp = req.get('experiment', False)
2368        force = req.get('force', False)
2369
2370        dealloc_list = [ ]
2371
2372
2373        # Create a logger that logs to the dealloc_list as well as to the main
2374        # log file.
2375        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2376        h = logging.StreamHandler(self.list_log(dealloc_list))
2377        # XXX: there should be a global one of these rather than repeating the
2378        # code.
2379        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2380                    '%d %b %y %H:%M:%S'))
2381        dealloc_log.addHandler(h)
2382
2383        self.state_lock.acquire()
2384        fed_exp = self.state.get(key, None)
2385        self.state_lock.release()
2386        repo = None
2387
2388        if fed_exp:
2389            status = self.check_termination_status(fed_exp, force)
2390            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2391            self.deallocate_resources(term_params, expcert, status, force, 
2392                    dealloc_log)
2393
2394            # Remove the terminated experiment
2395            self.state_lock.acquire()
2396            for id in ids:
2397                self.clear_experiment_authorization(id, need_state_lock=False)
2398                if id in self.state: del self.state[id]
2399
2400            if self.state_filename: self.write_state()
2401            self.state_lock.release()
2402
2403            # Delete any synch points associated with this experiment.  All
2404            # synch points begin with the fedid of the experiment.
2405            fedid_keys = set(["fedid:%s" % f for f in ids \
2406                    if isinstance(f, fedid)])
2407            for k in self.synch_store.all_keys():
2408                try:
2409                    if len(k) > 45 and k[0:46] in fedid_keys:
2410                        self.synch_store.del_value(k)
2411                except synch_store.BadDeletionError:
2412                    pass
2413            self.write_store()
2414
2415            # Remove software and other cached stuff from the filesystem.
2416            if repo:
2417                self.remove_dirs("%s/%s" % (self.repodir, repo))
2418               
2419            return { 
2420                    'experiment': exp , 
2421                    'deallocationLog': string.join(dealloc_list, ''),
2422                    }
2423        else:
2424            raise service_error(service_error.req, "No saved state")
2425
2426
2427    def GetValue(self, req, fid):
2428        """
2429        Get a value from the synchronized store
2430        """
2431        req = req.get('GetValueRequestBody', None)
2432        if not req:
2433            raise service_error(service_error.req,
2434                    "Bad request format (no GetValueRequestBody)")
2435       
2436        name = req.get('name', None)
2437        wait = req.get('wait', False)
2438        rv = { 'name': name }
2439
2440        if name and self.auth.check_attribute(fid, name):
2441            self.log.debug("[GetValue] asking for %s " % name)
2442            try:
2443                v = self.synch_store.get_value(name, wait)
2444            except synch_store.RevokedKeyError:
2445                # No more synch on this key
2446                raise service_error(service_error.federant, 
2447                        "Synch key %s revoked" % name)
2448            if v is not None:
2449                rv['value'] = v
2450            self.log.debug("[GetValue] got %s from %s" % (v, name))
2451            return rv
2452        else:
2453            raise service_error(service_error.access, "Access Denied")
2454       
2455
2456    def SetValue(self, req, fid):
2457        """
2458        Set a value in the synchronized store
2459        """
2460        req = req.get('SetValueRequestBody', None)
2461        if not req:
2462            raise service_error(service_error.req,
2463                    "Bad request format (no SetValueRequestBody)")
2464       
2465        name = req.get('name', None)
2466        v = req.get('value', '')
2467
2468        if name and self.auth.check_attribute(fid, name):
2469            try:
2470                self.synch_store.set_value(name, v)
2471                self.write_store()
2472                self.log.debug("[SetValue] set %s to %s" % (name, v))
2473            except synch_store.CollisionError:
2474                # Translate into a service_error
2475                raise service_error(service_error.req,
2476                        "Value already set: %s" %name)
2477            except synch_store.RevokedKeyError:
2478                # No more synch on this key
2479                raise service_error(service_error.federant, 
2480                        "Synch key %s revoked" % name)
2481            return { 'name': name, 'value': v }
2482        else:
2483            raise service_error(service_error.access, "Access Denied")
Note: See TracBrowser for help on using the repository browser.