source: fedd/federation/experiment_control.py @ 29d5f7c

compt_changesinfo-ops
Last change on this file since 29d5f7c was 29d5f7c, checked in by Ted Faber <faber@…>, 12 years ago

More new Info stuff. Create, terminate, ftopo all work.

  • Property mode set to 100644
File size: 83.0 KB
Line 
1#!/usr/local/bin/python
2
3import os,sys
4
5import re
6import random
7import string
8import subprocess
9import tempfile
10import copy
11import pickle
12import logging
13import signal
14import time
15
16import os.path
17
18import traceback
19# For parsing visualization output and splitter output
20import xml.parsers.expat
21
22from threading import Lock, Thread, Condition
23from subprocess import call, Popen, PIPE
24from string import join
25
26from urlparse import urlparse
27from urllib2 import urlopen
28
29from util import *
30from fedid import fedid, generate_fedid
31from remote_service import xmlrpc_handler, soap_handler, service_caller
32from service_error import service_error
33from synch_store import synch_store
34from experiment_partition import experiment_partition
35from experiment_control_legacy import experiment_control_legacy
36from authorizer import abac_authorizer
37from thread_pool import thread_pool, pooled_thread
38from experiment_info import experiment_info, allocation_info
39
40import topdl
41import list_log
42from ip_allocator import ip_allocator
43from ip_addr import ip_addr
44
45
46class nullHandler(logging.Handler):
47    def emit(self, record): pass
48
49fl = logging.getLogger("fedd.experiment_control")
50fl.addHandler(nullHandler())
51
52
53# Right now, no support for composition.
54class federated_service:
55    def __init__(self, name, exporter=None, importers=None, params=None, 
56            reqs=None, portal=None):
57        self.name=name
58        self.exporter=exporter
59        if importers is None: self.importers = []
60        else: self.importers=importers
61        if params is None: self.params = { }
62        else: self.params = params
63        if reqs is None: self.reqs = []
64        else: self.reqs = reqs
65
66        if portal is not None:
67            self.portal = portal
68        else:
69            self.portal = (name in federated_service.needs_portal)
70
71    def __str__(self):
72        return "name %s export %s import %s params %s reqs %s" % \
73                (self.name, self.exporter, self.importers, self.params,
74                        [ (r['name'], r['visibility']) for r in self.reqs] )
75
76    needs_portal = ('SMB', 'seer', 'tmcd', 'project_export', 'seer_master')
77
78class experiment_control_local(experiment_control_legacy):
79    """
80    Control of experiments that this system can directly access.
81
82    Includes experiment creation, termination and information dissemination.
83    Thred safe.
84    """
85
86    class ssh_cmd_timeout(RuntimeError): pass
87   
88    call_RequestAccess = service_caller('RequestAccess')
89    call_ReleaseAccess = service_caller('ReleaseAccess')
90    call_StartSegment = service_caller('StartSegment')
91    call_TerminateSegment = service_caller('TerminateSegment')
92    call_Ns2Topdl = service_caller('Ns2Topdl')
93
94    def __init__(self, config=None, auth=None):
95        """
96        Intialize the various attributes, most from the config object
97        """
98
99        def parse_tarfile_list(tf):
100            """
101            Parse a tarfile list from the configuration.  This is a set of
102            paths and tarfiles separated by spaces.
103            """
104            rv = [ ]
105            if tf is not None:
106                tl = tf.split()
107                while len(tl) > 1:
108                    p, t = tl[0:2]
109                    del tl[0:2]
110                    rv.append((p, t))
111            return rv
112
113        self.list_log = list_log.list_log
114
115        self.cert_file = config.get("experiment_control", "cert_file")
116        if self.cert_file:
117            self.cert_pwd = config.get("experiment_control", "cert_pwd")
118        else:
119            self.cert_file = config.get("globals", "cert_file")
120            self.cert_pwd = config.get("globals", "cert_pwd")
121
122        self.trusted_certs = config.get("experiment_control", "trusted_certs") \
123                or config.get("globals", "trusted_certs")
124
125        self.repodir = config.get("experiment_control", "repodir")
126        self.repo_url = config.get("experiment_control", "repo_url", 
127                "https://users.isi.deterlab.net:23235");
128
129        self.exp_stem = "fed-stem"
130        self.log = logging.getLogger("fedd.experiment_control")
131        set_log_level(config, "experiment_control", self.log)
132        self.muxmax = 2
133        self.nthreads = 10
134        self.randomize_experiments = False
135
136        self.splitter = None
137        self.ssh_keygen = "/usr/bin/ssh-keygen"
138        self.ssh_identity_file = None
139
140
141        self.debug = config.getboolean("experiment_control", "create_debug")
142        self.cleanup = not config.getboolean("experiment_control", 
143                "leave_tmpfiles")
144        self.state_filename = config.get("experiment_control", 
145                "experiment_state")
146        self.store_filename = config.get("experiment_control", 
147                "synch_store")
148        self.store_url = config.get("experiment_control", "store_url")
149        self.splitter_url = config.get("experiment_control", "ns2topdl_uri")
150        self.fedkit = parse_tarfile_list(\
151                config.get("experiment_control", "fedkit"))
152        self.gatewaykit = parse_tarfile_list(\
153                config.get("experiment_control", "gatewaykit"))
154        accessdb_file = config.get("experiment_control", "accessdb")
155
156        dt = config.get("experiment_control", "direct_transit")
157        self.auth_type = config.get('experiment_control', 'auth_type') \
158                or 'legacy'
159        self.auth_dir = config.get('experiment_control', 'auth_dir')
160        if dt: self.direct_transit = [ tb.strip() for tb in dt.split(",")]
161        else: self.direct_transit = [ ]
162        # NB for internal master/slave ops, not experiment setup
163        self.ssh_type = config.get("experiment_control", "sshkeytype", "rsa")
164
165        self.overrides = set([])
166        ovr = config.get('experiment_control', 'overrides')
167        if ovr:
168            for o in ovr.split(","):
169                o = o.strip()
170                if o.startswith('fedid:'): o = o[len('fedid:'):]
171                self.overrides.add(fedid(hexstr=o))
172
173        self.state = { }
174        self.state_lock = Lock()
175        self.tclsh = "/usr/local/bin/otclsh"
176        self.tcl_splitter = config.get("ns2topdl", "tcl_splitter") or \
177                config.get("experiment_control", "tcl_splitter",
178                        "/usr/testbed/lib/ns2ir/parse.tcl")
179        mapdb_file = config.get("experiment_control", "mapdb")
180        self.trace_file = sys.stderr
181
182        self.def_expstart = \
183                "sudo -H /bin/sh /usr/local/federation/bin/federate.sh >& " +\
184                "/tmp/federate";
185        self.def_mexpstart = "sudo -H /usr/local/federation/bin/make_hosts " +\
186                "FEDDIR/hosts";
187        self.def_gwstart = \
188                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF>& " +\
189                "/tmp/bridge.log";
190        self.def_mgwstart = \
191                "sudo -H /usr/local/federation/bin/fed-tun.pl -f GWCONF >& " +\
192                "/tmp/bridge.log";
193        self.def_gwimage = "FBSD61-TUNNEL2";
194        self.def_gwtype = "pc";
195        self.local_access = { }
196
197        if self.auth_type == 'legacy':
198            if auth:
199                self.auth = auth
200            else:
201                self.log.error( "[access]: No authorizer initialized, " +\
202                        "creating local one.")
203                auth = authorizer()
204            self.get_access = self.legacy_get_access
205        elif self.auth_type == 'abac':
206            self.auth = abac_authorizer(load=self.auth_dir)
207        else:
208            raise service_error(service_error.internal, 
209                    "Unknown auth_type: %s" % self.auth_type)
210
211        if mapdb_file:
212            self.read_mapdb(mapdb_file)
213        else:
214            self.log.warn("[experiment_control] No testbed map, using defaults")
215            self.tbmap = { 
216                    'deter':'https://users.isi.deterlab.net:23235',
217                    'emulab':'https://users.isi.deterlab.net:23236',
218                    'ucb':'https://users.isi.deterlab.net:23237',
219                    }
220
221        if accessdb_file:
222                self.read_accessdb(accessdb_file)
223        else:
224            raise service_error(service_error.internal,
225                    "No accessdb specified in config")
226
227        # Grab saved state.  OK to do this w/o locking because it's read only
228        # and only one thread should be in existence that can see self.state at
229        # this point.
230        if self.state_filename:
231            self.read_state()
232
233        if self.store_filename:
234            self.read_store()
235        else:
236            self.log.warning("No saved synch store")
237            self.synch_store = synch_store
238
239        # Dispatch tables
240        self.soap_services = {\
241                'New': soap_handler('New', self.new_experiment),
242                'Create': soap_handler('Create', self.create_experiment),
243                'Vtopo': soap_handler('Vtopo', self.get_vtopo),
244                'Vis': soap_handler('Vis', self.get_vis),
245                'Info': soap_handler('Info', self.get_info),
246                'MultiInfo': soap_handler('MultiInfo', self.get_multi_info),
247                'Terminate': soap_handler('Terminate', 
248                    self.terminate_experiment),
249                'GetValue': soap_handler('GetValue', self.GetValue),
250                'SetValue': soap_handler('SetValue', self.SetValue),
251        }
252
253        self.xmlrpc_services = {\
254                'New': xmlrpc_handler('New', self.new_experiment),
255                'Create': xmlrpc_handler('Create', self.create_experiment),
256                'Vtopo': xmlrpc_handler('Vtopo', self.get_vtopo),
257                'Vis': xmlrpc_handler('Vis', self.get_vis),
258                'Info': xmlrpc_handler('Info', self.get_info),
259                'MultiInfo': xmlrpc_handler('MultiInfo', self.get_multi_info),
260                'Terminate': xmlrpc_handler('Terminate',
261                    self.terminate_experiment),
262                'GetValue': xmlrpc_handler('GetValue', self.GetValue),
263                'SetValue': xmlrpc_handler('SetValue', self.SetValue),
264        }
265
266    # Call while holding self.state_lock
267    def write_state(self):
268        """
269        Write a new copy of experiment state after copying the existing state
270        to a backup.
271
272        State format is a simple pickling of the state dictionary.
273        """
274        if os.access(self.state_filename, os.W_OK):
275            copy_file(self.state_filename, \
276                    "%s.bak" % self.state_filename)
277        try:
278            f = open(self.state_filename, 'w')
279            pickle.dump(self.state, f)
280        except EnvironmentError, e:
281            self.log.error("Can't write file %s: %s" % \
282                    (self.state_filename, e))
283        except pickle.PicklingError, e:
284            self.log.error("Pickling problem: %s" % e)
285        except TypeError, e:
286            self.log.error("Pickling problem (TypeError): %s" % e)
287
288    @staticmethod
289    def get_alloc_ids(exp):
290        """
291        Used by read_store and read state.  This used to be worse.
292        """
293
294        return [ a.allocID for a in exp.get_all_allocations() ]
295
296
297    # Call while holding self.state_lock
298    def read_state(self):
299        """
300        Read a new copy of experiment state.  Old state is overwritten.
301
302        State format is a simple pickling of the state dictionary.
303        """
304       
305        try:
306            f = open(self.state_filename, "r")
307            self.state = pickle.load(f)
308            self.log.debug("[read_state]: Read state from %s" % \
309                    self.state_filename)
310        except EnvironmentError, e:
311            self.log.warning("[read_state]: No saved state: Can't open %s: %s"\
312                    % (self.state_filename, e))
313        except pickle.UnpicklingError, e:
314            self.log.warning(("[read_state]: No saved state: " + \
315                    "Unpickling failed: %s") % e)
316       
317        for s in self.state.values():
318            try:
319
320                eid = s.fedid
321                if eid : 
322                    if self.auth_type == 'legacy':
323                        # XXX: legacy
324                        # Give the owner rights to the experiment
325                        #self.auth.set_attribute(s['owner'], eid)
326                        # And holders of the eid as well
327                        self.auth.set_attribute(eid, eid)
328                        # allow overrides to control experiments as well
329                        for o in self.overrides:
330                            self.auth.set_attribute(o, eid)
331                        # Set permissions to allow reading of the software
332                        # repo, if any, as well.
333                        for a in self.get_alloc_ids(s):
334                            self.auth.set_attribute(a, 'repo/%s' % eid)
335                else:
336                    raise KeyError("No experiment id")
337            except KeyError, e:
338                self.log.warning("[read_state]: State ownership or identity " +\
339                        "misformatted in %s: %s" % (self.state_filename, e))
340
341
342    def read_accessdb(self, accessdb_file):
343        """
344        Read the mapping from fedids that can create experiments to their name
345        in the 3-level access namespace.  All will be asserted from this
346        testbed and can include the local username and porject that will be
347        asserted on their behalf by this fedd.  Each fedid is also added to the
348        authorization system with the "create" attribute.
349        """
350        self.accessdb = {}
351        # These are the regexps for parsing the db
352        name_expr = "[" + string.ascii_letters + string.digits + "\.\-]+"
353        project_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
354                "\s*->\(\s*("+name_expr+")\s*,\s*("+name_expr+")\s*\)\s*$")
355        user_line = re.compile("^\s*fedid:([" + string.hexdigits + "]+)"+ \
356                "\s*->\s*(" + name_expr + ")\s*$")
357        lineno = 0
358
359        # Parse the mappings and store in self.authdb, a dict of
360        # fedid -> (proj, user)
361        try:
362            f = open(accessdb_file, "r")
363            for line in f:
364                lineno += 1
365                line = line.strip()
366                if len(line) == 0 or line.startswith('#'):
367                    continue
368                m = project_line.match(line)
369                if m:
370                    fid = fedid(hexstr=m.group(1))
371                    project, user = m.group(2,3)
372                    if not self.accessdb.has_key(fid):
373                        self.accessdb[fid] = []
374                    self.accessdb[fid].append((project, user))
375                    continue
376
377                m = user_line.match(line)
378                if m:
379                    fid = fedid(hexstr=m.group(1))
380                    project = None
381                    user = m.group(2)
382                    if not self.accessdb.has_key(fid):
383                        self.accessdb[fid] = []
384                    self.accessdb[fid].append((project, user))
385                    continue
386                self.log.warn("[experiment_control] Error parsing access " +\
387                        "db %s at line %d" %  (accessdb_file, lineno))
388        except EnvironmentError:
389            raise service_error(service_error.internal,
390                    ("Error opening/reading %s as experiment " +\
391                            "control accessdb") %  accessdb_file)
392        f.close()
393
394        # Initialize the authorization attributes
395        # XXX: legacy
396        if self.auth_type == 'legacy':
397            for fid in self.accessdb.keys():
398                self.auth.set_attribute(fid, 'create')
399                self.auth.set_attribute(fid, 'new')
400
401    def read_mapdb(self, file):
402        """
403        Read a simple colon separated list of mappings for the
404        label-to-testbed-URL mappings.  Clears or creates self.tbmap.
405        """
406
407        self.tbmap = { }
408        lineno =0
409        try:
410            f = open(file, "r")
411            for line in f:
412                lineno += 1
413                line = line.strip()
414                if line.startswith('#') or len(line) == 0:
415                    continue
416                try:
417                    label, url = line.split(':', 1)
418                    self.tbmap[label] = url
419                except ValueError, e:
420                    self.log.warn("[read_mapdb] Ignored bad line (%d) in " +\
421                            "map db: %s %s" % (lineno, line, e))
422        except EnvironmentError, e:
423            self.log.warning("[read_mapdb]: No saved map database: Can't " +\
424                    "open %s: %s" % (file, e))
425        else:
426            f.close()
427
428    def read_store(self):
429        try:
430            self.synch_store = synch_store()
431            self.synch_store.load(self.store_filename)
432            self.log.debug("[read_store]: Read store from %s" % \
433                    self.store_filename)
434        except EnvironmentError, e:
435            self.log.warning("[read_store]: No saved store: Can't open %s: %s"\
436                    % (self.state_filename, e))
437            self.synch_store = synch_store()
438
439        # Set the initial permissions on data in the store.  XXX: This ad hoc
440        # authorization attribute initialization is getting out of hand.
441        # XXX: legacy
442        if self.auth_type == 'legacy':
443            for k in self.synch_store.all_keys():
444                try:
445                    if k.startswith('fedid:'):
446                        fid = fedid(hexstr=k[6:46])
447                        if self.state.has_key(fid):
448                            for a in self.get_alloc_ids(self.state[fid]):
449                                self.auth.set_attribute(a, k)
450                except ValueError, e:
451                    self.log.warn("Cannot deduce permissions for %s" % k)
452
453
454    def write_store(self):
455        """
456        Write a new copy of synch_store after writing current state
457        to a backup.  We use the internal synch_store pickle method to avoid
458        incinsistent data.
459
460        State format is a simple pickling of the store.
461        """
462        if os.access(self.store_filename, os.W_OK):
463            copy_file(self.store_filename, \
464                    "%s.bak" % self.store_filename)
465        try:
466            self.synch_store.save(self.store_filename)
467        except EnvironmentError, e:
468            self.log.error("Can't write file %s: %s" % \
469                    (self.store_filename, e))
470        except TypeError, e:
471            self.log.error("Pickling problem (TypeError): %s" % e)
472
473
474    def remove_dirs(self, dir):
475        """
476        Remove the directory tree and all files rooted at dir.  Log any errors,
477        but continue.
478        """
479        self.log.debug("[removedirs]: removing %s" % dir)
480        try:
481            for path, dirs, files in os.walk(dir, topdown=False):
482                for f in files:
483                    os.remove(os.path.join(path, f))
484                for d in dirs:
485                    os.rmdir(os.path.join(path, d))
486            os.rmdir(dir)
487        except EnvironmentError, e:
488            self.log.error("Error deleting directory tree in %s" % e);
489
490    @staticmethod
491    def make_temp_certfile(expcert, tmpdir):
492        """
493        make a protected copy of the access certificate so the experiment
494        controller can act as the experiment principal.  mkstemp is the most
495        secure way to do that. The directory should be created by
496        mkdtemp.  Return the filename.
497        """
498        if expcert and tmpdir:
499            try:
500                certf, certfn = tempfile.mkstemp(suffix=".pem", dir=tmpdir)
501                f = os.fdopen(certf, 'w')
502                print >> f, expcert
503                f.close()
504            except EnvironmentError, e:
505                raise service_error(service_error.internal, 
506                        "Cannot create temp cert file?")
507            return certfn
508        else:
509            return None
510
511       
512    def generate_ssh_keys(self, dest, type="rsa" ):
513        """
514        Generate a set of keys for the gateways to use to talk.
515
516        Keys are of type type and are stored in the required dest file.
517        """
518        valid_types = ("rsa", "dsa")
519        t = type.lower();
520        if t not in valid_types: raise ValueError
521        cmd = [self.ssh_keygen, '-t', t, '-N', '', '-f', dest]
522
523        try:
524            trace = open("/dev/null", "w")
525        except EnvironmentError:
526            raise service_error(service_error.internal,
527                    "Cannot open /dev/null??");
528
529        # May raise CalledProcessError
530        self.log.debug("[generate_ssh_keys]: %s" % " ".join(cmd))
531        rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
532        if rv != 0:
533            raise service_error(service_error.internal, 
534                    "Cannot generate nonce ssh keys.  %s return code %d" \
535                            % (self.ssh_keygen, rv))
536
537    def generate_seer_certs(self, destdir):
538        '''
539        Create a SEER ca cert and a node cert in destdir/ca.pem and
540        destdir/node.pem respectively.  These will be distributed throughout
541        the federated experiment.  This routine reports errors via
542        service_errors.
543        '''
544        openssl = '/usr/bin/openssl'
545        # All the filenames and parameters we need for openssl calls below
546        ca_key =os.path.join(destdir, 'ca.key') 
547        ca_pem = os.path.join(destdir, 'ca.pem')
548        node_key =os.path.join(destdir, 'node.key') 
549        node_pem = os.path.join(destdir, 'node.pem')
550        node_req = os.path.join(destdir, 'node.req')
551        node_signed = os.path.join(destdir, 'node.signed')
552        days = '%s' % (365 * 10)
553        serial = '%s' % random.randint(0, 1<<16)
554
555        try:
556            # Sequence of calls to create a CA key, create a ca cert, create a
557            # node key, node signing request, and finally a signed node
558            # certificate.
559            sequence = (
560                    (openssl, 'genrsa', '-out', ca_key, '1024'),
561                    (openssl, 'req', '-new', '-x509', '-key', ca_key, '-out', 
562                        ca_pem, '-days', days, '-subj', 
563                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=CA' ),
564                    (openssl, 'genrsa', '-out', node_key, '1024'),
565                    (openssl, 'req', '-new', '-key', node_key, '-out', 
566                        node_req, '-days', days, '-subj', 
567                        '/C=US/ST=CA/O=DETER/OU=fedd/CN=node' ),
568                    (openssl, 'x509', '-CA', ca_pem, '-CAkey', ca_key, 
569                        '-set_serial', serial, '-req', '-in', node_req, 
570                        '-out', node_signed, '-days', days),
571                )
572            # Do all that stuff; bail if there's an error, and push all the
573            # output to dev/null.
574            for cmd in sequence:
575                trace = open("/dev/null", "w")
576                rv = call(cmd, stdout=trace, stderr=trace, close_fds=True)
577                if rv != 0:
578                    raise service_error(service_error.internal, 
579                            "Cannot generate SEER certs.  %s return code %d" \
580                                    % (' '.join(cmd), rv))
581            # Concatinate the node key and signed certificate into node.pem
582            f = open(node_pem, 'w')
583            for comp in (node_signed, node_key):
584                g = open(comp, 'r')
585                f.write(g.read())
586                g.close()
587            f.close()
588
589            # Throw out intermediaries.
590            for fn in (ca_key, node_key, node_req, node_signed):
591                os.unlink(fn)
592
593        except EnvironmentError, e:
594            # Any difficulties with the file system wind up here
595            raise service_error(service_error.internal,
596                    "File error on  %s while creating SEER certs: %s" % \
597                            (e.filename, e.strerror))
598
599
600
601    def gentopo(self, str):
602        """
603        Generate the topology data structure from the splitter's XML
604        representation of it.
605
606        The topology XML looks like:
607            <experiment>
608                <nodes>
609                    <node><vname></vname><ips>ip1:ip2</ips></node>
610                </nodes>
611                <lans>
612                    <lan>
613                        <vname></vname><vnode></vnode><ip></ip>
614                        <bandwidth></bandwidth><member>node:port</member>
615                    </lan>
616                </lans>
617        """
618        class topo_parse:
619            """
620            Parse the topology XML and create the dats structure.
621            """
622            def __init__(self):
623                # Typing of the subelements for data conversion
624                self.str_subelements = ('vname', 'vnode', 'ips', 'ip', 'member')
625                self.int_subelements = ( 'bandwidth',)
626                self.float_subelements = ( 'delay',)
627                # The final data structure
628                self.nodes = [ ]
629                self.lans =  [ ]
630                self.topo = { \
631                        'node': self.nodes,\
632                        'lan' : self.lans,\
633                    }
634                self.element = { }  # Current element being created
635                self.chars = ""     # Last text seen
636
637            def end_element(self, name):
638                # After each sub element the contents is added to the current
639                # element or to the appropriate list.
640                if name == 'node':
641                    self.nodes.append(self.element)
642                    self.element = { }
643                elif name == 'lan':
644                    self.lans.append(self.element)
645                    self.element = { }
646                elif name in self.str_subelements:
647                    self.element[name] = self.chars
648                    self.chars = ""
649                elif name in self.int_subelements:
650                    self.element[name] = int(self.chars)
651                    self.chars = ""
652                elif name in self.float_subelements:
653                    self.element[name] = float(self.chars)
654                    self.chars = ""
655
656            def found_chars(self, data):
657                self.chars += data.rstrip()
658
659
660        tp = topo_parse();
661        parser = xml.parsers.expat.ParserCreate()
662        parser.EndElementHandler = tp.end_element
663        parser.CharacterDataHandler = tp.found_chars
664
665        parser.Parse(str)
666
667        return tp.topo
668       
669
670    def genviz(self, topo):
671        """
672        Generate the visualization the virtual topology
673        """
674
675        neato = "/usr/local/bin/neato"
676        # These are used to parse neato output and to create the visualization
677        # file.
678        vis_re = re.compile('^\s*"?([\w\-]+)"?\s+\[.*pos="([\d\.]+),([\d\.]+)"')
679        vis_fmt = "<node><name>%s</name><x>%s</x><y>%s</y><type>" + \
680                "%s</type></node>"
681
682        try:
683            # Node names
684            nodes = [ n['vname'] for n in topo['node'] ]
685            topo_lans = topo['lan']
686        except KeyError, e:
687            raise service_error(service_error.internal, "Bad topology: %s" %e)
688
689        lans = { }
690        links = { }
691
692        # Walk through the virtual topology, organizing the connections into
693        # 2-node connections (links) and more-than-2-node connections (lans).
694        # When a lan is created, it's added to the list of nodes (there's a
695        # node in the visualization for the lan).
696        for l in topo_lans:
697            if links.has_key(l['vname']):
698                if len(links[l['vname']]) < 2:
699                    links[l['vname']].append(l['vnode'])
700                else:
701                    nodes.append(l['vname'])
702                    lans[l['vname']] = links[l['vname']]
703                    del links[l['vname']]
704                    lans[l['vname']].append(l['vnode'])
705            elif lans.has_key(l['vname']):
706                lans[l['vname']].append(l['vnode'])
707            else:
708                links[l['vname']] = [ l['vnode'] ]
709
710
711        # Open up a temporary file for dot to turn into a visualization
712        try:
713            df, dotname = tempfile.mkstemp()
714            dotfile = os.fdopen(df, 'w')
715        except EnvironmentError:
716            raise service_error(service_error.internal,
717                    "Failed to open file in genviz")
718
719        try:
720            dnull = open('/dev/null', 'w')
721        except EnvironmentError:
722            service_error(service_error.internal,
723                    "Failed to open /dev/null in genviz")
724
725        # Generate a dot/neato input file from the links, nodes and lans
726        try:
727            print >>dotfile, "graph G {"
728            for n in nodes:
729                print >>dotfile, '\t"%s"' % n
730            for l in links.keys():
731                print >>dotfile, '\t"%s" -- "%s"' %  tuple(links[l])
732            for l in lans.keys():
733                for n in lans[l]:
734                    print >>dotfile, '\t "%s" -- "%s"' % (n,l)
735            print >>dotfile, "}"
736            dotfile.close()
737        except TypeError:
738            raise service_error(service_error.internal,
739                    "Single endpoint link in vtopo")
740        except EnvironmentError:
741            raise service_error(service_error.internal, "Cannot write dot file")
742
743        # Use dot to create a visualization
744        try:
745            dot = Popen([neato, '-Gstart=rand', '-Gepsilon=0.005',
746                '-Gmaxiter=2000', '-Gpack=true', dotname], stdout=PIPE,
747                stderr=dnull, close_fds=True)
748        except EnvironmentError:
749            raise service_error(service_error.internal, 
750                    "Cannot generate visualization: is graphviz available?")
751        dnull.close()
752
753        # Translate dot to vis format
754        vis_nodes = [ ]
755        vis = { 'node': vis_nodes }
756        for line in dot.stdout:
757            m = vis_re.match(line)
758            if m:
759                vn = m.group(1)
760                vis_node = {'name': vn, \
761                        'x': float(m.group(2)),\
762                        'y' : float(m.group(3)),\
763                    }
764                if vn in links.keys() or vn in lans.keys():
765                    vis_node['type'] = 'lan'
766                else:
767                    vis_node['type'] = 'node'
768                vis_nodes.append(vis_node)
769        rv = dot.wait()
770
771        os.remove(dotname)
772        # XXX: graphviz seems to use low return codes for warnings, like
773        # "couldn't find font"
774        if rv < 2 : return vis
775        else: return None
776
777
778    def release_access(self, tb, aid, tbmap=None, uri=None, cert_file=None,
779            cert_pwd=None):
780        """
781        Release access to testbed through fedd
782        """
783
784        if not uri and tbmap:
785            uri = tbmap.get(tb, None)
786        if not uri:
787            raise service_error(service_error.server_config, 
788                    "Unknown testbed: %s" % tb)
789
790        if self.local_access.has_key(uri):
791            resp = self.local_access[uri].ReleaseAccess(\
792                    { 'ReleaseAccessRequestBody' : 
793                        {'allocID': {'fedid': aid}},}, 
794                    fedid(file=cert_file))
795            resp = { 'ReleaseAccessResponseBody': resp } 
796        else:
797            resp = self.call_ReleaseAccess(uri, {'allocID': {'fedid': aid} },
798                    cert_file, cert_pwd, self.trusted_certs)
799
800        # better error coding
801
802    def remote_ns2topdl(self, uri, desc):
803
804        req = {
805                'description' : { 'ns2description': desc },
806            }
807
808        r = self.call_Ns2Topdl(uri, req, self.cert_file, self.cert_pwd, 
809                self.trusted_certs)
810
811        if r.has_key('Ns2TopdlResponseBody'):
812            r = r['Ns2TopdlResponseBody']
813            ed = r.get('experimentdescription', None)
814            if ed.has_key('topdldescription'):
815                return topdl.Topology(**ed['topdldescription'])
816            else:
817                raise service_error(service_error.protocol, 
818                        "Bad splitter response (no output)")
819        else:
820            raise service_error(service_error.protocol, "Bad splitter response")
821
822    class start_segment:
823        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
824                cert_pwd=None, trusted_certs=None, caller=None,
825                log_collector=None):
826            self.log = log
827            self.debug = debug
828            self.cert_file = cert_file
829            self.cert_pwd = cert_pwd
830            self.trusted_certs = None
831            self.caller = caller
832            self.testbed = testbed
833            self.log_collector = log_collector
834            self.response = None
835            self.node = { }
836            self.proof = None
837
838        def make_map(self, resp):
839            if 'segmentdescription' not in resp  or \
840                    'topdldescription' not in resp['segmentdescription']:
841                self.log.warn('No topology returned from startsegment')
842                return 
843
844            top = topdl.Topology(
845                    **resp['segmentdescription']['topdldescription'])
846
847            for e in top.elements:
848                if isinstance(e, topdl.Computer):
849                    self.node[e.name] = \
850                            (e.localname, e.status, e.service, e.operation)
851
852        def __call__(self, uri, aid, topo, masters, attrs=None, connInfo=None):
853            req = {
854                    'allocID': { 'fedid' : aid }, 
855                    'segmentdescription': { 
856                        'topdldescription': topo.to_dict(),
857                    },
858                }
859
860            if connInfo:
861                req['connection'] = connInfo
862
863            import_svcs = [ s for m in masters.values() \
864                    for s in m if self.testbed in s.importers]
865
866            if import_svcs or self.testbed in masters:
867                req['service'] = []
868
869            for s in import_svcs:
870                for r in s.reqs:
871                    sr = copy.deepcopy(r)
872                    sr['visibility'] = 'import';
873                    req['service'].append(sr)
874
875            for s in masters.get(self.testbed, []):
876                for r in s.reqs:
877                    sr = copy.deepcopy(r)
878                    sr['visibility'] = 'export';
879                    req['service'].append(sr)
880
881            if attrs:
882                req['fedAttr'] = attrs
883
884            try:
885                self.log.debug("Calling StartSegment at %s " % uri)
886                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
887                        self.trusted_certs)
888                if r.has_key('StartSegmentResponseBody'):
889                    lval = r['StartSegmentResponseBody'].get('allocationLog',
890                            None)
891                    if lval and self.log_collector:
892                        for line in  lval.splitlines(True):
893                            self.log_collector.write(line)
894                    self.make_map(r['StartSegmentResponseBody'])
895                    if 'proof' in r: self.proof = r['proof']
896                    self.response = r
897                else:
898                    raise service_error(service_error.internal, 
899                            "Bad response!?: %s" %r)
900                return True
901            except service_error, e:
902                self.log.error("Start segment failed on %s: %s" % \
903                        (self.testbed, e))
904                return False
905
906
907
908    class terminate_segment:
909        def __init__(self, debug=False, log=None, testbed="", cert_file=None,
910                cert_pwd=None, trusted_certs=None, caller=None):
911            self.log = log
912            self.debug = debug
913            self.cert_file = cert_file
914            self.cert_pwd = cert_pwd
915            self.trusted_certs = None
916            self.caller = caller
917            self.testbed = testbed
918
919        def __call__(self, uri, aid ):
920            req = {
921                    'allocID': {'fedid': aid }, 
922                }
923            try:
924                r = self.caller(uri, req, self.cert_file, self.cert_pwd,
925                        self.trusted_certs)
926                return True
927            except service_error, e:
928                self.log.error("Terminate segment failed on %s: %s" % \
929                        (self.testbed, e))
930                return False
931
932    def allocate_resources(self, allocated, masters, eid, expid, 
933            tbparams, top, topo, tmpdir, alloc_log=None, log_collector=None, 
934            attrs=None, connInfo={}, tbmap=None, expcert=None):
935
936        started = { }           # Testbeds where a sub-experiment started
937                                # successfully
938
939        # XXX
940        fail_soft = False
941
942        if tbmap is None: tbmap = { }
943
944        log = alloc_log or self.log
945
946        tp = thread_pool(self.nthreads)
947        threads = [ ]
948        starters = [ ]
949
950        if expcert:
951            cert = expcert
952            pw = None
953        else:
954            cert = self.cert_file
955            pw = self.cert_pwd
956
957        for tb in allocated.keys():
958            # Create and start a thread to start the segment, and save it
959            # to get the return value later
960            tb_attrs = copy.copy(attrs)
961            tp.wait_for_slot()
962            uri = tbparams[tb].get('uri', tbmap.get(testbed_base(tb), None))
963            base, suffix = split_testbed(tb)
964            if suffix:
965                tb_attrs.append({'attribute': 'experiment_name', 
966                    'value': "%s-%s" % (eid, suffix)})
967            else:
968                tb_attrs.append({'attribute': 'experiment_name', 'value': eid})
969            if not uri:
970                raise service_error(service_error.internal, 
971                        "Unknown testbed %s !?" % tb)
972
973            if tbparams[tb].has_key('allocID') and \
974                    tbparams[tb]['allocID'].has_key('fedid'):
975                aid = tbparams[tb]['allocID']['fedid']
976            else:
977                raise service_error(service_error.internal, 
978                        "No alloc id for testbed %s !?" % tb)
979
980            s = self.start_segment(log=log, debug=self.debug,
981                    testbed=tb, cert_file=cert,
982                    cert_pwd=pw, trusted_certs=self.trusted_certs,
983                    caller=self.call_StartSegment,
984                    log_collector=log_collector)
985            starters.append(s)
986            t  = pooled_thread(\
987                    target=s, name=tb,
988                    args=(uri, aid, topo[tb], masters, tb_attrs, connInfo[tb]),
989                    pdata=tp, trace_file=self.trace_file)
990            threads.append(t)
991            t.start()
992
993        # Wait until all finish (keep pinging the log, though)
994        mins = 0
995        revoked = False
996        while not tp.wait_for_all_done(60.0):
997            mins += 1
998            alloc_log.info("Waiting for sub threads (it has been %d mins)" \
999                    % mins)
1000            if not revoked and \
1001                    len([ t.getName() for t in threads if t.rv == False]) > 0:
1002                # a testbed has failed.  Revoke this experiment's
1003                # synchronizarion values so that sub experiments will not
1004                # deadlock waiting for synchronization that will never happen
1005                self.log.info("A subexperiment has failed to swap in, " + \
1006                        "revoking synch keys")
1007                var_key = "fedid:%s" % expid
1008                for k in self.synch_store.all_keys():
1009                    if len(k) > 45 and k[0:46] == var_key:
1010                        self.synch_store.revoke_key(k)
1011                revoked = True
1012
1013        failed = [ t.getName() for t in threads if not t.rv ]
1014        succeeded = [tb for tb in allocated.keys() if tb not in failed]
1015
1016        # If one failed clean up, unless fail_soft is set
1017        if failed:
1018            if not fail_soft:
1019                tp.clear()
1020                for tb in succeeded:
1021                    # Create and start a thread to stop the segment
1022                    tp.wait_for_slot()
1023                    uri = tbparams[tb]['uri']
1024                    t  = pooled_thread(\
1025                            target=self.terminate_segment(log=log,
1026                                testbed=tb,
1027                                cert_file=cert, 
1028                                cert_pwd=pw,
1029                                trusted_certs=self.trusted_certs,
1030                                caller=self.call_TerminateSegment),
1031                            args=(uri, tbparams[tb]['federant']['allocID']),
1032                            name=tb,
1033                            pdata=tp, trace_file=self.trace_file)
1034                    t.start()
1035                # Wait until all finish (if any are being stopped)
1036                if succeeded:
1037                    tp.wait_for_all_done()
1038
1039                # release the allocations
1040                for tb in tbparams.keys():
1041                    try:
1042                        self.release_access(tb, tbparams[tb]['allocID'], 
1043                                tbmap=tbmap, uri=tbparams[tb].get('uri', None),
1044                                cert_file=cert, cert_pwd=pw)
1045                    except service_error, e:
1046                        self.log.warn("Error releasing access: %s" % e.desc)
1047                # Remove the placeholder
1048                self.state_lock.acquire()
1049                self.state[eid].status = 'failed'
1050                if self.state_filename: self.write_state()
1051                self.state_lock.release()
1052                # Remove the repo dir
1053                self.remove_dirs("%s/%s" %(self.repodir, expid))
1054                # Walk up tmpdir, deleting as we go
1055                if self.cleanup:
1056                    self.remove_dirs(tmpdir)
1057                else:
1058                    log.debug("[start_experiment]: not removing %s" % tmpdir)
1059
1060
1061                log.error("Swap in failed on %s" % ",".join(failed))
1062                return
1063        else:
1064            # Walk through the successes and gather the proofs
1065            proofs = { }
1066            for s in starters:
1067                if s.proof:
1068                    proofs[s.testbed] = s.proof
1069
1070            # Annotate the topology with embedding info
1071            for e in top.elements:
1072                if isinstance(e, topdl.Computer):
1073                    for s in starters:
1074                        ann = s.node.get(e.name, None)
1075                        if ann is not None:
1076                            e.localname.extend(ann[0])
1077                            e.status = ann[1]
1078                            e.service.extend(ann[2])
1079                            e.operation.extend(ann[3])
1080                            break
1081
1082            log.info("[start_segment]: Experiment %s active" % eid)
1083
1084
1085        # Walk up tmpdir, deleting as we go
1086        if self.cleanup:
1087            self.remove_dirs(tmpdir)
1088        else:
1089            log.debug("[start_experiment]: not removing %s" % tmpdir)
1090
1091        # Insert the experiment into our state and update the disk copy.
1092        self.state_lock.acquire()
1093        self.state[expid].status = 'active'
1094        self.state[eid] = self.state[expid]
1095        self.state[eid].top = top
1096        # Append startup proofs
1097        for f in self.state[eid].get_all_allocations():
1098            if f.tb in proofs:
1099                f.proof.append(proofs[f.tb])
1100
1101        if self.state_filename: self.write_state()
1102        self.state_lock.release()
1103        return
1104
1105
1106    def add_kit(self, e, kit):
1107        """
1108        Add a Software object created from the list of (install, location)
1109        tuples passed as kit  to the software attribute of an object e.  We
1110        do this enough to break out the code, but it's kind of a hack to
1111        avoid changing the old tuple rep.
1112        """
1113
1114        s = [ topdl.Software(install=i, location=l) for i, l in kit]
1115
1116        if isinstance(e.software, list): e.software.extend(s)
1117        else: e.software = s
1118
1119    def append_experiment_authorization(self, expid, attrs, 
1120            need_state_lock=True):
1121        """
1122        Append the authorization information to system state
1123        """
1124
1125        for p, a in attrs:
1126            self.auth.set_attribute(p, a)
1127        self.auth.save()
1128
1129        if need_state_lock: self.state_lock.acquire()
1130        # XXX: really a no op?
1131        #self.state[expid]['auth'].update(attrs)
1132        if self.state_filename: self.write_state()
1133        if need_state_lock: self.state_lock.release()
1134
1135    def clear_experiment_authorization(self, expid, need_state_lock=True):
1136        """
1137        Attrs is a set of attribute principal pairs that need to be removed
1138        from the authenticator.  Remove them and save the authenticator.
1139        """
1140
1141        if need_state_lock: self.state_lock.acquire()
1142        # XXX: should be a no-op
1143        #if expid in self.state and 'auth' in self.state[expid]:
1144            #for p, a in self.state[expid]['auth']:
1145                #self.auth.unset_attribute(p, a)
1146            #self.state[expid]['auth'] = set()
1147        if self.state_filename: self.write_state()
1148        if need_state_lock: self.state_lock.release()
1149        self.auth.save()
1150
1151
1152    def create_experiment_state(self, fid, req, expid, expcert,
1153            state='starting'):
1154        """
1155        Create the initial entry in the experiment's state.  The expid and
1156        expcert are the experiment's fedid and certifacte that represents that
1157        ID, which are installed in the experiment state.  If the request
1158        includes a suggested local name that is used if possible.  If the local
1159        name is already taken by an experiment owned by this user that has
1160        failed, it is overwritten.  Otherwise new letters are added until a
1161        valid localname is found.  The generated local name is returned.
1162        """
1163
1164        if req.has_key('experimentID') and \
1165                req['experimentID'].has_key('localname'):
1166            overwrite = False
1167            eid = req['experimentID']['localname']
1168            # If there's an old failed experiment here with the same local name
1169            # and accessible by this user, we'll overwrite it, otherwise we'll
1170            # fall through and do the collision avoidance.
1171            old_expid = self.get_experiment_fedid(eid)
1172            if old_expid:
1173                users_experiment = True
1174                try:
1175                    self.check_experiment_access(fid, old_expid)
1176                except service_error, e:
1177                    if e.code == service_error.access: users_experiment = False
1178                    else: raise e
1179                if users_experiment:
1180                    self.state_lock.acquire()
1181                    status = self.state[eid].status
1182                    if status and status == 'failed':
1183                        # remove the old access attributes
1184                        self.clear_experiment_authorization(eid,
1185                                need_state_lock=False)
1186                        overwrite = True
1187                        del self.state[eid]
1188                        del self.state[old_expid]
1189                    self.state_lock.release()
1190                else:
1191                    self.log.info('Experiment %s exists, ' % eid + \
1192                            'but this user cannot access it')
1193            self.state_lock.acquire()
1194            while (self.state.has_key(eid) and not overwrite):
1195                eid += random.choice(string.ascii_letters)
1196            # Initial state
1197            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1198                    identity=expcert)
1199            self.state[expid] = self.state[eid]
1200            if self.state_filename: self.write_state()
1201            self.state_lock.release()
1202        else:
1203            eid = self.exp_stem
1204            for i in range(0,5):
1205                eid += random.choice(string.ascii_letters)
1206            self.state_lock.acquire()
1207            while (self.state.has_key(eid)):
1208                eid = self.exp_stem
1209                for i in range(0,5):
1210                    eid += random.choice(string.ascii_letters)
1211            # Initial state
1212            self.state[eid] = experiment_info(fedid=expid, localname=eid, 
1213                    identity=expcert)
1214            self.state[expid] = self.state[eid]
1215            if self.state_filename: self.write_state()
1216            self.state_lock.release()
1217
1218        # Let users touch the state.  Authorize this fid and the expid itself
1219        # to touch the experiment, as well as allowing th eoverrides.
1220        self.append_experiment_authorization(eid, 
1221                set([(fid, expid), (expid,expid)] + \
1222                        [ (o, expid) for o in self.overrides]))
1223
1224        return eid
1225
1226
1227    def allocate_ips_to_topo(self, top):
1228        """
1229        Add an ip4_address attribute to all the hosts in the topology, based on
1230        the shared substrates on which they sit.  An /etc/hosts file is also
1231        created and returned as a list of hostfiles entries.  We also return
1232        the allocator, because we may need to allocate IPs to portals
1233        (specifically DRAGON portals).
1234        """
1235        subs = sorted(top.substrates, 
1236                cmp=lambda x,y: cmp(len(x.interfaces), len(y.interfaces)),
1237                reverse=True)
1238        ips = ip_allocator(int(ip_addr("10.0.0.0")), 2 **24)
1239        ifs = { }
1240        hosts = [ ]
1241
1242        for idx, s in enumerate(subs):
1243            net_size = len(s.interfaces)+2
1244
1245            a = ips.allocate(net_size)
1246            if a :
1247                base, num = a
1248                if num < net_size: 
1249                    raise service_error(service_error.internal,
1250                            "Allocator returned wrong number of IPs??")
1251            else:
1252                raise service_error(service_error.req, 
1253                        "Cannot allocate IP addresses")
1254            mask = ips.min_alloc
1255            while mask < net_size:
1256                mask *= 2
1257
1258            netmask = ((2**32-1) ^ (mask-1))
1259
1260            base += 1
1261            for i in s.interfaces:
1262                i.attribute.append(
1263                        topdl.Attribute('ip4_address', 
1264                            "%s" % ip_addr(base)))
1265                i.attribute.append(
1266                        topdl.Attribute('ip4_netmask', 
1267                            "%s" % ip_addr(int(netmask))))
1268
1269                hname = i.element.name
1270                if ifs.has_key(hname):
1271                    hosts.append("%s\t%s-%s %s-%d" % \
1272                            (ip_addr(base), hname, s.name, hname,
1273                                ifs[hname]))
1274                else:
1275                    ifs[hname] = 0
1276                    hosts.append("%s\t%s-%s %s-%d %s" % \
1277                            (ip_addr(base), hname, s.name, hname,
1278                                ifs[hname], hname))
1279
1280                ifs[hname] += 1
1281                base += 1
1282        return hosts, ips
1283
1284    def get_access_to_testbeds(self, testbeds, fid, allocated, 
1285            tbparam, masters, tbmap, expid=None, expcert=None):
1286        for tb in testbeds:
1287            self.get_access(tb, tbparam, fid, masters, tbmap, expid,
1288                    expcert)
1289            allocated[tb] = 1
1290
1291    def get_access(self, tb, tbparam,fid, masters, tbmap, expid=None, 
1292            expcert=None):
1293        """
1294        Get access to testbed through fedd and set the parameters for that tb
1295        """
1296        def get_export_project(svcs):
1297            """
1298            Look through for the list of federated_service for this testbed
1299            objects for a project_export service, and extract the project
1300            parameter.
1301            """
1302
1303            pe = [s for s in svcs if s.name=='project_export']
1304            if len(pe) == 1:
1305                return pe[0].params.get('project', None)
1306            elif len(pe) == 0:
1307                return None
1308            else:
1309                raise service_error(service_error.req,
1310                        "More than one project export is not supported")
1311
1312        def add_services(svcs, type, slist, keys):
1313            """
1314            Add the given services to slist.  type is import or export.  Also
1315            add a mapping entry from the assigned id to the original service
1316            record.
1317            """
1318            for i, s in enumerate(svcs):
1319                idx = '%s%d' % (type, i)
1320                keys[idx] = s
1321                sr = {'id': idx, 'name': s.name, 'visibility': type }
1322                if s.params:
1323                    sr['fedAttr'] = [ { 'attribute': k, 'value': v } \
1324                            for k, v in s.params.items()]
1325                slist.append(sr)
1326
1327        uri = tbmap.get(testbed_base(tb), None)
1328        if not uri:
1329            raise service_error(service_error.server_config, 
1330                    "Unknown testbed: %s" % tb)
1331
1332        export_svcs = masters.get(tb,[])
1333        import_svcs = [ s for m in masters.values() \
1334                for s in m \
1335                    if tb in s.importers ]
1336
1337        export_project = get_export_project(export_svcs)
1338        # Compose the credential list so that IDs come before attributes
1339        creds = set()
1340        keys = set()
1341        certs = self.auth.get_creds_for_principal(fid)
1342        if expid:
1343            certs.update(self.auth.get_creds_for_principal(expid))
1344        for c in certs:
1345            keys.add(c.issuer_cert())
1346            creds.add(c.attribute_cert())
1347        creds = list(keys) + list(creds)
1348
1349        if expcert: cert, pw = expcert, None
1350        else: cert, pw = self.cert_file, self.cert_pw
1351
1352        # Request credentials
1353        req = {
1354                'abac_credential': creds,
1355            }
1356        # Make the service request from the services we're importing and
1357        # exporting.  Keep track of the export request ids so we can
1358        # collect the resulting info from the access response.
1359        e_keys = { }
1360        if import_svcs or export_svcs:
1361            slist = []
1362            add_services(import_svcs, 'import', slist, e_keys)
1363            add_services(export_svcs, 'export', slist, e_keys)
1364            req['service'] = slist
1365
1366        if self.local_access.has_key(uri):
1367            # Local access call
1368            req = { 'RequestAccessRequestBody' : req }
1369            r = self.local_access[uri].RequestAccess(req, 
1370                    fedid(file=self.cert_file))
1371            r = { 'RequestAccessResponseBody' : r }
1372        else:
1373            r = self.call_RequestAccess(uri, req, cert, pw, self.trusted_certs)
1374
1375        if r.has_key('RequestAccessResponseBody'):
1376            # Through to here we have a valid response, not a fault.
1377            # Access denied is a fault, so something better or worse than
1378            # access denied has happened.
1379            r = r['RequestAccessResponseBody']
1380            self.log.debug("[get_access] Access granted")
1381        else:
1382            raise service_error(service_error.protocol,
1383                        "Bad proxy response")
1384        if 'proof' not in r:
1385            raise service_error(service_error.protocol,
1386                        "Bad access response (no access proof)")
1387       
1388        tbparam[tb] = { 
1389                "allocID" : r['allocID'],
1390                "uri": uri,
1391                "proof": [ r['proof'] ],
1392                }
1393
1394        # Collect the responses corresponding to the services this testbed
1395        # exports.  These will be the service requests that we will include in
1396        # the start segment requests (with appropriate visibility values) to
1397        # import and export the segments.
1398        for s in r.get('service', []):
1399            id = s.get('id', None)
1400            if id and id in e_keys:
1401                e_keys[id].reqs.append(s)
1402
1403        # Add attributes to parameter space.  We don't allow attributes to
1404        # overlay any parameters already installed.
1405        for a in r.get('fedAttr', []):
1406            try:
1407                if a['attribute'] and \
1408                        isinstance(a['attribute'], basestring)\
1409                        and not tbparam[tb].has_key(a['attribute'].lower()):
1410                    tbparam[tb][a['attribute'].lower()] = a['value']
1411            except KeyError:
1412                self.log.error("Bad attribute in response: %s" % a)
1413
1414
1415    def split_topology(self, top, topo, testbeds):
1416        """
1417        Create the sub-topologies that are needed for experiment instantiation.
1418        """
1419        for tb in testbeds:
1420            topo[tb] = top.clone()
1421            # copy in for loop allows deletions from the original
1422            for e in [ e for e in topo[tb].elements]:
1423                etb = e.get_attribute('testbed')
1424                # NB: elements without a testbed attribute won't appear in any
1425                # sub topologies. 
1426                if not etb or etb != tb:
1427                    for i in e.interface:
1428                        for s in i.subs:
1429                            try:
1430                                s.interfaces.remove(i)
1431                            except ValueError:
1432                                raise service_error(service_error.internal,
1433                                        "Can't remove interface??")
1434                    topo[tb].elements.remove(e)
1435            topo[tb].make_indices()
1436
1437    def confirm_software(self, top):
1438        """
1439        Make sure that the software to be loaded in the topo is all available
1440        before we begin making access requests, etc.  This is a subset of
1441        wrangle_software.
1442        """
1443        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1444        pkgs.update([x.location for e in top.elements for x in e.software])
1445
1446        for pkg in pkgs:
1447            loc = pkg
1448
1449            scheme, host, path = urlparse(loc)[0:3]
1450            dest = os.path.basename(path)
1451            if not scheme:
1452                if not loc.startswith('/'):
1453                    loc = "/%s" % loc
1454                loc = "file://%s" %loc
1455            # NB: if scheme was found, loc == pkg
1456            try:
1457                u = urlopen(loc)
1458                u.close()
1459            except Exception, e:
1460                raise service_error(service_error.req, 
1461                        "Cannot open %s: %s" % (loc, e))
1462        return True
1463
1464    def wrangle_software(self, expid, top, topo, tbparams):
1465        """
1466        Copy software out to the repository directory, allocate permissions and
1467        rewrite the segment topologies to look for the software in local
1468        places.
1469        """
1470
1471        # Copy the rpms and tarfiles to a distribution directory from
1472        # which the federants can retrieve them
1473        linkpath = "%s/software" %  expid
1474        softdir ="%s/%s" % ( self.repodir, linkpath)
1475        softmap = { }
1476
1477        # self.fedkit and self.gateway kit are lists of tuples of
1478        # (install_location, download_location) this extracts the download
1479        # locations.
1480        pkgs = set([ d for i, d in self.fedkit + self.gatewaykit ])
1481        pkgs.update([x.location for e in top.elements for x in e.software])
1482        try:
1483            os.makedirs(softdir)
1484        except EnvironmentError, e:
1485            raise service_error(
1486                    "Cannot create software directory: %s" % e)
1487        # The actual copying.  Everything's converted into a url for copying.
1488        auth_attrs = set()
1489        for pkg in pkgs:
1490            loc = pkg
1491
1492            scheme, host, path = urlparse(loc)[0:3]
1493            dest = os.path.basename(path)
1494            if not scheme:
1495                if not loc.startswith('/'):
1496                    loc = "/%s" % loc
1497                loc = "file://%s" %loc
1498            # NB: if scheme was found, loc == pkg
1499            try:
1500                u = urlopen(loc)
1501            except Exception, e:
1502                raise service_error(service_error.req, 
1503                        "Cannot open %s: %s" % (loc, e))
1504            try:
1505                f = open("%s/%s" % (softdir, dest) , "w")
1506                self.log.debug("Writing %s/%s" % (softdir,dest) )
1507                data = u.read(4096)
1508                while data:
1509                    f.write(data)
1510                    data = u.read(4096)
1511                f.close()
1512                u.close()
1513            except Exception, e:
1514                raise service_error(service_error.internal,
1515                        "Could not copy %s: %s" % (loc, e))
1516            path = re.sub("/tmp", "", linkpath)
1517            # XXX
1518            softmap[pkg] = \
1519                    "%s/%s/%s" %\
1520                    ( self.repo_url, path, dest)
1521
1522            # Allow the individual segments to access the software by assigning
1523            # an attribute to each testbed allocation that encodes the data to
1524            # be released.  This expression collects the data for each run of
1525            # the loop.
1526            auth_attrs.update([
1527                (tbparams[tb]['allocID']['fedid'], "/%s/%s" % ( path, dest)) \
1528                        for tb in tbparams.keys()])
1529
1530        self.append_experiment_authorization(expid, auth_attrs)
1531
1532        # Convert the software locations in the segments into the local
1533        # copies on this host
1534        for soft in [ s for tb in topo.values() \
1535                for e in tb.elements \
1536                    if getattr(e, 'software', False) \
1537                        for s in e.software ]:
1538            if softmap.has_key(soft.location):
1539                soft.location = softmap[soft.location]
1540
1541
1542    def new_experiment(self, req, fid):
1543        """
1544        The external interface to empty initial experiment creation called from
1545        the dispatcher.
1546
1547        Creates a working directory, splits the incoming description using the
1548        splitter script and parses out the avrious subsections using the
1549        lcasses above.  Once each sub-experiment is created, use pooled threads
1550        to instantiate them and start it all up.
1551        """
1552        req = req.get('NewRequestBody', None)
1553        if not req:
1554            raise service_error(service_error.req,
1555                    "Bad request format (no NewRequestBody)")
1556
1557        if self.auth.import_credentials(data_list=req.get('credential', [])):
1558            self.auth.save()
1559
1560        access_ok, proof = self.auth.check_attribute(fid, 'new', 
1561                with_proof=True)
1562
1563        if not access_ok:
1564            raise service_error(service_error.access, "New access denied",
1565                    proof=[proof])
1566
1567        try:
1568            tmpdir = tempfile.mkdtemp(prefix="split-")
1569        except EnvironmentError:
1570            raise service_error(service_error.internal, "Cannot create tmp dir")
1571
1572        try:
1573            access_user = self.accessdb[fid]
1574        except KeyError:
1575            raise service_error(service_error.internal,
1576                    "Access map and authorizer out of sync in " + \
1577                            "new_experiment for fedid %s"  % fid)
1578
1579        # Generate an ID for the experiment (slice) and a certificate that the
1580        # allocator can use to prove they own it.  We'll ship it back through
1581        # the encrypted connection.  If the requester supplied one, use it.
1582        if 'experimentAccess' in req and 'X509' in req['experimentAccess']:
1583            expcert = req['experimentAccess']['X509']
1584            expid = fedid(certstr=expcert)
1585            self.state_lock.acquire()
1586            if expid in self.state:
1587                self.state_lock.release()
1588                raise service_error(service_error.req, 
1589                        'fedid %s identifies an existing experiment' % expid)
1590            self.state_lock.release()
1591        else:
1592            (expid, expcert) = generate_fedid("test", dir=tmpdir, log=self.log)
1593
1594        #now we're done with the tmpdir, and it should be empty
1595        if self.cleanup:
1596            self.log.debug("[new_experiment]: removing %s" % tmpdir)
1597            os.rmdir(tmpdir)
1598        else:
1599            self.log.debug("[new_experiment]: not removing %s" % tmpdir)
1600
1601        eid = self.create_experiment_state(fid, req, expid, expcert, 
1602                state='empty')
1603
1604        rv = {
1605                'experimentID': [
1606                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
1607                ],
1608                'experimentStatus': 'empty',
1609                'experimentAccess': { 'X509' : expcert },
1610                'proof': proof.to_dict(),
1611            }
1612
1613        return rv
1614
1615    # create_experiment sub-functions
1616
1617    @staticmethod
1618    def get_experiment_key(req, field='experimentID'):
1619        """
1620        Parse the experiment identifiers out of the request (the request body
1621        tag has been removed).  Specifically this pulls either the fedid or the
1622        localname out of the experimentID field.  A fedid is preferred.  If
1623        neither is present or the request does not contain the fields,
1624        service_errors are raised.
1625        """
1626        # Get the experiment access
1627        exp = req.get(field, None)
1628        if exp:
1629            if exp.has_key('fedid'):
1630                key = exp['fedid']
1631            elif exp.has_key('localname'):
1632                key = exp['localname']
1633            else:
1634                raise service_error(service_error.req, "Unknown lookup type")
1635        else:
1636            raise service_error(service_error.req, "No request?")
1637
1638        return key
1639
1640    def get_experiment_ids_and_start(self, key, tmpdir):
1641        """
1642        Get the experiment name, id and access certificate from the state, and
1643        set the experiment state to 'starting'.  returns a triple (fedid,
1644        localname, access_cert_file). The access_cert_file is a copy of the
1645        contents of the access certificate, created in the tempdir with
1646        restricted permissions.  If things are confused, raise an exception.
1647        """
1648
1649        expid = eid = None
1650        self.state_lock.acquire()
1651        if key in self.state:
1652            exp = self.state[key]
1653            exp.status = "starting"
1654            expid = exp.fedid
1655            eid = exp.localname
1656            expcert = exp.identity
1657        self.state_lock.release()
1658
1659        # make a protected copy of the access certificate so the experiment
1660        # controller can act as the experiment principal.
1661        if expcert:
1662            expcert_file = self.make_temp_certfile(expcert, tmpdir)
1663            if not expcert_file:
1664                raise service_error(service_error.internal, 
1665                        "Cannot create temp cert file?")
1666        else:
1667            expcert_file = None
1668
1669        return (eid, expid, expcert_file)
1670
1671    def get_topology(self, req, tmpdir):
1672        """
1673        Get the ns2 content and put it into a file for parsing.  Call the local
1674        or remote parser and return the topdl.Topology.  Errors result in
1675        exceptions.  req is the request and tmpdir is a work directory.
1676        """
1677
1678        # The tcl parser needs to read a file so put the content into that file
1679        descr=req.get('experimentdescription', None)
1680        if descr:
1681            if 'ns2description' in descr:
1682                file_content=descr['ns2description']
1683            elif 'topdldescription' in descr:
1684                return topdl.Topology(**descr['topdldescription'])
1685            else:
1686                raise service_error(service_error.req, 
1687                        'Unknown experiment description type')
1688        else:
1689            raise service_error(service_error.req, "No experiment description")
1690
1691
1692        if self.splitter_url:
1693            self.log.debug("Calling remote topdl translator at %s" % \
1694                    self.splitter_url)
1695            top = self.remote_ns2topdl(self.splitter_url, file_content)
1696        else:
1697            tclfile = os.path.join(tmpdir, "experiment.tcl")
1698            if file_content:
1699                try:
1700                    f = open(tclfile, 'w')
1701                    f.write(file_content)
1702                    f.close()
1703                except EnvironmentError:
1704                    raise service_error(service_error.internal,
1705                            "Cannot write temp experiment description")
1706            else:
1707                raise service_error(service_error.req, 
1708                        "Only ns2descriptions supported")
1709            pid = "dummy"
1710            gid = "dummy"
1711            eid = "dummy"
1712
1713            tclcmd = [self.tclsh, self.tcl_splitter, '-t', '-x', 
1714                str(self.muxmax), '-m', 'dummy']
1715
1716            tclcmd.extend([pid, gid, eid, tclfile])
1717
1718            self.log.debug("running local splitter %s", " ".join(tclcmd))
1719            # This is just fantastic.  As a side effect the parser copies
1720            # tb_compat.tcl into the current directory, so that directory
1721            # must be writable by the fedd user.  Doing this in the
1722            # temporary subdir ensures this is the case.
1723            tclparser = Popen(tclcmd, stdout=PIPE, close_fds=True, 
1724                    cwd=tmpdir)
1725            split_data = tclparser.stdout
1726
1727            top = topdl.topology_from_xml(file=split_data, top="experiment")
1728            os.remove(tclfile)
1729
1730        return top
1731
1732    def get_testbed_services(self, req, testbeds):
1733        """
1734        Parse the services section of the request into into two dicts mapping
1735        testbed to lists of federated_service objects.  The first lists all
1736        exporters of services, and the second all exporters of services that
1737        need control portals in the experiment.
1738        """
1739        masters = { }
1740        pmasters = { }
1741        for s in req.get('service', []):
1742            # If this is a service request with the importall field
1743            # set, fill it out.
1744
1745            if s.get('importall', False):
1746                s['import'] = [ tb for tb in testbeds \
1747                        if tb not in s.get('export',[])]
1748                del s['importall']
1749
1750            # Add the service to masters
1751            for tb in s.get('export', []):
1752                if s.get('name', None):
1753
1754                    params = { }
1755                    for a in s.get('fedAttr', []):
1756                        params[a.get('attribute', '')] = a.get('value','')
1757
1758                    fser = federated_service(name=s['name'],
1759                            exporter=tb, importers=s.get('import',[]),
1760                            params=params)
1761                    if fser.name == 'hide_hosts' \
1762                            and 'hosts' not in fser.params:
1763                        fser.params['hosts'] = \
1764                                ",".join(tb_hosts.get(fser.exporter, []))
1765                    if tb in masters: masters[tb].append(fser)
1766                    else: masters[tb] = [fser]
1767
1768                    if fser.portal:
1769                        if tb not in pmasters: pmasters[tb] = [ fser ]
1770                        else: pmasters[tb].append(fser)
1771                else:
1772                    self.log.error('Testbed service does not have name " + \
1773                            "and importers')
1774        return masters, pmasters
1775
1776    def generate_keys_and_hosts(self, tmpdir, expid, hosts, tbparams):
1777        """
1778        Create the ssh keys necessary for interconnecting the portal nodes and
1779        the global hosts file for letting each segment know about the IP
1780        addresses in play.  Save these into the repo.  Add attributes to the
1781        autorizer allowing access controllers to download them and return a set
1782        of attributes that inform the segments where to find this stuff.  May
1783        raise service_errors in if there are problems.
1784        """
1785        gw_pubkey_base = "fed.%s.pub" % self.ssh_type
1786        gw_secretkey_base = "fed.%s" % self.ssh_type
1787        keydir = os.path.join(tmpdir, 'keys')
1788        gw_pubkey = os.path.join(keydir, gw_pubkey_base)
1789        gw_secretkey = os.path.join(keydir, gw_secretkey_base)
1790
1791        try:
1792            self.generate_ssh_keys(gw_secretkey, self.ssh_type)
1793        except ValueError:
1794            raise service_error(service_error.server_config, 
1795                    "Bad key type (%s)" % self.ssh_type)
1796
1797        self.generate_seer_certs(keydir)
1798
1799        # Copy configuration files into the remote file store
1800        # The config urlpath
1801        configpath = "/%s/config" % expid
1802        # The config file system location
1803        configdir ="%s%s" % ( self.repodir, configpath)
1804        try:
1805            os.makedirs(configdir)
1806        except EnvironmentError, e:
1807            raise service_error(service_error.internal,
1808                    "Cannot create config directory: %s" % e)
1809        try:
1810            f = open("%s/hosts" % configdir, "w")
1811            print >> f, string.join(hosts, '\n')
1812            f.close()
1813        except EnvironmentError, e:
1814            raise service_error(service_error.internal, 
1815                    "Cannot write hosts file: %s" % e)
1816        try:
1817            copy_file(gw_pubkey, os.path.join(configdir, gw_pubkey_base))
1818            copy_file(gw_secretkey, os.path.join(configdir, gw_secretkey_base))
1819            copy_file(os.path.join(keydir, 'ca.pem'), 
1820                    os.path.join(configdir, 'ca.pem'))
1821            copy_file(os.path.join(keydir, 'node.pem'), 
1822                    os.path.join(configdir, 'node.pem'))
1823        except EnvironmentError, e:
1824            raise service_error(service_error.internal, 
1825                    "Cannot copy keyfiles: %s" % e)
1826
1827        # Allow the individual testbeds to access the configuration files,
1828        # again by setting an attribute for the relevant pathnames on each
1829        # allocation principal.  Yeah, that's a long list comprehension.
1830        self.append_experiment_authorization(expid, set([
1831            (tbparams[tb]['allocID']['fedid'], "%s/%s" % (configpath, f)) \
1832                    for tb in tbparams.keys() \
1833                        for f in ("hosts", 'ca.pem', 'node.pem', 
1834                            gw_secretkey_base, gw_pubkey_base)]))
1835
1836        attrs = [ 
1837                {
1838                    'attribute': 'ssh_pubkey', 
1839                    'value': '%s/%s/config/%s' % \
1840                            (self.repo_url, expid, gw_pubkey_base)
1841                },
1842                {
1843                    'attribute': 'ssh_secretkey', 
1844                    'value': '%s/%s/config/%s' % \
1845                            (self.repo_url, expid, gw_secretkey_base)
1846                },
1847                {
1848                    'attribute': 'hosts', 
1849                    'value': '%s/%s/config/hosts' % \
1850                            (self.repo_url, expid)
1851                },
1852                {
1853                    'attribute': 'seer_ca_pem', 
1854                    'value': '%s/%s/config/%s' % \
1855                            (self.repo_url, expid, 'ca.pem')
1856                },
1857                {
1858                    'attribute': 'seer_node_pem', 
1859                    'value': '%s/%s/config/%s' % \
1860                            (self.repo_url, expid, 'node.pem')
1861                },
1862            ]
1863        return attrs
1864
1865
1866    def get_vtopo(self, req, fid):
1867        """
1868        Return the stored virtual topology for this experiment
1869        """
1870        rv = None
1871        state = None
1872
1873        req = req.get('VtopoRequestBody', None)
1874        if not req:
1875            raise service_error(service_error.req,
1876                    "Bad request format (no VtopoRequestBody)")
1877        exp = req.get('experiment', None)
1878        if exp:
1879            if exp.has_key('fedid'):
1880                key = exp['fedid']
1881                keytype = "fedid"
1882            elif exp.has_key('localname'):
1883                key = exp['localname']
1884                keytype = "localname"
1885            else:
1886                raise service_error(service_error.req, "Unknown lookup type")
1887        else:
1888            raise service_error(service_error.req, "No request?")
1889
1890        proof = self.check_experiment_access(fid, key)
1891
1892        self.state_lock.acquire()
1893        # XXX: this needs to be recalculated
1894        if self.state.has_key(key):
1895            if self.state[key].has_key('vtopo'):
1896                rv = { 'experiment' : {keytype: key },
1897                        'vtopo': self.state[key]['vtopo'],
1898                        'proof': proof.to_dict(), 
1899                    }
1900            else:
1901                state = self.state[key]['experimentStatus']
1902        self.state_lock.release()
1903
1904        if rv: return rv
1905        else: 
1906            if state:
1907                raise service_error(service_error.partial, 
1908                        "Not ready: %s" % state)
1909            else:
1910                raise service_error(service_error.req, "No such experiment")
1911
1912    def get_vis(self, req, fid):
1913        """
1914        Return the stored visualization for this experiment
1915        """
1916        rv = None
1917        state = None
1918
1919        req = req.get('VisRequestBody', None)
1920        if not req:
1921            raise service_error(service_error.req,
1922                    "Bad request format (no VisRequestBody)")
1923        exp = req.get('experiment', None)
1924        if exp:
1925            if exp.has_key('fedid'):
1926                key = exp['fedid']
1927                keytype = "fedid"
1928            elif exp.has_key('localname'):
1929                key = exp['localname']
1930                keytype = "localname"
1931            else:
1932                raise service_error(service_error.req, "Unknown lookup type")
1933        else:
1934            raise service_error(service_error.req, "No request?")
1935
1936        proof = self.check_experiment_access(fid, key)
1937
1938        self.state_lock.acquire()
1939        # XXX: this needs to be recalculated
1940        if self.state.has_key(key):
1941            if self.state[key].has_key('vis'):
1942                rv =  { 'experiment' : {keytype: key },
1943                        'vis': self.state[key]['vis'],
1944                        'proof': proof.to_dict(), 
1945                        }
1946            else:
1947                state = self.state[key]['experimentStatus']
1948        self.state_lock.release()
1949
1950        if rv: return rv
1951        else:
1952            if state:
1953                raise service_error(service_error.partial, 
1954                        "Not ready: %s" % state)
1955            else:
1956                raise service_error(service_error.req, "No such experiment")
1957
1958   
1959    def save_federant_information(self, allocated, tbparams, eid, vtopo, vis,
1960            top):
1961        """
1962        Store the various data that have changed in the experiment state
1963        between when it was started and the beginning of resource allocation.
1964        This is basically the information about each local allocation.  This
1965        fills in the values of the placeholder allocation in the state.  It
1966        also collects the access proofs and returns them as dicts for a
1967        response message.
1968        """
1969        self.state_lock.acquire()
1970        # XXX:
1971        #self.state[eid]['vtopo'] = vtopo
1972        #self.state[eid]['vis'] = vis
1973        exp = self.state[eid]
1974        exp.topology = top
1975        # save federant information
1976        for k in allocated.keys():
1977            exp.add_allocation(
1978                    allocation_info(
1979                        tb=k, 
1980                        allocID=tbparams[k]['allocID'].get('fedid', None),
1981                        uri=tbparams[k]['uri'],
1982                        proof=tbparams[k]['proof'])
1983                    )
1984
1985        # Access proofs for the response message
1986        proofs = [copy.deepcopy(p) for k in tbparams.keys()\
1987                    for p in tbparams[k]['proof']]
1988        if self.state_filename: 
1989            self.write_state()
1990        self.state_lock.release()
1991        return proofs
1992
1993    def clear_placeholder(self, eid, expid, tmpdir):
1994        """
1995        Clear the placeholder and remove any allocated temporary dir.
1996        """
1997
1998        self.state_lock.acquire()
1999        del self.state[eid]
2000        del self.state[expid]
2001        if self.state_filename: self.write_state()
2002        self.state_lock.release()
2003        if tmpdir and self.cleanup:
2004            self.remove_dirs(tmpdir)
2005
2006    # end of create_experiment sub-functions
2007
2008    def create_experiment(self, req, fid):
2009        """
2010        The external interface to experiment creation called from the
2011        dispatcher.
2012
2013        Creates a working directory, splits the incoming description using the
2014        splitter script and parses out the various subsections using the
2015        classes above.  Once each sub-experiment is created, use pooled threads
2016        to instantiate them and start it all up.
2017        """
2018
2019        req = req.get('CreateRequestBody', None)
2020        if req:
2021            key = self.get_experiment_key(req)
2022        else:
2023            raise service_error(service_error.req,
2024                    "Bad request format (no CreateRequestBody)")
2025
2026        # Import information from the requester
2027        if self.auth.import_credentials(data_list=req.get('credential', [])):
2028            self.auth.save()
2029
2030        # Make sure that the caller can talk to us
2031        proof = self.check_experiment_access(fid, key)
2032
2033        # Install the testbed map entries supplied with the request into a copy
2034        # of the testbed map.
2035        tbmap = dict(self.tbmap)
2036        for m in req.get('testbedmap', []):
2037            if 'testbed' in m and 'uri' in m:
2038                tbmap[m['testbed']] = m['uri']
2039
2040        # a place to work
2041        try:
2042            tmpdir = tempfile.mkdtemp(prefix="split-")
2043            os.mkdir(tmpdir+"/keys")
2044        except EnvironmentError:
2045            raise service_error(service_error.internal, "Cannot create tmp dir")
2046
2047        tbparams = { }
2048
2049        eid, expid, expcert_file = \
2050                self.get_experiment_ids_and_start(key, tmpdir)
2051
2052        # This catches exceptions to clear the placeholder if necessary
2053        try: 
2054            if not (eid and expid):
2055                raise service_error(service_error.internal, 
2056                        "Cannot find local experiment info!?")
2057
2058            top = self.get_topology(req, tmpdir)
2059            self.confirm_software(top)
2060            # Assign the IPs
2061            hosts, ip_allocator = self.allocate_ips_to_topo(top)
2062            # Find the testbeds to look up
2063            tb_hosts = { }
2064            testbeds = [ ]
2065            for e in top.elements:
2066                if isinstance(e, topdl.Computer):
2067                    tb = e.get_attribute('testbed') or 'default'
2068                    if tb in tb_hosts: tb_hosts[tb].append(e.name)
2069                    else: 
2070                        tb_hosts[tb] = [ e.name ]
2071                        testbeds.append(tb)
2072
2073            masters, pmasters = self.get_testbed_services(req, testbeds)
2074            allocated = { }         # Testbeds we can access
2075            topo ={ }               # Sub topologies
2076            connInfo = { }          # Connection information
2077
2078            self.split_topology(top, topo, testbeds)
2079
2080            self.get_access_to_testbeds(testbeds, fid, allocated, 
2081                    tbparams, masters, tbmap, expid, expcert_file)
2082
2083            attrs = self.generate_keys_and_hosts(tmpdir, expid, hosts, tbparams)
2084
2085            part = experiment_partition(self.auth, self.store_url, tbmap,
2086                    self.muxmax, self.direct_transit)
2087            part.add_portals(top, topo, eid, pmasters, tbparams, ip_allocator,
2088                    connInfo, expid)
2089
2090            auth_attrs = set()
2091            # Now get access to the dynamic testbeds (those added above)
2092            for tb in [ t for t in topo if t not in allocated]:
2093                self.get_access(tb, tbparams, fid, masters, tbmap, 
2094                        expid, expcert_file)
2095                allocated[tb] = 1
2096                store_keys = topo[tb].get_attribute('store_keys')
2097                # Give the testbed access to keys it exports or imports
2098                if store_keys:
2099                    auth_attrs.update(set([
2100                        (tbparams[tb]['allocID']['fedid'], sk) \
2101                                for sk in store_keys.split(" ")]))
2102
2103            if auth_attrs:
2104                self.append_experiment_authorization(expid, auth_attrs)
2105
2106            # transit and disconnected testbeds may not have a connInfo entry.
2107            # Fill in the blanks.
2108            for t in allocated.keys():
2109                if not connInfo.has_key(t):
2110                    connInfo[t] = { }
2111
2112            self.wrangle_software(expid, top, topo, tbparams)
2113
2114            vtopo = topdl.topology_to_vtopo(top)
2115            vis = self.genviz(vtopo)
2116            proofs = self.save_federant_information(allocated, tbparams, 
2117                    eid, vtopo, vis, top)
2118        except service_error, e:
2119            # If something goes wrong in the parse (usually an access error)
2120            # clear the placeholder state.  From here on out the code delays
2121            # exceptions.  Failing at this point returns a fault to the remote
2122            # caller.
2123            self.clear_placeholder(eid, expid, tmpdir)
2124            raise e
2125
2126        # Start the background swapper and return the starting state.  From
2127        # here on out, the state will stick around a while.
2128
2129        # Create a logger that logs to the experiment's state object as well as
2130        # to the main log file.
2131        alloc_log = logging.getLogger('fedd.experiment_control.%s' % eid)
2132        alloc_collector = self.list_log(self.state[eid].log)
2133        h = logging.StreamHandler(alloc_collector)
2134        # XXX: there should be a global one of these rather than repeating the
2135        # code.
2136        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2137                    '%d %b %y %H:%M:%S'))
2138        alloc_log.addHandler(h)
2139
2140        # Start a thread to do the resource allocation
2141        t  = Thread(target=self.allocate_resources,
2142                args=(allocated, masters, eid, expid, tbparams, 
2143                    top, topo, tmpdir, alloc_log, alloc_collector, attrs,
2144                    connInfo, tbmap, expcert_file),
2145                name=eid)
2146        t.start()
2147
2148        rv = {
2149                'experimentID': [
2150                    {'localname' : eid }, { 'fedid': copy.copy(expid) }
2151                ],
2152                'experimentStatus': 'starting',
2153                'proof': [ proof.to_dict() ] + proofs,
2154            }
2155
2156        return rv
2157   
2158    def get_experiment_fedid(self, key):
2159        """
2160        find the fedid associated with the localname key in the state database.
2161        """
2162
2163        rv = None
2164        self.state_lock.acquire()
2165        if key in self.state:
2166            rv = self.state[key].fedid
2167        self.state_lock.release()
2168        return rv
2169
2170    def check_experiment_access(self, fid, key):
2171        """
2172        Confirm that the fid has access to the experiment.  Though a request
2173        may be made in terms of a local name, the access attribute is always
2174        the experiment's fedid.
2175        """
2176        if not isinstance(key, fedid):
2177            key = self.get_experiment_fedid(key)
2178
2179        access_ok, proof = self.auth.check_attribute(fid, key, with_proof=True)
2180
2181        if access_ok:
2182            return proof
2183        else:
2184            raise service_error(service_error.access, "Access Denied",
2185                proof)
2186
2187
2188    def get_handler(self, path, fid):
2189        """
2190        Perhaps surprisingly named, this function handles HTTP GET requests to
2191        this server (SOAP requests are POSTs).
2192        """
2193        self.log.info("Get handler %s %s" % (path, fid))
2194        # XXX: log proofs?
2195        if self.auth.check_attribute(fid, path):
2196            return ("%s/%s" % (self.repodir, path), "application/binary")
2197        else:
2198            return (None, None)
2199
2200   
2201    def get_info(self, req, fid):
2202        """
2203        Return all the stored info about this experiment
2204        """
2205        rv = None
2206
2207        req = req.get('InfoRequestBody', None)
2208        if not req:
2209            raise service_error(service_error.req,
2210                    "Bad request format (no InfoRequestBody)")
2211        exp = req.get('experiment', None)
2212        if exp:
2213            if exp.has_key('fedid'):
2214                key = exp['fedid']
2215                keytype = "fedid"
2216            elif exp.has_key('localname'):
2217                key = exp['localname']
2218                keytype = "localname"
2219            else:
2220                raise service_error(service_error.req, "Unknown lookup type")
2221        else:
2222            raise service_error(service_error.req, "No request?")
2223
2224        proof = self.check_experiment_access(fid, key)
2225
2226        self.state_lock.acquire()
2227        if self.state.has_key(key):
2228            rv = self.state[key].get_info()
2229        self.state_lock.release()
2230
2231        if rv:
2232            rv['proof'] = proof.to_dict()
2233            return rv
2234        else:
2235            raise service_error(service_error.req, "No such experiment")
2236
2237    def get_multi_info(self, req, fid):
2238        """
2239        Return all the stored info that this fedid can access
2240        """
2241        rv = { 'info': [ ], 'proof': [ ] }
2242
2243        self.state_lock.acquire()
2244        for key in [ k for k in self.state.keys() if isinstance(k, fedid)]:
2245            try:
2246                proof = self.check_experiment_access(fid, key)
2247            except service_error, e:
2248                if e.code == service_error.access:
2249                    continue
2250                else:
2251                    self.state_lock.release()
2252                    raise e
2253
2254            if self.state.has_key(key):
2255                e = self.state[key].get_info()
2256                e['proof'] = proof.to_dict()
2257                rv['info'].append(e)
2258                rv['proof'].append(proof.to_dict())
2259        self.state_lock.release()
2260        return rv
2261
2262    def check_termination_status(self, fed_exp, force):
2263        """
2264        Confirm that the experiment is sin a valid state to stop (or force it)
2265        return the state - invalid states for deletion and force settings cause
2266        exceptions.
2267        """
2268        self.state_lock.acquire()
2269        status = fed_exp.status
2270
2271        if status:
2272            if status in ('starting', 'terminating'):
2273                if not force:
2274                    self.state_lock.release()
2275                    raise service_error(service_error.partial, 
2276                            'Experiment still being created or destroyed')
2277                else:
2278                    self.log.warning('Experiment in %s state ' % status + \
2279                            'being terminated by force.')
2280            self.state_lock.release()
2281            return status
2282        else:
2283            # No status??? trouble
2284            self.state_lock.release()
2285            raise service_error(service_error.internal,
2286                    "Experiment has no status!?")
2287
2288
2289    def get_termination_info(self, fed_exp):
2290        ids = []
2291        term_params = { }
2292        self.state_lock.acquire()
2293        ids = [ x for x in (fed_exp.localname, fed_exp.fedid) if x is not None ]
2294        expcert = fed_exp.identity
2295        repo = "%s" % fed_exp.fedid
2296
2297        # Collect the allocation/segment ids into a dict keyed by the fedid
2298        # of the allocation that contains a tuple of uri, aid
2299        for i, fed in enumerate(fed_exp.get_all_allocations()):
2300            uri = fed.uri
2301            aid = fed.allocID
2302            term_params[aid] = (uri, aid)
2303        # Change the experiment state
2304        fed_exp.status = 'terminating'
2305        if self.state_filename: self.write_state()
2306        self.state_lock.release()
2307
2308        return ids, term_params, expcert, repo
2309
2310
2311    def deallocate_resources(self, term_params, expcert, status, force, 
2312            dealloc_log):
2313        tmpdir = None
2314        # This try block makes sure the tempdir is cleared
2315        try:
2316            # If no expcert, try the deallocation as the experiment
2317            # controller instance.
2318            if expcert and self.auth_type != 'legacy': 
2319                try:
2320                    tmpdir = tempfile.mkdtemp(prefix="term-")
2321                except EnvironmentError:
2322                    raise service_error(service_error.internal, 
2323                            "Cannot create tmp dir")
2324                cert_file = self.make_temp_certfile(expcert, tmpdir)
2325                pw = None
2326            else: 
2327                cert_file = self.cert_file
2328                pw = self.cert_pwd
2329
2330            # Stop everyone.  NB, wait_for_all waits until a thread starts
2331            # and then completes, so we can't wait if nothing starts.  So,
2332            # no tbparams, no start.
2333            if len(term_params) > 0:
2334                tp = thread_pool(self.nthreads)
2335                for k, (uri, aid) in term_params.items():
2336                    # Create and start a thread to stop the segment
2337                    tp.wait_for_slot()
2338                    t  = pooled_thread(\
2339                            target=self.terminate_segment(log=dealloc_log,
2340                                testbed=uri,
2341                                cert_file=cert_file, 
2342                                cert_pwd=pw,
2343                                trusted_certs=self.trusted_certs,
2344                                caller=self.call_TerminateSegment),
2345                            args=(uri, aid), name=k,
2346                            pdata=tp, trace_file=self.trace_file)
2347                    t.start()
2348                # Wait for completions
2349                tp.wait_for_all_done()
2350
2351            # release the allocations (failed experiments have done this
2352            # already, and starting experiments may be in odd states, so we
2353            # ignore errors releasing those allocations
2354            try: 
2355                for k, (uri, aid)  in term_params.items():
2356                    self.release_access(None, aid, uri=uri,
2357                            cert_file=cert_file, cert_pwd=pw)
2358            except service_error, e:
2359                if status != 'failed' and not force:
2360                    raise e
2361
2362        # Clean up the tmpdir no matter what
2363        finally:
2364            if tmpdir: self.remove_dirs(tmpdir)
2365
2366    def terminate_experiment(self, req, fid):
2367        """
2368        Swap this experiment out on the federants and delete the shared
2369        information
2370        """
2371        tbparams = { }
2372        req = req.get('TerminateRequestBody', None)
2373        if not req:
2374            raise service_error(service_error.req,
2375                    "Bad request format (no TerminateRequestBody)")
2376
2377        key = self.get_experiment_key(req, 'experiment')
2378        proof = self.check_experiment_access(fid, key)
2379        exp = req.get('experiment', False)
2380        force = req.get('force', False)
2381
2382        dealloc_list = [ ]
2383
2384
2385        # Create a logger that logs to the dealloc_list as well as to the main
2386        # log file.
2387        dealloc_log = logging.getLogger('fedd.experiment_control.%s' % key)
2388        h = logging.StreamHandler(self.list_log(dealloc_list))
2389        # XXX: there should be a global one of these rather than repeating the
2390        # code.
2391        h.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s",
2392                    '%d %b %y %H:%M:%S'))
2393        dealloc_log.addHandler(h)
2394
2395        self.state_lock.acquire()
2396        fed_exp = self.state.get(key, None)
2397        self.state_lock.release()
2398        repo = None
2399
2400        if fed_exp:
2401            status = self.check_termination_status(fed_exp, force)
2402            ids, term_params, expcert, repo = self.get_termination_info(fed_exp)
2403            self.deallocate_resources(term_params, expcert, status, force, 
2404                    dealloc_log)
2405
2406            # Remove the terminated experiment
2407            self.state_lock.acquire()
2408            for id in ids:
2409                self.clear_experiment_authorization(id, need_state_lock=False)
2410                if id in self.state: del self.state[id]
2411
2412            if self.state_filename: self.write_state()
2413            self.state_lock.release()
2414
2415            # Delete any synch points associated with this experiment.  All
2416            # synch points begin with the fedid of the experiment.
2417            fedid_keys = set(["fedid:%s" % f for f in ids \
2418                    if isinstance(f, fedid)])
2419            for k in self.synch_store.all_keys():
2420                try:
2421                    if len(k) > 45 and k[0:46] in fedid_keys:
2422                        self.synch_store.del_value(k)
2423                except synch_store.BadDeletionError:
2424                    pass
2425            self.write_store()
2426
2427            # Remove software and other cached stuff from the filesystem.
2428            if repo:
2429                self.remove_dirs("%s/%s" % (self.repodir, repo))
2430       
2431            return { 
2432                    'experiment': exp , 
2433                    'deallocationLog': string.join(dealloc_list, ''),
2434                    'proof': [proof.to_dict()],
2435                    }
2436        else:
2437            raise service_error(service_error.req, "No saved state")
2438
2439
2440    def GetValue(self, req, fid):
2441        """
2442        Get a value from the synchronized store
2443        """
2444        req = req.get('GetValueRequestBody', None)
2445        if not req:
2446            raise service_error(service_error.req,
2447                    "Bad request format (no GetValueRequestBody)")
2448       
2449        name = req.get('name', None)
2450        wait = req.get('wait', False)
2451        rv = { 'name': name }
2452
2453        if not name:
2454            raise service_error(service_error.req, "No name?")
2455
2456        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2457
2458        if access_ok:
2459            self.log.debug("[GetValue] asking for %s " % name)
2460            try:
2461                v = self.synch_store.get_value(name, wait)
2462            except synch_store.RevokedKeyError:
2463                # No more synch on this key
2464                raise service_error(service_error.federant, 
2465                        "Synch key %s revoked" % name)
2466            if v is not None:
2467                rv['value'] = v
2468            rv['proof'] = proof.to_dict()
2469            self.log.debug("[GetValue] got %s from %s" % (v, name))
2470            return rv
2471        else:
2472            raise service_error(service_error.access, "Access Denied",
2473                    proof=proof)
2474       
2475
2476    def SetValue(self, req, fid):
2477        """
2478        Set a value in the synchronized store
2479        """
2480        req = req.get('SetValueRequestBody', None)
2481        if not req:
2482            raise service_error(service_error.req,
2483                    "Bad request format (no SetValueRequestBody)")
2484       
2485        name = req.get('name', None)
2486        v = req.get('value', '')
2487
2488        if not name:
2489            raise service_error(service_error.req, "No name?")
2490
2491        access_ok, proof = self.auth.check_attribute(fid, name, with_proof=True)
2492
2493        if access_ok:
2494            try:
2495                self.synch_store.set_value(name, v)
2496                self.write_store()
2497                self.log.debug("[SetValue] set %s to %s" % (name, v))
2498            except synch_store.CollisionError:
2499                # Translate into a service_error
2500                raise service_error(service_error.req,
2501                        "Value already set: %s" %name)
2502            except synch_store.RevokedKeyError:
2503                # No more synch on this key
2504                raise service_error(service_error.federant, 
2505                        "Synch key %s revoked" % name)
2506                return { 'name': name, 'value': v, 'proof': proof.to_dict() }
2507        else:
2508            raise service_error(service_error.access, "Access Denied",
2509                    proof=proof)
Note: See TracBrowser for help on using the repository browser.